http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/bd143146/spark/spark-2.3/src/main/scala/org/apache/spark/sql/hive/HivemallGroupedDataset.scala ---------------------------------------------------------------------- diff --git a/spark/spark-2.3/src/main/scala/org/apache/spark/sql/hive/HivemallGroupedDataset.scala b/spark/spark-2.3/src/main/scala/org/apache/spark/sql/hive/HivemallGroupedDataset.scala new file mode 100644 index 0000000..2982d9c --- /dev/null +++ b/spark/spark-2.3/src/main/scala/org/apache/spark/sql/hive/HivemallGroupedDataset.scala @@ -0,0 +1,636 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.spark.sql.hive + +import org.apache.spark.sql.AnalysisException +import org.apache.spark.sql.DataFrame +import org.apache.spark.sql.Dataset +import org.apache.spark.sql.RelationalGroupedDataset +import org.apache.spark.sql.catalyst.analysis.UnresolvedAlias +import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.plans.logical.Aggregate +import org.apache.spark.sql.catalyst.plans.logical.Pivot +import org.apache.spark.sql.hive.HiveShim.HiveFunctionWrapper +import org.apache.spark.sql.types._ + +/** + * Groups the [[DataFrame]] using the specified columns, so we can run aggregation on them. + * + * @groupname classifier + * @groupname ensemble + * @groupname evaluation + * @groupname topicmodel + * @groupname ftvec.selection + * @groupname ftvec.text + * @groupname ftvec.trans + * @groupname tools.array + * @groupname tools.bits + * @groupname tools.list + * @groupname tools.map + * @groupname tools.matrix + * @groupname tools.math + * + * A list of unsupported functions is as follows: + * * ftvec.conv + * - conv2dense + * - build_bins + */ +final class HivemallGroupedDataset(groupBy: RelationalGroupedDataset) { + + /** + * @see hivemall.classifier.KPAPredictUDAF + * @group classifier + */ + def kpa_predict(xh: String, xk: String, w0: String, w1: String, w2: String, w3: String) + : DataFrame = { + checkType(xh, DoubleType) + checkType(xk, DoubleType) + checkType(w0, FloatType) + checkType(w1, FloatType) + checkType(w2, FloatType) + checkType(w3, FloatType) + val udaf = HiveUDAFFunction( + "kpa_predict", + new HiveFunctionWrapper("hivemall.classifier.KPAPredictUDAF"), + Seq(xh, xk, w0, w1, w2, w3).map(df(_).expr), + isUDAFBridgeRequired = false) + .toAggregateExpression() + toDF(Alias(udaf, udaf.prettyName)() :: Nil) + } + + /** + * @see hivemall.ensemble.bagging.VotedAvgUDAF + * @group ensemble + */ + def voted_avg(weight: String): DataFrame = { + checkType(weight, DoubleType) + val udaf = HiveUDAFFunction( + "voted_avg", + new HiveFunctionWrapper("hivemall.ensemble.bagging.WeightVotedAvgUDAF"), + Seq(weight).map(df(_).expr), + isUDAFBridgeRequired = true) + .toAggregateExpression() + toDF(Alias(udaf, udaf.prettyName)() :: Nil) + } + + /** + * @see hivemall.ensemble.bagging.WeightVotedAvgUDAF + * @group ensemble + */ + def weight_voted_avg(weight: String): DataFrame = { + checkType(weight, DoubleType) + val udaf = HiveUDAFFunction( + "weight_voted_avg", + new HiveFunctionWrapper("hivemall.ensemble.bagging.WeightVotedAvgUDAF"), + Seq(weight).map(df(_).expr), + isUDAFBridgeRequired = true) + .toAggregateExpression() + toDF(Alias(udaf, udaf.prettyName)() :: Nil) + } + + /** + * @see hivemall.ensemble.ArgminKLDistanceUDAF + * @group ensemble + */ + def argmin_kld(weight: String, conv: String): DataFrame = { + checkType(weight, FloatType) + checkType(conv, FloatType) + val udaf = HiveUDAFFunction( + "argmin_kld", + new HiveFunctionWrapper("hivemall.ensemble.ArgminKLDistanceUDAF"), + Seq(weight, conv).map(df(_).expr), + isUDAFBridgeRequired = true) + .toAggregateExpression() + toDF(Alias(udaf, udaf.prettyName)() :: Nil) + } + + /** + * @see hivemall.ensemble.MaxValueLabelUDAF" + * @group ensemble + */ + def max_label(score: String, label: String): DataFrame = { + // checkType(score, DoubleType) + checkType(label, StringType) + val udaf = HiveUDAFFunction( + "max_label", + new HiveFunctionWrapper("hivemall.ensemble.MaxValueLabelUDAF"), + Seq(score, label).map(df(_).expr), + isUDAFBridgeRequired = true) + .toAggregateExpression() + toDF(Alias(udaf, udaf.prettyName)() :: Nil) + } + + /** + * @see hivemall.ensemble.MaxRowUDAF + * @group ensemble + */ + def maxrow(score: String, label: String): DataFrame = { + checkType(score, DoubleType) + checkType(label, StringType) + val udaf = HiveUDAFFunction( + "maxrow", + new HiveFunctionWrapper("hivemall.ensemble.MaxRowUDAF"), + Seq(score, label).map(df(_).expr), + isUDAFBridgeRequired = false) + .toAggregateExpression() + toDF(Alias(udaf, udaf.prettyName)() :: Nil) + } + + /** + * @see hivemall.smile.tools.RandomForestEnsembleUDAF + * @group ensemble + */ + @scala.annotation.varargs + def rf_ensemble(yhat: String, others: String*): DataFrame = { + checkType(yhat, IntegerType) + val udaf = HiveUDAFFunction( + "rf_ensemble", + new HiveFunctionWrapper("hivemall.smile.tools.RandomForestEnsembleUDAF"), + (yhat +: others).map(df(_).expr), + isUDAFBridgeRequired = false) + .toAggregateExpression() + toDF(Alias(udaf, udaf.prettyName)() :: Nil) + } + + /** + * @see hivemall.evaluation.MeanAbsoluteErrorUDAF + * @group evaluation + */ + def mae(predict: String, target: String): DataFrame = { + checkType(predict, DoubleType) + checkType(target, DoubleType) + val udaf = HiveUDAFFunction( + "mae", + new HiveFunctionWrapper("hivemall.evaluation.MeanAbsoluteErrorUDAF"), + Seq(predict, target).map(df(_).expr), + isUDAFBridgeRequired = true) + .toAggregateExpression() + toDF(Alias(udaf, udaf.prettyName)() :: Nil) + } + + /** + * @see hivemall.evaluation.MeanSquareErrorUDAF + * @group evaluation + */ + def mse(predict: String, target: String): DataFrame = { + checkType(predict, DoubleType) + checkType(target, DoubleType) + val udaf = HiveUDAFFunction( + "mse", + new HiveFunctionWrapper("hivemall.evaluation.MeanSquaredErrorUDAF"), + Seq(predict, target).map(df(_).expr), + isUDAFBridgeRequired = true) + .toAggregateExpression() + toDF(Alias(udaf, udaf.prettyName)() :: Nil) + } + + /** + * @see hivemall.evaluation.RootMeanSquareErrorUDAF + * @group evaluation + */ + def rmse(predict: String, target: String): DataFrame = { + checkType(predict, DoubleType) + checkType(target, DoubleType) + val udaf = HiveUDAFFunction( + "rmse", + new HiveFunctionWrapper("hivemall.evaluation.RootMeanSquaredErrorUDAF"), + Seq(predict, target).map(df(_).expr), + isUDAFBridgeRequired = true) + .toAggregateExpression() + toDF(Alias(udaf, udaf.prettyName)() :: Nil) + } + + /** + * @see hivemall.evaluation.R2UDAF + * @group evaluation + */ + def r2(predict: String, target: String): DataFrame = { + checkType(predict, DoubleType) + checkType(target, DoubleType) + val udaf = HiveUDAFFunction( + "r2", + new HiveFunctionWrapper("hivemall.evaluation.R2UDAF"), + Seq(predict, target).map(df(_).expr), + isUDAFBridgeRequired = true) + .toAggregateExpression() + toDF(Alias(udaf, udaf.prettyName)() :: Nil) + } + + /** + * @see hivemall.evaluation.LogarithmicLossUDAF + * @group evaluation + */ + def logloss(predict: String, target: String): DataFrame = { + checkType(predict, DoubleType) + checkType(target, DoubleType) + val udaf = HiveUDAFFunction( + "logloss", + new HiveFunctionWrapper("hivemall.evaluation.LogarithmicLossUDAF"), + Seq(predict, target).map(df(_).expr), + isUDAFBridgeRequired = true) + .toAggregateExpression() + toDF(Alias(udaf, udaf.prettyName)() :: Nil) + } + + /** + * @see hivemall.evaluation.F1ScoreUDAF + * @group evaluation + */ + def f1score(predict: String, target: String): DataFrame = { + // checkType(target, ArrayType(IntegerType, false)) + // checkType(predict, ArrayType(IntegerType, false)) + val udaf = HiveUDAFFunction( + "f1score", + new HiveFunctionWrapper("hivemall.evaluation.F1ScoreUDAF"), + Seq(predict, target).map(df(_).expr), + isUDAFBridgeRequired = true) + .toAggregateExpression() + toDF(Alias(udaf, udaf.prettyName)() :: Nil) + } + + /** + * @see hivemall.evaluation.NDCGUDAF + * @group evaluation + */ + @scala.annotation.varargs + def ndcg(rankItems: String, correctItems: String, others: String*): DataFrame = { + val udaf = HiveUDAFFunction( + "ndcg", + new HiveFunctionWrapper("hivemall.evaluation.NDCGUDAF"), + (rankItems +: correctItems +: others).map(df(_).expr), + isUDAFBridgeRequired = false) + .toAggregateExpression() + toDF(Alias(udaf, udaf.prettyName)() :: Nil) + } + + /** + * @see hivemall.evaluation.PrecisionUDAF + * @group evaluation + */ + @scala.annotation.varargs + def precision_at(rankItems: String, correctItems: String, others: String*): DataFrame = { + val udaf = HiveUDAFFunction( + "precision_at", + new HiveFunctionWrapper("hivemall.evaluation.PrecisionUDAF"), + (rankItems +: correctItems +: others).map(df(_).expr), + isUDAFBridgeRequired = false) + .toAggregateExpression() + toDF(Alias(udaf, udaf.prettyName)() :: Nil) + } + + /** + * @see hivemall.evaluation.RecallUDAF + * @group evaluation + */ + @scala.annotation.varargs + def recall_at(rankItems: String, correctItems: String, others: String*): DataFrame = { + val udaf = HiveUDAFFunction( + "recall_at", + new HiveFunctionWrapper("hivemall.evaluation.RecallUDAF"), + (rankItems +: correctItems +: others).map(df(_).expr), + isUDAFBridgeRequired = false) + .toAggregateExpression() + toDF(Alias(udaf, udaf.prettyName)() :: Nil) + } + + /** + * @see hivemall.evaluation.HitRateUDAF + * @group evaluation + */ + @scala.annotation.varargs + def hitrate(rankItems: String, correctItems: String, others: String*): DataFrame = { + val udaf = HiveUDAFFunction( + "hitrate", + new HiveFunctionWrapper("hivemall.evaluation.HitRateUDAF"), + (rankItems +: correctItems +: others).map(df(_).expr), + isUDAFBridgeRequired = false) + .toAggregateExpression() + toDF(Alias(udaf, udaf.prettyName)() :: Nil) + } + + /** + * @see hivemall.evaluation.MRRUDAF + * @group evaluation + */ + @scala.annotation.varargs + def mrr(rankItems: String, correctItems: String, others: String*): DataFrame = { + val udaf = HiveUDAFFunction( + "mrr", + new HiveFunctionWrapper("hivemall.evaluation.MRRUDAF"), + (rankItems +: correctItems +: others).map(df(_).expr), + isUDAFBridgeRequired = false) + .toAggregateExpression() + toDF(Alias(udaf, udaf.prettyName)() :: Nil) + } + + /** + * @see hivemall.evaluation.MAPUDAF + * @group evaluation + */ + @scala.annotation.varargs + def average_precision(rankItems: String, correctItems: String, others: String*): DataFrame = { + val udaf = HiveUDAFFunction( + "average_precision", + new HiveFunctionWrapper("hivemall.evaluation.MAPUDAF"), + (rankItems +: correctItems +: others).map(df(_).expr), + isUDAFBridgeRequired = false) + .toAggregateExpression() + toDF(Alias(udaf, udaf.prettyName)() :: Nil) + } + + /** + * @see hivemall.evaluation.AUCUDAF + * @group evaluation + */ + @scala.annotation.varargs + def auc(args: String*): DataFrame = { + val udaf = HiveUDAFFunction( + "auc", + new HiveFunctionWrapper("hivemall.evaluation.AUCUDAF"), + args.map(df(_).expr), + isUDAFBridgeRequired = false) + .toAggregateExpression() + toDF(Alias(udaf, udaf.prettyName)() :: Nil) + } + + /** + * @see hivemall.topicmodel.LDAPredictUDAF + * @group topicmodel + */ + @scala.annotation.varargs + def lda_predict(word: String, value: String, label: String, lambda: String, others: String*) + : DataFrame = { + checkType(word, StringType) + checkType(value, DoubleType) + checkType(label, IntegerType) + checkType(lambda, DoubleType) + val udaf = HiveUDAFFunction( + "lda_predict", + new HiveFunctionWrapper("hivemall.topicmodel.LDAPredictUDAF"), + (word +: value +: label +: lambda +: others).map(df(_).expr), + isUDAFBridgeRequired = false) + .toAggregateExpression() + toDF(Alias(udaf, udaf.prettyName)() :: Nil) + } + + /** + * @see hivemall.topicmodel.PLSAPredictUDAF + * @group topicmodel + */ + @scala.annotation.varargs + def plsa_predict(word: String, value: String, label: String, prob: String, others: String*) + : DataFrame = { + checkType(word, StringType) + checkType(value, DoubleType) + checkType(label, IntegerType) + checkType(prob, DoubleType) + val udaf = HiveUDAFFunction( + "plsa_predict", + new HiveFunctionWrapper("hivemall.topicmodel.PLSAPredictUDAF"), + (word +: value +: label +: prob +: others).map(df(_).expr), + isUDAFBridgeRequired = false) + .toAggregateExpression() + toDF(Alias(udaf, udaf.prettyName)() :: Nil) + } + + /** + * @see hivemall.ftvec.text.TermFrequencyUDAF + * @group ftvec.text + */ + def tf(text: String): DataFrame = { + checkType(text, StringType) + val udaf = HiveUDAFFunction( + "tf", + new HiveFunctionWrapper("hivemall.ftvec.text.TermFrequencyUDAF"), + Seq(text).map(df(_).expr), + isUDAFBridgeRequired = true) + .toAggregateExpression() + toDF(Alias(udaf, udaf.prettyName)() :: Nil) + } + + /** + * @see hivemall.ftvec.trans.OnehotEncodingUDAF + * @group ftvec.trans + */ + @scala.annotation.varargs + def onehot_encoding(feature: String, others: String*): DataFrame = { + val udaf = HiveUDAFFunction( + "onehot_encoding", + new HiveFunctionWrapper("hivemall.ftvec.trans.OnehotEncodingUDAF"), + (feature +: others).map(df(_).expr), + isUDAFBridgeRequired = false) + .toAggregateExpression() + toDF(Alias(udaf, udaf.prettyName)() :: Nil) + } + + /** + * @see hivemall.ftvec.selection.SignalNoiseRatioUDAF + * @group ftvec.selection + */ + def snr(feature: String, label: String): DataFrame = { + val udaf = HiveUDAFFunction( + "snr", + new HiveFunctionWrapper("hivemall.ftvec.selection.SignalNoiseRatioUDAF"), + Seq(feature, label).map(df(_).expr), + isUDAFBridgeRequired = false) + .toAggregateExpression() + toDF(Alias(udaf, udaf.prettyName)() :: Nil) + } + + /** + * @see hivemall.tools.array.ArrayAvgGenericUDAF + * @group tools.array + */ + def array_avg(ar: String): DataFrame = { + val udaf = HiveUDAFFunction( + "array_avg", + new HiveFunctionWrapper("hivemall.tools.array.ArrayAvgGenericUDAF"), + Seq(ar).map(df(_).expr), + isUDAFBridgeRequired = false) + .toAggregateExpression() + toDF(Alias(udaf, udaf.prettyName)() :: Nil) + } + + /** + * @see hivemall.tools.array.ArraySumUDAF + * @group tools.array + */ + def array_sum(ar: String): DataFrame = { + val udaf = HiveUDAFFunction( + "array_sum", + new HiveFunctionWrapper("hivemall.tools.array.ArraySumUDAF"), + Seq(ar).map(df(_).expr), + isUDAFBridgeRequired = true) + .toAggregateExpression() + toDF(Alias(udaf, udaf.prettyName)() :: Nil) + } + + /** + * @see hivemall.tools.bits.BitsCollectUDAF + * @group tools.bits + */ + def bits_collect(x: String): DataFrame = { + val udaf = HiveUDAFFunction( + "bits_collect", + new HiveFunctionWrapper("hivemall.tools.bits.BitsCollectUDAF"), + Seq(x).map(df(_).expr), + isUDAFBridgeRequired = false) + .toAggregateExpression() + toDF(Alias(udaf, udaf.prettyName)() :: Nil) + } + + /** + * @see hivemall.tools.list.UDAFToOrderedList + * @group tools.list + */ + @scala.annotation.varargs + def to_ordered_list(value: String, others: String*): DataFrame = { + val udaf = HiveUDAFFunction( + "to_ordered_list", + new HiveFunctionWrapper("hivemall.tools.list.UDAFToOrderedList"), + (value +: others).map(df(_).expr), + isUDAFBridgeRequired = false) + .toAggregateExpression() + toDF(Alias(udaf, udaf.prettyName)() :: Nil) + } + + /** + * @see hivemall.tools.map.UDAFToMap + * @group tools.map + */ + def to_map(key: String, value: String): DataFrame = { + val udaf = HiveUDAFFunction( + "to_map", + new HiveFunctionWrapper("hivemall.tools.map.UDAFToMap"), + Seq(key, value).map(df(_).expr), + isUDAFBridgeRequired = false) + .toAggregateExpression() + toDF(Alias(udaf, udaf.prettyName)() :: Nil) + } + + /** + * @see hivemall.tools.map.UDAFToOrderedMap + * @group tools.map + */ + @scala.annotation.varargs + def to_ordered_map(key: String, value: String, others: String*): DataFrame = { + val udaf = HiveUDAFFunction( + "to_ordered_map", + new HiveFunctionWrapper("hivemall.tools.map.UDAFToOrderedMap"), + (key +: value +: others).map(df(_).expr), + isUDAFBridgeRequired = false) + .toAggregateExpression() + toDF(Alias(udaf, udaf.prettyName)() :: Nil) + } + + /** + * @see hivemall.tools.matrix.TransposeAndDotUDAF + * @group tools.matrix + */ + def transpose_and_dot(matrix0_row: String, matrix1_row: String): DataFrame = { + val udaf = HiveUDAFFunction( + "transpose_and_dot", + new HiveFunctionWrapper("hivemall.tools.matrix.TransposeAndDotUDAF"), + Seq(matrix0_row, matrix1_row).map(df(_).expr), + isUDAFBridgeRequired = false) + .toAggregateExpression() + toDF(Alias(udaf, udaf.prettyName)() :: Nil) + } + + /** + * @see hivemall.tools.math.L2NormUDAF + * @group tools.math + */ + def l2_norm(xi: String): DataFrame = { + val udaf = HiveUDAFFunction( + "l2_norm", + new HiveFunctionWrapper("hivemall.tools.math.L2NormUDAF"), + Seq(xi).map(df(_).expr), + isUDAFBridgeRequired = true) + .toAggregateExpression() + toDF(Alias(udaf, udaf.prettyName)() :: Nil) + } + + /** + * [[RelationalGroupedDataset]] has the three values as private fields, so, to inject Hivemall + * aggregate functions, we fetch them via Java Reflections. + */ + private val df = getPrivateField[DataFrame]("org$apache$spark$sql$RelationalGroupedDataset$$df") + private val groupingExprs = getPrivateField[Seq[Expression]]("groupingExprs") + private val groupType = getPrivateField[RelationalGroupedDataset.GroupType]("groupType") + + private def getPrivateField[T](name: String): T = { + val field = groupBy.getClass.getDeclaredField(name) + field.setAccessible(true) + field.get(groupBy).asInstanceOf[T] + } + + private def toDF(aggExprs: Seq[Expression]): DataFrame = { + val aggregates = if (df.sqlContext.conf.dataFrameRetainGroupColumns) { + groupingExprs ++ aggExprs + } else { + aggExprs + } + + val aliasedAgg = aggregates.map(alias) + + groupType match { + case RelationalGroupedDataset.GroupByType => + Dataset.ofRows( + df.sparkSession, Aggregate(groupingExprs, aliasedAgg, df.logicalPlan)) + case RelationalGroupedDataset.RollupType => + Dataset.ofRows( + df.sparkSession, Aggregate(Seq(Rollup(groupingExprs)), aliasedAgg, df.logicalPlan)) + case RelationalGroupedDataset.CubeType => + Dataset.ofRows( + df.sparkSession, Aggregate(Seq(Cube(groupingExprs)), aliasedAgg, df.logicalPlan)) + case RelationalGroupedDataset.PivotType(pivotCol, values) => + val aliasedGrps = groupingExprs.map(alias) + Dataset.ofRows( + df.sparkSession, Pivot(aliasedGrps, pivotCol, values, aggExprs, df.logicalPlan)) + } + } + + private def alias(expr: Expression): NamedExpression = expr match { + case u: UnresolvedAttribute => UnresolvedAlias(u) + case expr: NamedExpression => expr + case expr: Expression => Alias(expr, expr.prettyName)() + } + + private def checkType(colName: String, expected: DataType) = { + val dataType = df.resolve(colName).dataType + if (dataType != expected) { + throw new AnalysisException( + s""""$colName" must be $expected, however it is $dataType""") + } + } +} + +object HivemallGroupedDataset { + + /** + * Implicitly inject the [[HivemallGroupedDataset]] into [[RelationalGroupedDataset]]. + */ + implicit def relationalGroupedDatasetToHivemallOne( + groupBy: RelationalGroupedDataset): HivemallGroupedDataset = { + new HivemallGroupedDataset(groupBy) + } +}
http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/bd143146/spark/spark-2.3/src/main/scala/org/apache/spark/sql/hive/HivemallOps.scala ---------------------------------------------------------------------- diff --git a/spark/spark-2.3/src/main/scala/org/apache/spark/sql/hive/HivemallOps.scala b/spark/spark-2.3/src/main/scala/org/apache/spark/sql/hive/HivemallOps.scala new file mode 100644 index 0000000..94bcfd6 --- /dev/null +++ b/spark/spark-2.3/src/main/scala/org/apache/spark/sql/hive/HivemallOps.scala @@ -0,0 +1,2247 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.spark.sql.hive + +import java.util.UUID + +import org.apache.spark.annotation.Experimental +import org.apache.spark.internal.Logging +import org.apache.spark.ml.feature.HivemallFeature +import org.apache.spark.ml.linalg.{DenseVector, SparseVector, VectorUDT} +import org.apache.spark.sql._ +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute +import org.apache.spark.sql.catalyst.encoders.RowEncoder +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.plans.Inner +import org.apache.spark.sql.catalyst.plans.logical.{AnalysisBarrier, Generate, JoinTopK, LogicalPlan} +import org.apache.spark.sql.execution.UserProvidedPlanner +import org.apache.spark.sql.execution.datasources.csv.{CsvToStruct, StructToCsv} +import org.apache.spark.sql.functions._ +import org.apache.spark.sql.types._ +import org.apache.spark.unsafe.types.UTF8String + + +/** + * Hivemall wrapper and some utility functions for DataFrame. These functions below derives + * from `resources/ddl/define-all-as-permanent.hive`. + * + * @groupname regression + * @groupname classifier + * @groupname classifier.multiclass + * @groupname recommend + * @groupname topicmodel + * @groupname geospatial + * @groupname smile + * @groupname xgboost + * @groupname anomaly + * @groupname knn.similarity + * @groupname knn.distance + * @groupname knn.lsh + * @groupname ftvec + * @groupname ftvec.amplify + * @groupname ftvec.hashing + * @groupname ftvec.paring + * @groupname ftvec.scaling + * @groupname ftvec.selection + * @groupname ftvec.conv + * @groupname ftvec.trans + * @groupname ftvec.ranking + * @groupname tools + * @groupname tools.array + * @groupname tools.bits + * @groupname tools.compress + * @groupname tools.map + * @groupname tools.text + * @groupname misc + * + * A list of unsupported functions is as follows: + * * smile + * - guess_attribute_types + * * mapred functions + * - taskid + * - jobid + * - rownum + * - distcache_gets + * - jobconf_gets + * * matrix factorization + * - mf_predict + * - train_mf_sgd + * - train_mf_adagrad + * - train_bprmf + * - bprmf_predict + * * Factorization Machine + * - fm_predict + * - train_fm + * - train_ffm + * - ffm_predict + */ +final class HivemallOps(df: DataFrame) extends Logging { + import internal.HivemallOpsImpl._ + + private lazy val _sparkSession = df.sparkSession + private lazy val _strategy = new UserProvidedPlanner(_sparkSession.sqlContext.conf) + + /** + * @see [[hivemall.regression.GeneralRegressorUDTF]] + * @group regression + */ + @scala.annotation.varargs + def train_regressor(exprs: Column*): DataFrame = withTypedPlan { + planHiveGenericUDTF( + df, + "hivemall.regression.GeneralRegressorUDTF", + "train_regressor", + setMixServs(toHivemallFeatures(exprs)), + Seq("feature", "weight") + ) + } + + /** + * @see [[hivemall.regression.AdaDeltaUDTF]] + * @group regression + */ + @scala.annotation.varargs + def train_adadelta_regr(exprs: Column*): DataFrame = withTypedPlan { + planHiveGenericUDTF( + df, + "hivemall.regression.AdaDeltaUDTF", + "train_adadelta_regr", + setMixServs(toHivemallFeatures(exprs)), + Seq("feature", "weight") + ) + } + + /** + * @see [[hivemall.regression.AdaGradUDTF]] + * @group regression + */ + @scala.annotation.varargs + def train_adagrad_regr(exprs: Column*): DataFrame = withTypedPlan { + planHiveGenericUDTF( + df, + "hivemall.regression.AdaGradUDTF", + "train_adagrad_regr", + setMixServs(toHivemallFeatures(exprs)), + Seq("feature", "weight") + ) + } + + /** + * @see [[hivemall.regression.AROWRegressionUDTF]] + * @group regression + */ + @scala.annotation.varargs + def train_arow_regr(exprs: Column*): DataFrame = withTypedPlan { + planHiveGenericUDTF( + df, + "hivemall.regression.AROWRegressionUDTF", + "train_arow_regr", + setMixServs(toHivemallFeatures(exprs)), + Seq("feature", "weight", "conv") + ) + } + + /** + * @see [[hivemall.regression.AROWRegressionUDTF.AROWe]] + * @group regression + */ + @scala.annotation.varargs + def train_arowe_regr(exprs: Column*): DataFrame = withTypedPlan { + planHiveGenericUDTF( + df, + "hivemall.regression.AROWRegressionUDTF$AROWe", + "train_arowe_regr", + setMixServs(toHivemallFeatures(exprs)), + Seq("feature", "weight", "conv") + ) + } + + /** + * @see [[hivemall.regression.AROWRegressionUDTF.AROWe2]] + * @group regression + */ + @scala.annotation.varargs + def train_arowe2_regr(exprs: Column*): DataFrame = withTypedPlan { + planHiveGenericUDTF( + df, + "hivemall.regression.AROWRegressionUDTF$AROWe2", + "train_arowe2_regr", + setMixServs(toHivemallFeatures(exprs)), + Seq("feature", "weight", "conv") + ) + } + + /** + * @see [[hivemall.regression.LogressUDTF]] + * @group regression + */ + @scala.annotation.varargs + def train_logistic_regr(exprs: Column*): DataFrame = withTypedPlan { + planHiveGenericUDTF( + df, + "hivemall.regression.LogressUDTF", + "train_logistic_regr", + setMixServs(toHivemallFeatures(exprs)), + Seq("feature", "weight") + ) + } + + /** + * @see [[hivemall.regression.PassiveAggressiveRegressionUDTF]] + * @group regression + */ + @scala.annotation.varargs + def train_pa1_regr(exprs: Column*): DataFrame = withTypedPlan { + planHiveGenericUDTF( + df, + "hivemall.regression.PassiveAggressiveRegressionUDTF", + "train_pa1_regr", + setMixServs(toHivemallFeatures(exprs)), + Seq("feature", "weight") + ) + } + + /** + * @see [[hivemall.regression.PassiveAggressiveRegressionUDTF.PA1a]] + * @group regression + */ + @scala.annotation.varargs + def train_pa1a_regr(exprs: Column*): DataFrame = withTypedPlan { + planHiveGenericUDTF( + df, + "hivemall.regression.PassiveAggressiveRegressionUDTF$PA1a", + "train_pa1a_regr", + setMixServs(toHivemallFeatures(exprs)), + Seq("feature", "weight") + ) + } + + /** + * @see [[hivemall.regression.PassiveAggressiveRegressionUDTF.PA2]] + * @group regression + */ + @scala.annotation.varargs + def train_pa2_regr(exprs: Column*): DataFrame = withTypedPlan { + planHiveGenericUDTF( + df, + "hivemall.regression.PassiveAggressiveRegressionUDTF$PA2", + "train_pa2_regr", + setMixServs(toHivemallFeatures(exprs)), + Seq("feature", "weight") + ) + } + + /** + * @see [[hivemall.regression.PassiveAggressiveRegressionUDTF.PA2a]] + * @group regression + */ + @scala.annotation.varargs + def train_pa2a_regr(exprs: Column*): DataFrame = withTypedPlan { + planHiveGenericUDTF( + df, + "hivemall.regression.PassiveAggressiveRegressionUDTF$PA2a", + "train_pa2a_regr", + setMixServs(toHivemallFeatures(exprs)), + Seq("feature", "weight") + ) + } + + /** + * @see [[hivemall.classifier.GeneralClassifierUDTF]] + * @group classifier + */ + @scala.annotation.varargs + def train_classifier(exprs: Column*): DataFrame = withTypedPlan { + planHiveGenericUDTF( + df, + "hivemall.classifier.GeneralClassifierUDTF", + "train_classifier", + setMixServs(toHivemallFeatures(exprs)), + Seq("feature", "weight") + ) + } + + /** + * @see [[hivemall.classifier.PerceptronUDTF]] + * @group classifier + */ + @scala.annotation.varargs + def train_perceptron(exprs: Column*): DataFrame = withTypedPlan { + planHiveGenericUDTF( + df, + "hivemall.classifier.PerceptronUDTF", + "train_perceptron", + setMixServs(toHivemallFeatures(exprs)), + Seq("feature", "weight") + ) + } + + /** + * @see [[hivemall.classifier.PassiveAggressiveUDTF]] + * @group classifier + */ + @scala.annotation.varargs + def train_pa(exprs: Column*): DataFrame = withTypedPlan { + planHiveGenericUDTF( + df, + "hivemall.classifier.PassiveAggressiveUDTF", + "train_pa", + setMixServs(toHivemallFeatures(exprs)), + Seq("feature", "weight") + ) + } + + /** + * @see [[hivemall.classifier.PassiveAggressiveUDTF.PA1]] + * @group classifier + */ + @scala.annotation.varargs + def train_pa1(exprs: Column*): DataFrame = withTypedPlan { + planHiveGenericUDTF( + df, + "hivemall.classifier.PassiveAggressiveUDTF$PA1", + "train_pa1", + setMixServs(toHivemallFeatures(exprs)), + Seq("feature", "weight") + ) + } + + /** + * @see [[hivemall.classifier.PassiveAggressiveUDTF.PA2]] + * @group classifier + */ + @scala.annotation.varargs + def train_pa2(exprs: Column*): DataFrame = withTypedPlan { + planHiveGenericUDTF( + df, + "hivemall.classifier.PassiveAggressiveUDTF$PA2", + "train_pa2", + setMixServs(toHivemallFeatures(exprs)), + Seq("feature", "weight") + ) + } + + /** + * @see [[hivemall.classifier.ConfidenceWeightedUDTF]] + * @group classifier + */ + @scala.annotation.varargs + def train_cw(exprs: Column*): DataFrame = withTypedPlan { + planHiveGenericUDTF( + df, + "hivemall.classifier.ConfidenceWeightedUDTF", + "train_cw", + setMixServs(toHivemallFeatures(exprs)), + Seq("feature", "weight", "conv") + ) + } + + /** + * @see [[hivemall.classifier.AROWClassifierUDTF]] + * @group classifier + */ + @scala.annotation.varargs + def train_arow(exprs: Column*): DataFrame = withTypedPlan { + planHiveGenericUDTF( + df, + "hivemall.classifier.AROWClassifierUDTF", + "train_arow", + setMixServs(toHivemallFeatures(exprs)), + Seq("feature", "weight", "conv") + ) + } + + /** + * @see [[hivemall.classifier.AROWClassifierUDTF.AROWh]] + * @group classifier + */ + @scala.annotation.varargs + def train_arowh(exprs: Column*): DataFrame = withTypedPlan { + planHiveGenericUDTF( + df, + "hivemall.classifier.AROWClassifierUDTF$AROWh", + "train_arowh", + setMixServs(toHivemallFeatures(exprs)), + Seq("feature", "weight", "conv") + ) + } + + /** + * @see [[hivemall.classifier.SoftConfideceWeightedUDTF.SCW1]] + * @group classifier + */ + @scala.annotation.varargs + def train_scw(exprs: Column*): DataFrame = withTypedPlan { + planHiveGenericUDTF( + df, + "hivemall.classifier.SoftConfideceWeightedUDTF$SCW1", + "train_scw", + setMixServs(toHivemallFeatures(exprs)), + Seq("feature", "weight", "conv") + ) + } + + /** + * @see [[hivemall.classifier.SoftConfideceWeightedUDTF.SCW1]] + * @group classifier + */ + @scala.annotation.varargs + def train_scw2(exprs: Column*): DataFrame = withTypedPlan { + planHiveGenericUDTF( + df, + "hivemall.classifier.SoftConfideceWeightedUDTF$SCW2", + "train_scw2", + setMixServs(toHivemallFeatures(exprs)), + Seq("feature", "weight", "conv") + ) + } + + /** + * @see [[hivemall.classifier.AdaGradRDAUDTF]] + * @group classifier + */ + @scala.annotation.varargs + def train_adagrad_rda(exprs: Column*): DataFrame = withTypedPlan { + planHiveGenericUDTF( + df, + "hivemall.classifier.AdaGradRDAUDTF", + "train_adagrad_rda", + setMixServs(toHivemallFeatures(exprs)), + Seq("feature", "weight") + ) + } + + /** + * @see [[hivemall.classifier.KernelExpansionPassiveAggressiveUDTF]] + * @group classifier + */ + @scala.annotation.varargs + def train_kpa(exprs: Column*): DataFrame = withTypedPlan { + planHiveGenericUDTF( + df, + "hivemall.classifier.KernelExpansionPassiveAggressiveUDTF", + "train_kpa", + setMixServs(toHivemallFeatures(exprs)), + Seq("h", "hk", "w0", "w1", "w2", "w3") + ) + } + + /** + * @see [[hivemall.classifier.multiclass.MulticlassPerceptronUDTF]] + * @group classifier.multiclass + */ + @scala.annotation.varargs + def train_multiclass_perceptron(exprs: Column*): DataFrame = withTypedPlan { + planHiveGenericUDTF( + df, + "hivemall.classifier.multiclass.MulticlassPerceptronUDTF", + "train_multiclass_perceptron", + setMixServs(toHivemallFeatures(exprs)), + Seq("label", "feature", "weight") + ) + } + + /** + * @see [[hivemall.classifier.multiclass.MulticlassPassiveAggressiveUDTF]] + * @group classifier.multiclass + */ + @scala.annotation.varargs + def train_multiclass_pa(exprs: Column*): DataFrame = withTypedPlan { + planHiveGenericUDTF( + df, + "hivemall.classifier.multiclass.MulticlassPassiveAggressiveUDTF", + "train_multiclass_pa", + setMixServs(toHivemallFeatures(exprs)), + Seq("label", "feature", "weight") + ) + } + + /** + * @see [[hivemall.classifier.multiclass.MulticlassPassiveAggressiveUDTF.PA1]] + * @group classifier.multiclass + */ + @scala.annotation.varargs + def train_multiclass_pa1(exprs: Column*): DataFrame = withTypedPlan { + planHiveGenericUDTF( + df, + "hivemall.classifier.multiclass.MulticlassPassiveAggressiveUDTF$PA1", + "train_multiclass_pa1", + setMixServs(toHivemallFeatures(exprs)), + Seq("label", "feature", "weight") + ) + } + + /** + * @see [[hivemall.classifier.multiclass.MulticlassPassiveAggressiveUDTF.PA2]] + * @group classifier.multiclass + */ + @scala.annotation.varargs + def train_multiclass_pa2(exprs: Column*): DataFrame = withTypedPlan { + planHiveGenericUDTF( + df, + "hivemall.classifier.multiclass.MulticlassPassiveAggressiveUDTF$PA2", + "train_multiclass_pa2", + setMixServs(toHivemallFeatures(exprs)), + Seq("label", "feature", "weight") + ) + } + + /** + * @see [[hivemall.classifier.multiclass.MulticlassConfidenceWeightedUDTF]] + * @group classifier.multiclass + */ + @scala.annotation.varargs + def train_multiclass_cw(exprs: Column*): DataFrame = withTypedPlan { + planHiveGenericUDTF( + df, + "hivemall.classifier.multiclass.MulticlassConfidenceWeightedUDTF", + "train_multiclass_cw", + setMixServs(toHivemallFeatures(exprs)), + Seq("label", "feature", "weight", "conv") + ) + } + + /** + * @see [[hivemall.classifier.multiclass.MulticlassAROWClassifierUDTF]] + * @group classifier.multiclass + */ + @scala.annotation.varargs + def train_multiclass_arow(exprs: Column*): DataFrame = withTypedPlan { + planHiveGenericUDTF( + df, + "hivemall.classifier.multiclass.MulticlassAROWClassifierUDTF", + "train_multiclass_arow", + setMixServs(toHivemallFeatures(exprs)), + Seq("label", "feature", "weight", "conv") + ) + } + + /** + * @see [[hivemall.classifier.multiclass.MulticlassAROWClassifierUDTF.AROWh]] + * @group classifier.multiclass + */ + @scala.annotation.varargs + def train_multiclass_arowh(exprs: Column*): DataFrame = withTypedPlan { + planHiveGenericUDTF( + df, + "hivemall.classifier.multiclass.MulticlassAROWClassifierUDTF$AROWh", + "train_multiclass_arowh", + setMixServs(toHivemallFeatures(exprs)), + Seq("label", "feature", "weight", "conv") + ) + } + + /** + * @see [[hivemall.classifier.multiclass.MulticlassSoftConfidenceWeightedUDTF.SCW1]] + * @group classifier.multiclass + */ + @scala.annotation.varargs + def train_multiclass_scw(exprs: Column*): DataFrame = withTypedPlan { + planHiveGenericUDTF( + df, + "hivemall.classifier.multiclass.MulticlassSoftConfidenceWeightedUDTF$SCW1", + "train_multiclass_scw", + setMixServs(toHivemallFeatures(exprs)), + Seq("label", "feature", "weight", "conv") + ) + } + + /** + * @see [[hivemall.classifier.multiclass.MulticlassSoftConfidenceWeightedUDTF.SCW2]] + * @group classifier.multiclass + */ + @scala.annotation.varargs + def train_multiclass_scw2(exprs: Column*): DataFrame = withTypedPlan { + planHiveGenericUDTF( + df, + "hivemall.classifier.multiclass.MulticlassSoftConfidenceWeightedUDTF$SCW2", + "train_multiclass_scw2", + setMixServs(toHivemallFeatures(exprs)), + Seq("label", "feature", "weight", "conv") + ) + } + + /** + * @see [[hivemall.recommend.SlimUDTF]] + * @group recommend + */ + @scala.annotation.varargs + def train_slim(exprs: Column*): DataFrame = withTypedPlan { + planHiveGenericUDTF( + df, + "hivemall.recommend.SlimUDTF", + "train_slim", + setMixServs(toHivemallFeatures(exprs)), + Seq("j", "nn", "w") + ) + } + + /** + * @see [[hivemall.topicmodel.LDAUDTF]] + * @group topicmodel + */ + @scala.annotation.varargs + def train_lda(exprs: Column*): DataFrame = withTypedPlan { + planHiveGenericUDTF( + df, + "hivemall.topicmodel.LDAUDTF", + "train_lda", + setMixServs(toHivemallFeatures(exprs)), + Seq("topic", "word", "score") + ) + } + + /** + * @see [[hivemall.topicmodel.PLSAUDTF]] + * @group topicmodel + */ + @scala.annotation.varargs + def train_plsa(exprs: Column*): DataFrame = withTypedPlan { + planHiveGenericUDTF( + df, + "hivemall.topicmodel.PLSAUDTF", + "train_plsa", + setMixServs(toHivemallFeatures(exprs)), + Seq("topic", "word", "score") + ) + } + + /** + * @see [[hivemall.smile.regression.RandomForestRegressionUDTF]] + * @group smile + */ + @scala.annotation.varargs + def train_randomforest_regressor(exprs: Column*): DataFrame = withTypedPlan { + planHiveGenericUDTF( + df, + "hivemall.smile.regression.RandomForestRegressionUDTF", + "train_randomforest_regressor", + setMixServs(toHivemallFeatures(exprs)), + Seq("model_id", "model_type", "pred_model", "var_importance", "oob_errors", "oob_tests") + ) + } + + /** + * @see [[hivemall.smile.classification.RandomForestClassifierUDTF]] + * @group smile + */ + @scala.annotation.varargs + def train_randomforest_classifier(exprs: Column*): DataFrame = withTypedPlan { + planHiveGenericUDTF( + df, + "hivemall.smile.classification.RandomForestClassifierUDTF", + "train_randomforest_classifier", + setMixServs(toHivemallFeatures(exprs)), + Seq("model_id", "model_type", "pred_model", "var_importance", "oob_errors", "oob_tests") + ) + } + + /** + * :: Experimental :: + * @see [[hivemall.xgboost.regression.XGBoostRegressionUDTF]] + * @group xgboost + */ + @Experimental + @scala.annotation.varargs + def train_xgboost_regr(exprs: Column*): DataFrame = withTypedPlan { + planHiveGenericUDTF( + df, + "hivemall.xgboost.regression.XGBoostRegressionUDTF", + "train_xgboost_regr", + setMixServs(toHivemallFeatures(exprs)), + Seq("model_id", "pred_model") + ) + } + + /** + * :: Experimental :: + * @see [[hivemall.xgboost.classification.XGBoostBinaryClassifierUDTF]] + * @group xgboost + */ + @Experimental + @scala.annotation.varargs + def train_xgboost_classifier(exprs: Column*): DataFrame = withTypedPlan { + planHiveGenericUDTF( + df, + "hivemall.xgboost.classification.XGBoostBinaryClassifierUDTF", + "train_xgboost_classifier", + setMixServs(toHivemallFeatures(exprs)), + Seq("model_id", "pred_model") + ) + } + + /** + * :: Experimental :: + * @see [[hivemall.xgboost.classification.XGBoostMulticlassClassifierUDTF]] + * @group xgboost + */ + @Experimental + @scala.annotation.varargs + def train_xgboost_multiclass_classifier(exprs: Column*): DataFrame = withTypedPlan { + planHiveGenericUDTF( + df, + "hivemall.xgboost.classification.XGBoostMulticlassClassifierUDTF", + "train_xgboost_multiclass_classifier", + setMixServs(toHivemallFeatures(exprs)), + Seq("model_id", "pred_model") + ) + } + + /** + * :: Experimental :: + * @see [[hivemall.xgboost.tools.XGBoostPredictUDTF]] + * @group xgboost + */ + @Experimental + @scala.annotation.varargs + def xgboost_predict(exprs: Column*): DataFrame = withTypedPlan { + planHiveGenericUDTF( + df, + "hivemall.xgboost.tools.XGBoostPredictUDTF", + "xgboost_predict", + setMixServs(toHivemallFeatures(exprs)), + Seq("rowid", "predicted") + ) + } + + /** + * :: Experimental :: + * @see [[hivemall.xgboost.tools.XGBoostMulticlassPredictUDTF]] + * @group xgboost + */ + @Experimental + @scala.annotation.varargs + def xgboost_multiclass_predict(exprs: Column*): DataFrame = withTypedPlan { + planHiveGenericUDTF( + df, + "hivemall.xgboost.tools.XGBoostMulticlassPredictUDTF", + "xgboost_multiclass_predict", + setMixServs(toHivemallFeatures(exprs)), + Seq("rowid", "label", "probability") + ) + } + + /** + * @see [[hivemall.knn.similarity.DIMSUMMapperUDTF]] + * @group knn.similarity + */ + @scala.annotation.varargs + def dimsum_mapper(exprs: Column*): DataFrame = withTypedPlan { + planHiveGenericUDTF( + df, + "hivemall.knn.similarity.DIMSUMMapperUDTF", + "dimsum_mapper", + exprs, + Seq("j", "k", "b_jk") + ) + } + + /** + * @see [[hivemall.knn.lsh.MinHashUDTF]] + * @group knn.lsh + */ + @scala.annotation.varargs + def minhash(exprs: Column*): DataFrame = withTypedPlan { + planHiveGenericUDTF( + df, + "hivemall.knn.lsh.MinHashUDTF", + "minhash", + exprs, + Seq("clusterid", "item") + ) + } + + /** + * @see [[hivemall.ftvec.amplify.AmplifierUDTF]] + * @group ftvec.amplify + */ + @scala.annotation.varargs + def amplify(exprs: Column*): DataFrame = withTypedPlan { + planHiveGenericUDTF( + df, + "hivemall.ftvec.amplify.AmplifierUDTF", + "amplify", + exprs, + Seq("clusterid", "item") + ) + } + + /** + * @see [[hivemall.ftvec.amplify.RandomAmplifierUDTF]] + * @group ftvec.amplify + */ + @scala.annotation.varargs + def rand_amplify(exprs: Column*): DataFrame = withTypedPlan { + throw new UnsupportedOperationException("`rand_amplify` not supported yet") + } + + /** + * Amplifies and shuffle data inside partitions. + * @group ftvec.amplify + */ + def part_amplify(xtimes: Column): DataFrame = { + val xtimesInt = xtimes.expr match { + case Literal(v: Any, IntegerType) => v.asInstanceOf[Int] + case e => throw new AnalysisException("`xtimes` must be integer, however " + e) + } + val rdd = df.rdd.mapPartitions({ iter => + val elems = iter.flatMap{ row => + Seq.fill[Row](xtimesInt)(row) + } + // Need to check how this shuffling affects results + scala.util.Random.shuffle(elems) + }, true) + df.sqlContext.createDataFrame(rdd, df.schema) + } + + /** + * Quantifies input columns. + * @see [[hivemall.ftvec.conv.QuantifyColumnsUDTF]] + * @group ftvec.conv + */ + @scala.annotation.varargs + def quantify(exprs: Column*): DataFrame = withTypedPlan { + planHiveGenericUDTF( + df, + "hivemall.ftvec.conv.QuantifyColumnsUDTF", + "quantify", + exprs, + (0 until exprs.size - 1).map(i => s"c$i") + ) + } + + /** + * @see [[hivemall.ftvec.trans.BinarizeLabelUDTF]] + * @group ftvec.trans + */ + @scala.annotation.varargs + def binarize_label(exprs: Column*): DataFrame = withTypedPlan { + planHiveGenericUDTF( + df, + "hivemall.ftvec.trans.BinarizeLabelUDTF", + "binarize_label", + exprs, + (0 until exprs.size - 1).map(i => s"c$i") + ) + } + + /** + * @see [[hivemall.ftvec.trans.QuantifiedFeaturesUDTF]] + * @group ftvec.trans + */ + @scala.annotation.varargs + def quantified_features(exprs: Column*): DataFrame = withTypedPlan { + planHiveGenericUDTF( + df, + "hivemall.ftvec.trans.QuantifiedFeaturesUDTF", + "quantified_features", + exprs, + Seq("features") + ) + } + + /** + * @see [[hivemall.ftvec.ranking.BprSamplingUDTF]] + * @group ftvec.ranking + */ + @scala.annotation.varargs + def bpr_sampling(exprs: Column*): DataFrame = withTypedPlan { + planHiveGenericUDTF( + df, + "hivemall.ftvec.ranking.BprSamplingUDTF", + "bpr_sampling", + exprs, + Seq("user", "pos_item", "neg_item") + ) + } + + /** + * @see [[hivemall.ftvec.ranking.ItemPairsSamplingUDTF]] + * @group ftvec.ranking + */ + @scala.annotation.varargs + def item_pairs_sampling(exprs: Column*): DataFrame = withTypedPlan { + planHiveGenericUDTF( + df, + "hivemall.ftvec.ranking.ItemPairsSamplingUDTF", + "item_pairs_sampling", + exprs, + Seq("pos_item_id", "neg_item_id") + ) + } + + /** + * @see [[hivemall.ftvec.ranking.PopulateNotInUDTF]] + * @group ftvec.ranking + */ + @scala.annotation.varargs + def populate_not_in(exprs: Column*): DataFrame = withTypedPlan { + planHiveGenericUDTF( + df, + "hivemall.ftvec.ranking.PopulateNotInUDTF", + "populate_not_in", + exprs, + Seq("item") + ) + } + + /** + * Splits Seq[String] into pieces. + * @group ftvec + */ + def explode_array(features: Column): DataFrame = { + df.explode(features) { case Row(v: Seq[_]) => + // Type erasure removes the component type in Seq + v.map(s => HivemallFeature(s.asInstanceOf[String])) + } + } + + /** + * Splits [[Vector]] into pieces. + * @group ftvec + */ + def explode_vector(features: Column): DataFrame = { + val elementSchema = StructType( + StructField("feature", StringType) :: StructField("weight", DoubleType) :: Nil) + val explodeFunc: Row => TraversableOnce[InternalRow] = (row: Row) => { + row.get(0) match { + case dv: DenseVector => + dv.values.zipWithIndex.map { + case (value, index) => + InternalRow(UTF8String.fromString(s"$index"), value) + } + case sv: SparseVector => + sv.values.zip(sv.indices).map { + case (value, index) => + InternalRow(UTF8String.fromString(s"$index"), value) + } + } + } + withTypedPlan { + Generate( + UserDefinedGenerator(elementSchema, explodeFunc, features.expr :: Nil), + unrequiredChildIndex = Seq.empty, + outer = false, None, + generatorOutput = Nil, + df.logicalPlan) + } + } + + /** + * @see [[hivemall.tools.GenerateSeriesUDTF]] + * @group tools + */ + def generate_series(start: Column, end: Column): DataFrame = withTypedPlan { + planHiveGenericUDTF( + df, + "hivemall.tools.GenerateSeriesUDTF", + "generate_series", + start :: end :: Nil, + Seq("generate_series") + ) + } + + /** + * Returns `top-k` records for each `group`. + * @group misc + */ + def each_top_k(k: Column, score: Column, group: Column*): DataFrame = withTypedPlan { + val kInt = k.expr match { + case Literal(v: Any, IntegerType) => v.asInstanceOf[Int] + case e => throw new AnalysisException("`k` must be integer, however " + e) + } + if (kInt == 0) { + throw new AnalysisException("`k` must not have 0") + } + val clusterDf = df.repartition(group: _*).sortWithinPartitions(group: _*) + .select(score, Column("*")) + val analyzedPlan = clusterDf.queryExecution.analyzed + val inputAttrs = analyzedPlan.output + val scoreExpr = BindReferences.bindReference(analyzedPlan.expressions.head, inputAttrs) + val groupNames = group.map { _.expr match { + case ne: NamedExpression => ne.name + case ua: UnresolvedAttribute => ua.name + }} + val groupExprs = analyzedPlan.expressions.filter { + case ne: NamedExpression => groupNames.contains(ne.name) + }.map { e => + BindReferences.bindReference(e, inputAttrs) + } + val rankField = StructField("rank", IntegerType) + Generate( + generator = EachTopK( + k = kInt, + scoreExpr = scoreExpr, + groupExprs = groupExprs, + elementSchema = StructType(rankField :: Nil), + children = inputAttrs + ), + unrequiredChildIndex = Nil, + outer = false, + qualifier = None, + generatorOutput = Nil, + child = AnalysisBarrier(analyzedPlan) + ) + } + + /** + * :: Experimental :: + * Joins input two tables with the given keys and the top-k highest `score` values. + * @group misc + */ + @Experimental + def top_k_join(k: Column, right: DataFrame, joinExprs: Column, score: Column) + : DataFrame = withTypedPlanInCustomStrategy { + val kInt = k.expr match { + case Literal(v: Any, IntegerType) => v.asInstanceOf[Int] + case e => throw new AnalysisException("`k` must be integer, however " + e) + } + if (kInt == 0) { + throw new AnalysisException("`k` must not have 0") + } + JoinTopK(kInt, df.logicalPlan, right.logicalPlan, Inner, Option(joinExprs.expr))(score.named) + } + + private def doFlatten(schema: StructType, separator: Char, prefixParts: Seq[String] = Seq.empty) + : Seq[Column] = { + schema.fields.flatMap { f => + val colNameParts = prefixParts :+ f.name + f.dataType match { + case st: StructType => + doFlatten(st, separator, colNameParts) + case _ => + col(colNameParts.mkString(".")).as(colNameParts.mkString(separator.toString)) :: Nil + } + } + } + + // Converts string representation of a character to actual character + @throws[IllegalArgumentException] + private def toChar(str: String): Char = { + if (str.length == 1) { + str.charAt(0) match { + case '$' | '_' | '.' => str.charAt(0) + case _ => throw new IllegalArgumentException( + "Must use '$', '_', or '.' for separator, but got " + str) + } + } else { + throw new IllegalArgumentException( + s"Separator cannot be more than one character: $str") + } + } + + /** + * Flattens a nested schema into a flat one. + * @group misc + * + * For example: + * {{{ + * scala> val df = Seq((0, (1, (3.0, "a")), (5, 0.9))).toDF() + * scala> df.printSchema + * root + * |-- _1: integer (nullable = false) + * |-- _2: struct (nullable = true) + * | |-- _1: integer (nullable = false) + * | |-- _2: struct (nullable = true) + * | | |-- _1: double (nullable = false) + * | | |-- _2: string (nullable = true) + * |-- _3: struct (nullable = true) + * | |-- _1: integer (nullable = false) + * | |-- _2: double (nullable = false) + * + * scala> df.flatten(separator = "$").printSchema + * root + * |-- _1: integer (nullable = false) + * |-- _2$_1: integer (nullable = true) + * |-- _2$_2$_1: double (nullable = true) + * |-- _2$_2$_2: string (nullable = true) + * |-- _3$_1: integer (nullable = true) + * |-- _3$_2: double (nullable = true) + * }}} + */ + def flatten(separator: String = "$"): DataFrame = + df.select(doFlatten(df.schema, toChar(separator)): _*) + + /** + * @see [[hivemall.dataset.LogisticRegressionDataGeneratorUDTF]] + * @group misc + */ + @scala.annotation.varargs + def lr_datagen(exprs: Column*): Dataset[Row] = withTypedPlan { + planHiveGenericUDTF( + df, + "hivemall.dataset.LogisticRegressionDataGeneratorUDTFWrapper", + "lr_datagen", + exprs, + Seq("label", "features") + ) + } + + /** + * Returns all the columns as Seq[Column] in this [[DataFrame]]. + */ + private[sql] def cols: Seq[Column] = { + df.schema.fields.map(col => df.col(col.name)).toSeq + } + + /** + * :: Experimental :: + * If a parameter '-mix' does not exist in a 3rd argument, + * set it from an environmental variable + * 'HIVEMALL_MIX_SERVERS'. + * + * TODO: This could work if '--deploy-mode' has 'client'; + * otherwise, we need to set HIVEMALL_MIX_SERVERS + * in all possible spark workers. + */ + @Experimental + private def setMixServs(exprs: Seq[Column]): Seq[Column] = { + val mixes = System.getenv("HIVEMALL_MIX_SERVERS") + if (mixes != null && !mixes.isEmpty()) { + val groupId = df.sqlContext.sparkContext.applicationId + "-" + UUID.randomUUID + logInfo(s"set '${mixes}' as default mix servers (session: ${groupId})") + exprs.size match { + case 2 => exprs :+ Column( + Literal.create(s"-mix ${mixes} -mix_session ${groupId}", StringType)) + /** TODO: Add codes in the case where exprs.size == 3. */ + case _ => exprs + } + } else { + exprs + } + } + + /** + * If the input is a [[Vector]], transform it into Hivemall features. + */ + @inline private def toHivemallFeatures(exprs: Seq[Column]): Seq[Column] = { + df.select(exprs: _*).queryExecution.analyzed.schema.zip(exprs).map { + case (StructField(_, _: VectorUDT, _, _), c) => HivemallUtils.to_hivemall_features(c) + case (_, c) => c + } + } + + /** + * A convenient function to wrap a logical plan and produce a DataFrame. + */ + @inline private def withTypedPlan(logicalPlan: => LogicalPlan): DataFrame = { + val queryExecution = _sparkSession.sessionState.executePlan(logicalPlan) + val outputSchema = queryExecution.sparkPlan.schema + new Dataset[Row](df.sparkSession, queryExecution, RowEncoder(outputSchema)) + } + + @inline private def withTypedPlanInCustomStrategy(logicalPlan: => LogicalPlan) + : DataFrame = { + // Inject custom strategies + if (!_sparkSession.experimental.extraStrategies.contains(_strategy)) { + _sparkSession.experimental.extraStrategies = Seq(_strategy) + } + withTypedPlan(logicalPlan) + } +} + +object HivemallOps { + import internal.HivemallOpsImpl._ + + /** + * Implicitly inject the [[HivemallOps]] into [[DataFrame]]. + */ + implicit def dataFrameToHivemallOps(df: DataFrame): HivemallOps = + new HivemallOps(df) + + /** + * @see [[hivemall.HivemallVersionUDF]] + * @group misc + */ + def hivemall_version(): Column = withExpr { + planHiveUDF( + "hivemall.HivemallVersionUDF", + "hivemall_version", + Nil + ) + } + + /** + * @see [[hivemall.geospatial.TileUDF]] + * @group geospatial + */ + def tile(lat: Column, lon: Column, zoom: Column): Column = withExpr { + planHiveGenericUDF( + "hivemall.geospatial.TileUDF", + "tile", + lat :: lon :: zoom :: Nil + ) + } + + /** + * @see [[hivemall.geospatial.MapURLUDF]] + * @group geospatial + */ + @scala.annotation.varargs + def map_url(exprs: Column*): Column = withExpr { + planHiveGenericUDF( + "hivemall.geospatial.MapURLUDF", + "map_url", + exprs + ) + } + + /** + * @see [[hivemall.geospatial.Lat2TileYUDF]] + * @group geospatial + */ + def lat2tiley(lat: Column, zoom: Column): Column = withExpr { + planHiveGenericUDF( + "hivemall.geospatial.Lat2TileYUDF", + "lat2tiley", + lat :: zoom :: Nil + ) + } + + /** + * @see [[hivemall.geospatial.Lon2TileXUDF]] + * @group geospatial + */ + def lon2tilex(lon: Column, zoom: Column): Column = withExpr { + planHiveGenericUDF( + "hivemall.geospatial.Lon2TileXUDF", + "lon2tilex", + lon :: zoom :: Nil + ) + } + + /** + * @see [[hivemall.geospatial.TileX2LonUDF]] + * @group geospatial + */ + def tilex2lon(x: Column, zoom: Column): Column = withExpr { + planHiveGenericUDF( + "hivemall.geospatial.TileX2LonUDF", + "tilex2lon", + x :: zoom :: Nil + ) + } + + /** + * @see [[hivemall.geospatial.TileY2LatUDF]] + * @group geospatial + */ + def tiley2lat(y: Column, zoom: Column): Column = withExpr { + planHiveGenericUDF( + "hivemall.geospatial.TileY2LatUDF", + "tiley2lat", + y :: zoom :: Nil + ) + } + + /** + * @see [[hivemall.geospatial.HaversineDistanceUDF]] + * @group geospatial + */ + @scala.annotation.varargs + def haversine_distance(exprs: Column*): Column = withExpr { + planHiveGenericUDF( + "hivemall.geospatial.HaversineDistanceUDF", + "haversine_distance", + exprs + ) + } + + /** + * @see [[hivemall.smile.tools.TreePredictUDF]] + * @group smile + */ + @scala.annotation.varargs + def tree_predict(exprs: Column*): Column = withExpr { + planHiveGenericUDF( + "hivemall.smile.tools.TreePredictUDF", + "tree_predict", + exprs + ) + } + + /** + * @see [[hivemall.smile.tools.TreeExportUDF]] + * @group smile + */ + @scala.annotation.varargs + def tree_export(exprs: Column*): Column = withExpr { + planHiveGenericUDF( + "hivemall.smile.tools.TreeExportUDF", + "tree_export", + exprs + ) + } + + /** + * @see [[hivemall.anomaly.ChangeFinderUDF]] + * @group anomaly + */ + @scala.annotation.varargs + def changefinder(exprs: Column*): Column = withExpr { + planHiveGenericUDF( + "hivemall.anomaly.ChangeFinderUDF", + "changefinder", + exprs + ) + } + + /** + * @see [[hivemall.anomaly.SingularSpectrumTransformUDF]] + * @group anomaly + */ + @scala.annotation.varargs + def sst(exprs: Column*): Column = withExpr { + planHiveGenericUDF( + "hivemall.anomaly.SingularSpectrumTransformUDF", + "sst", + exprs + ) + } + + /** + * @see [[hivemall.knn.similarity.CosineSimilarityUDF]] + * @group knn.similarity + */ + @scala.annotation.varargs + def cosine_similarity(exprs: Column*): Column = withExpr { + planHiveGenericUDF( + "hivemall.knn.similarity.CosineSimilarityUDF", + "cosine_similarity", + exprs + ) + } + + /** + * @see [[hivemall.knn.similarity.JaccardIndexUDF]] + * @group knn.similarity + */ + @scala.annotation.varargs + def jaccard_similarity(exprs: Column*): Column = withExpr { + planHiveUDF( + "hivemall.knn.similarity.JaccardIndexUDF", + "jaccard_similarity", + exprs + ) + } + + /** + * @see [[hivemall.knn.similarity.AngularSimilarityUDF]] + * @group knn.similarity + */ + @scala.annotation.varargs + def angular_similarity(exprs: Column*): Column = withExpr { + planHiveGenericUDF( + "hivemall.knn.similarity.AngularSimilarityUDF", + "angular_similarity", + exprs + ) + } + + /** + * @see [[hivemall.knn.similarity.EuclidSimilarity]] + * @group knn.similarity + */ + @scala.annotation.varargs + def euclid_similarity(exprs: Column*): Column = withExpr { + planHiveGenericUDF( + "hivemall.knn.similarity.EuclidSimilarity", + "euclid_similarity", + exprs + ) + } + + /** + * @see [[hivemall.knn.similarity.Distance2SimilarityUDF]] + * @group knn.similarity + */ + @scala.annotation.varargs + def distance2similarity(exprs: Column*): Column = withExpr { + // TODO: Need a wrapper class because of using unsupported types + planHiveGenericUDF( + "hivemall.knn.similarity.Distance2SimilarityUDF", + "distance2similarity", + exprs + ) + } + + /** + * @see [[hivemall.knn.distance.HammingDistanceUDF]] + * @group knn.distance + */ + @scala.annotation.varargs + def hamming_distance(exprs: Column*): Column = withExpr { + planHiveUDF( + "hivemall.knn.distance.HammingDistanceUDF", + "hamming_distance", + exprs + ) + } + + /** + * @see [[hivemall.knn.distance.PopcountUDF]] + * @group knn.distance + */ + @scala.annotation.varargs + def popcnt(exprs: Column*): Column = withExpr { + planHiveUDF( + "hivemall.knn.distance.PopcountUDF", + "popcnt", + exprs + ) + } + + /** + * @see [[hivemall.knn.distance.KLDivergenceUDF]] + * @group knn.distance + */ + @scala.annotation.varargs + def kld(exprs: Column*): Column = withExpr { + planHiveUDF( + "hivemall.knn.distance.KLDivergenceUDF", + "kld", + exprs + ) + } + + /** + * @see [[hivemall.knn.distance.EuclidDistanceUDF]] + * @group knn.distance + */ + @scala.annotation.varargs + def euclid_distance(exprs: Column*): Column = withExpr { + planHiveGenericUDF( + "hivemall.knn.distance.EuclidDistanceUDF", + "euclid_distance", + exprs + ) + } + + /** + * @see [[hivemall.knn.distance.CosineDistanceUDF]] + * @group knn.distance + */ + @scala.annotation.varargs + def cosine_distance(exprs: Column*): Column = withExpr { + planHiveGenericUDF( + "hivemall.knn.distance.CosineDistanceUDF", + "cosine_distance", + exprs + ) + } + + /** + * @see [[hivemall.knn.distance.AngularDistanceUDF]] + * @group knn.distance + */ + @scala.annotation.varargs + def angular_distance(exprs: Column*): Column = withExpr { + planHiveGenericUDF( + "hivemall.knn.distance.AngularDistanceUDF", + "angular_distance", + exprs + ) + } + + /** + * @see [[hivemall.knn.distance.JaccardDistanceUDF]] + * @group knn.distance + */ + @scala.annotation.varargs + def jaccard_distance(exprs: Column*): Column = withExpr { + planHiveUDF( + "hivemall.knn.distance.JaccardDistanceUDF", + "jaccard_distance", + exprs + ) + } + + /** + * @see [[hivemall.knn.distance.ManhattanDistanceUDF]] + * @group knn.distance + */ + @scala.annotation.varargs + def manhattan_distance(exprs: Column*): Column = withExpr { + planHiveGenericUDF( + "hivemall.knn.distance.ManhattanDistanceUDF", + "manhattan_distance", + exprs + ) + } + + /** + * @see [[hivemall.knn.distance.MinkowskiDistanceUDF]] + * @group knn.distance + */ + @scala.annotation.varargs + def minkowski_distance(exprs: Column*): Column = withExpr { + planHiveGenericUDF( + "hivemall.knn.distance.MinkowskiDistanceUDF", + "minkowski_distance", + exprs + ) + } + + /** + * @see [[hivemall.knn.lsh.bBitMinHashUDF]] + * @group knn.lsh + */ + @scala.annotation.varargs + def bbit_minhash(exprs: Column*): Column = withExpr { + planHiveUDF( + "hivemall.knn.lsh.bBitMinHashUDF", + "bbit_minhash", + exprs + ) + } + + /** + * @see [[hivemall.knn.lsh.MinHashesUDFWrapper]] + * @group knn.lsh + */ + @scala.annotation.varargs + def minhashes(exprs: Column*): Column = withExpr { + planHiveGenericUDF( + "hivemall.knn.lsh.MinHashesUDFWrapper", + "minhashes", + exprs + ) + } + + /** + * Returns new features with `1.0` (bias) appended to the input features. + * @see [[hivemall.ftvec.AddBiasUDFWrapper]] + * @group ftvec + */ + def add_bias(expr: Column): Column = withExpr { + planHiveGenericUDF( + "hivemall.ftvec.AddBiasUDFWrapper", + "add_bias", + expr :: Nil + ) + } + + /** + * @see [[hivemall.ftvec.ExtractFeatureUDFWrapper]] + * @group ftvec + * + * TODO: This throws java.lang.ClassCastException because + * HiveInspectors.toInspector has a bug in spark. + * Need to fix it later. + */ + def extract_feature(expr: Column): Column = withExpr { + planHiveGenericUDF( + "hivemall.ftvec.ExtractFeatureUDFWrapper", + "extract_feature", + expr :: Nil + ) + }.as("feature") + + /** + * @see [[hivemall.ftvec.ExtractWeightUDFWrapper]] + * @group ftvec + * + * TODO: This throws java.lang.ClassCastException because + * HiveInspectors.toInspector has a bug in spark. + * Need to fix it later. + */ + def extract_weight(expr: Column): Column = withExpr { + planHiveGenericUDF( + "hivemall.ftvec.ExtractWeightUDFWrapper", + "extract_weight", + expr :: Nil + ) + }.as("value") + + /** + * @see [[hivemall.ftvec.AddFeatureIndexUDFWrapper]] + * @group ftvec + */ + def add_feature_index(features: Column): Column = withExpr { + planHiveGenericUDF( + "hivemall.ftvec.AddFeatureIndexUDFWrapper", + "add_feature_index", + features :: Nil + ) + } + + /** + * @see [[hivemall.ftvec.SortByFeatureUDFWrapper]] + * @group ftvec + */ + def sort_by_feature(expr: Column): Column = withExpr { + planHiveGenericUDF( + "hivemall.ftvec.SortByFeatureUDFWrapper", + "sort_by_feature", + expr :: Nil + ) + } + + /** + * @see [[hivemall.ftvec.hashing.MurmurHash3UDF]] + * @group ftvec.hashing + */ + def mhash(expr: Column): Column = withExpr { + planHiveUDF( + "hivemall.ftvec.hashing.MurmurHash3UDF", + "mhash", + expr :: Nil + ) + } + + /** + * @see [[hivemall.ftvec.hashing.Sha1UDF]] + * @group ftvec.hashing + */ + @scala.annotation.varargs + def sha1(exprs: Column*): Column = withExpr { + planHiveUDF( + "hivemall.ftvec.hashing.Sha1UDF", + "sha1", + exprs + ) + } + + /** + * @see [[hivemall.ftvec.hashing.ArrayHashValuesUDF]] + * @group ftvec.hashing + */ + @scala.annotation.varargs + def array_hash_values(exprs: Column*): Column = withExpr { + planHiveUDF( + "hivemall.ftvec.hashing.ArrayHashValuesUDF", + "array_hash_values", + exprs + ) + } + + /** + * @see [[hivemall.ftvec.hashing.ArrayPrefixedHashValuesUDF]] + * @group ftvec.hashing + */ + @scala.annotation.varargs + def prefixed_hash_values(exprs: Column*): Column = withExpr { + planHiveUDF( + "hivemall.ftvec.hashing.ArrayPrefixedHashValuesUDF", + "prefixed_hash_values", + exprs + ) + } + + /** + * @see [[hivemall.ftvec.hashing.FeatureHashingUDF]] + * @group ftvec.hashing + */ + @scala.annotation.varargs + def feature_hashing(exprs: Column*): Column = withExpr { + planHiveGenericUDF( + "hivemall.ftvec.hashing.FeatureHashingUDF", + "feature_hashing", + exprs + ) + } + + /** + * @see [[hivemall.ftvec.pairing.PolynomialFeaturesUDF]] + * @group ftvec.paring + */ + @scala.annotation.varargs + def polynomial_features(exprs: Column*): Column = withExpr { + planHiveUDF( + "hivemall.ftvec.pairing.PolynomialFeaturesUDF", + "polynomial_features", + exprs + ) + } + + /** + * @see [[hivemall.ftvec.pairing.PoweredFeaturesUDF]] + * @group ftvec.paring + */ + @scala.annotation.varargs + def powered_features(exprs: Column*): Column = withExpr { + planHiveUDF( + "hivemall.ftvec.pairing.PoweredFeaturesUDF", + "powered_features", + exprs + ) + } + + /** + * @see [[hivemall.ftvec.scaling.RescaleUDF]] + * @group ftvec.scaling + */ + def rescale(value: Column, max: Column, min: Column): Column = withExpr { + planHiveUDF( + "hivemall.ftvec.scaling.RescaleUDF", + "rescale", + value.cast(FloatType) :: max :: min :: Nil + ) + } + + /** + * @see [[hivemall.ftvec.scaling.ZScoreUDF]] + * @group ftvec.scaling + */ + @scala.annotation.varargs + def zscore(exprs: Column*): Column = withExpr { + planHiveUDF( + "hivemall.ftvec.scaling.ZScoreUDF", + "zscore", + exprs + ) + } + + /** + * @see [[hivemall.ftvec.scaling.L2NormalizationUDFWrapper]] + * @group ftvec.scaling + */ + def l2_normalize(expr: Column): Column = withExpr { + planHiveGenericUDF( + "hivemall.ftvec.scaling.L2NormalizationUDFWrapper", + "normalize", + expr :: Nil + ) + } + + /** + * @see [[hivemall.ftvec.selection.ChiSquareUDF]] + * @group ftvec.selection + */ + def chi2(observed: Column, expected: Column): Column = withExpr { + planHiveGenericUDF( + "hivemall.ftvec.selection.ChiSquareUDF", + "chi2", + Seq(observed, expected) + ) + } + + /** + * @see [[hivemall.ftvec.conv.ToDenseFeaturesUDF]] + * @group ftvec.conv + */ + @scala.annotation.varargs + def to_dense_features(exprs: Column*): Column = withExpr { + planHiveUDF( + "hivemall.ftvec.conv.ToDenseFeaturesUDF", + "to_dense_features", + exprs + ) + } + + /** + * @see [[hivemall.ftvec.conv.ToSparseFeaturesUDF]] + * @group ftvec.conv + */ + @scala.annotation.varargs + def to_sparse_features(exprs: Column*): Column = withExpr { + planHiveUDF( + "hivemall.ftvec.conv.ToSparseFeaturesUDF", + "to_sparse_features", + exprs + ) + } + + /** + * @see [[hivemall.ftvec.binning.FeatureBinningUDF]] + * @group ftvec.conv + */ + @scala.annotation.varargs + def feature_binning(exprs: Column*): Column = withExpr { + planHiveGenericUDF( + "hivemall.ftvec.binning.FeatureBinningUDF", + "feature_binning", + exprs + ) + } + + /** + * @see [[hivemall.ftvec.trans.VectorizeFeaturesUDF]] + * @group ftvec.trans + */ + @scala.annotation.varargs + def vectorize_features(exprs: Column*): Column = withExpr { + planHiveGenericUDF( + "hivemall.ftvec.trans.VectorizeFeaturesUDF", + "vectorize_features", + exprs + ) + } + + /** + * @see [[hivemall.ftvec.trans.CategoricalFeaturesUDF]] + * @group ftvec.trans + */ + @scala.annotation.varargs + def categorical_features(exprs: Column*): Column = withExpr { + planHiveGenericUDF( + "hivemall.ftvec.trans.CategoricalFeaturesUDF", + "categorical_features", + exprs + ) + } + + /** + * @see [[hivemall.ftvec.trans.FFMFeaturesUDF]] + * @group ftvec.trans + */ + @scala.annotation.varargs + def ffm_features(exprs: Column*): Column = withExpr { + planHiveGenericUDF( + "hivemall.ftvec.trans.FFMFeaturesUDF", + "ffm_features", + exprs + ) + } + + /** + * @see [[hivemall.ftvec.trans.IndexedFeatures]] + * @group ftvec.trans + */ + @scala.annotation.varargs + def indexed_features(exprs: Column*): Column = withExpr { + planHiveGenericUDF( + "hivemall.ftvec.trans.IndexedFeatures", + "indexed_features", + exprs + ) + } + + /** + * @see [[hivemall.ftvec.trans.QuantitativeFeaturesUDF]] + * @group ftvec.trans + */ + @scala.annotation.varargs + def quantitative_features(exprs: Column*): Column = withExpr { + planHiveGenericUDF( + "hivemall.ftvec.trans.QuantitativeFeaturesUDF", + "quantitative_features", + exprs + ) + } + + /** + * @see [[hivemall.ftvec.trans.AddFieldIndicesUDF]] + * @group ftvec.trans + */ + def add_field_indices(features: Column): Column = withExpr { + planHiveGenericUDF( + "hivemall.ftvec.trans.AddFieldIndicesUDF", + "add_field_indices", + features :: Nil + ) + } + + /** + * @see [[hivemall.tools.ConvertLabelUDF]] + * @group tools + */ + def convert_label(label: Column): Column = withExpr { + planHiveUDF( + "hivemall.tools.ConvertLabelUDF", + "convert_label", + label :: Nil + ) + } + + /** + * @see [[hivemall.tools.RankSequenceUDF]] + * @group tools + */ + def x_rank(key: Column): Column = withExpr { + planHiveUDF( + "hivemall.tools.RankSequenceUDF", + "x_rank", + key :: Nil + ) + } + + /** + * @see [[hivemall.tools.array.AllocFloatArrayUDF]] + * @group tools.array + */ + def float_array(nDims: Column): Column = withExpr { + planHiveUDF( + "hivemall.tools.array.AllocFloatArrayUDF", + "float_array", + nDims :: Nil + ) + } + + /** + * @see [[hivemall.tools.array.ArrayRemoveUDF]] + * @group tools.array + */ + def array_remove(original: Column, target: Column): Column = withExpr { + planHiveUDF( + "hivemall.tools.array.ArrayRemoveUDF", + "array_remove", + original :: target :: Nil + ) + } + + /** + * @see [[hivemall.tools.array.SortAndUniqArrayUDF]] + * @group tools.array + */ + def sort_and_uniq_array(ar: Column): Column = withExpr { + planHiveUDF( + "hivemall.tools.array.SortAndUniqArrayUDF", + "sort_and_uniq_array", + ar :: Nil + ) + } + + /** + * @see [[hivemall.tools.array.SubarrayEndWithUDF]] + * @group tools.array + */ + def subarray_endwith(original: Column, key: Column): Column = withExpr { + planHiveUDF( + "hivemall.tools.array.SubarrayEndWithUDF", + "subarray_endwith", + original :: key :: Nil + ) + } + + /** + * @see [[hivemall.tools.array.ArrayConcatUDF]] + * @group tools.array + */ + @scala.annotation.varargs + def array_concat(arrays: Column*): Column = withExpr { + planHiveGenericUDF( + "hivemall.tools.array.ArrayConcatUDF", + "array_concat", + arrays + ) + } + + /** + * @see [[hivemall.tools.array.SubarrayUDF]] + * @group tools.array + */ + def subarray(original: Column, fromIndex: Column, toIndex: Column): Column = withExpr { + planHiveUDF( + "hivemall.tools.array.SubarrayUDF", + "subarray", + original :: fromIndex :: toIndex :: Nil + ) + } + + /** + * @see [[hivemall.tools.array.ToStringArrayUDF]] + * @group tools.array + */ + def to_string_array(ar: Column): Column = withExpr { + planHiveUDF( + "hivemall.tools.array.ToStringArrayUDF", + "to_string_array", + ar :: Nil + ) + } + + /** + * @see [[hivemall.tools.array.ArrayIntersectUDF]] + * @group tools.array + */ + @scala.annotation.varargs + def array_intersect(arrays: Column*): Column = withExpr { + planHiveGenericUDF( + "hivemall.tools.array.ArrayIntersectUDF", + "array_intersect", + arrays + ) + } + + /** + * @see [[hivemall.tools.array.SelectKBestUDF]] + * @group tools.array + */ + def select_k_best(X: Column, importanceList: Column, k: Column): Column = withExpr { + planHiveGenericUDF( + "hivemall.tools.array.SelectKBestUDF", + "select_k_best", + Seq(X, importanceList, k) + ) + } + + /** + * @see [[hivemall.tools.bits.ToBitsUDF]] + * @group tools.bits + */ + def to_bits(indexes: Column): Column = withExpr { + planHiveGenericUDF( + "hivemall.tools.bits.ToBitsUDF", + "to_bits", + indexes :: Nil + ) + } + + /** + * @see [[hivemall.tools.bits.UnBitsUDF]] + * @group tools.bits + */ + def unbits(bitset: Column): Column = withExpr { + planHiveGenericUDF( + "hivemall.tools.bits.UnBitsUDF", + "unbits", + bitset :: Nil + ) + } + + /** + * @see [[hivemall.tools.bits.BitsORUDF]] + * @group tools.bits + */ + @scala.annotation.varargs + def bits_or(bits: Column*): Column = withExpr { + planHiveGenericUDF( + "hivemall.tools.bits.BitsORUDF", + "bits_or", + bits + ) + } + + /** + * @see [[hivemall.tools.compress.InflateUDF]] + * @group tools.compress + */ + @scala.annotation.varargs + def inflate(exprs: Column*): Column = withExpr { + planHiveGenericUDF( + "hivemall.tools.compress.InflateUDF", + "inflate", + exprs + ) + } + + /** + * @see [[hivemall.tools.compress.DeflateUDF]] + * @group tools.compress + */ + @scala.annotation.varargs + def deflate(exprs: Column*): Column = withExpr { + planHiveGenericUDF( + "hivemall.tools.compress.DeflateUDF", + "deflate", + exprs + ) + } + + /** + * @see [[hivemall.tools.map.MapGetSumUDF]] + * @group tools.map + */ + @scala.annotation.varargs + def map_get_sum(exprs: Column*): Column = withExpr { + planHiveUDF( + "hivemall.tools.map.MapGetSumUDF", + "map_get_sum", + exprs + ) + } + + /** + * @see [[hivemall.tools.map.MapTailNUDF]] + * @group tools.map + */ + @scala.annotation.varargs + def map_tail_n(exprs: Column*): Column = withExpr { + planHiveGenericUDF( + "hivemall.tools.map.MapTailNUDF", + "map_tail_n", + exprs + ) + } + + /** + * @see [[hivemall.tools.text.TokenizeUDF]] + * @group tools.text + */ + @scala.annotation.varargs + def tokenize(exprs: Column*): Column = withExpr { + planHiveUDF( + "hivemall.tools.text.TokenizeUDF", + "tokenize", + exprs + ) + } + + /** + * @see [[hivemall.tools.text.StopwordUDF]] + * @group tools.text + */ + def is_stopword(word: Column): Column = withExpr { + planHiveUDF( + "hivemall.tools.text.StopwordUDF", + "is_stopword", + word :: Nil + ) + } + + /** + * @see [[hivemall.tools.text.SingularizeUDF]] + * @group tools.text + */ + def singularize(word: Column): Column = withExpr { + planHiveUDF( + "hivemall.tools.text.SingularizeUDF", + "singularize", + word :: Nil + ) + } + + /** + * @see [[hivemall.tools.text.SplitWordsUDF]] + * @group tools.text + */ + @scala.annotation.varargs + def split_words(exprs: Column*): Column = withExpr { + planHiveUDF( + "hivemall.tools.text.SplitWordsUDF", + "split_words", + exprs + ) + } + + /** + * @see [[hivemall.tools.text.NormalizeUnicodeUDF]] + * @group tools.text + */ + @scala.annotation.varargs + def normalize_unicode(exprs: Column*): Column = withExpr { + planHiveUDF( + "hivemall.tools.text.NormalizeUnicodeUDF", + "normalize_unicode", + exprs + ) + } + + /** + * @see [[hivemall.tools.text.Base91UDF]] + * @group tools.text + */ + def base91(bin: Column): Column = withExpr { + planHiveGenericUDF( + "hivemall.tools.text.Base91UDF", + "base91", + bin :: Nil + ) + } + + /** + * @see [[hivemall.tools.text.Unbase91UDF]] + * @group tools.text + */ + def unbase91(base91String: Column): Column = withExpr { + planHiveGenericUDF( + "hivemall.tools.text.Unbase91UDF", + "unbase91", + base91String :: Nil + ) + } + + /** + * @see [[hivemall.tools.text.WordNgramsUDF]] + * @group tools.text + */ + def word_ngrams(words: Column, minSize: Column, maxSize: Column): Column = withExpr { + planHiveUDF( + "hivemall.tools.text.WordNgramsUDF", + "word_ngrams", + words :: minSize :: maxSize :: Nil + ) + } + + /** + * @see [[hivemall.tools.math.SigmoidGenericUDF]] + * @group misc + */ + def sigmoid(expr: Column): Column = { + val one: () => Literal = () => Literal.create(1.0, DoubleType) + Column(one()) / (Column(one()) + exp(-expr)) + } + + /** + * @see [[hivemall.tools.mapred.RowIdUDFWrapper]] + * @group misc + */ + def rowid(): Column = withExpr { + planHiveGenericUDF( + "hivemall.tools.mapred.RowIdUDFWrapper", + "rowid", + Nil + ) + }.as("rowid") + + /** + * Parses a column containing a CSV string into a [[StructType]] with the specified schema. + * Returns `null`, in the case of an unparseable string. + * @group misc + * + * @param e a string column containing CSV data. + * @param schema the schema to use when parsing the csv string + * @param options options to control how the csv is parsed. accepts the same options and the + * csv data source. + */ + def from_csv(e: Column, schema: StructType, options: Map[String, String]): Column = withExpr { + CsvToStruct(schema, options, e.expr) + } + + /** + * Parses a column containing a CSV string into a [[StructType]] with the specified schema. + * Returns `null`, in the case of an unparseable string. + * @group misc + * + * @param e a string column containing CSV data. + * @param schema the schema to use when parsing the json string + */ + def from_csv(e: Column, schema: StructType): Column = + from_csv(e, schema, Map.empty[String, String]) + + /** + * Converts a column containing a [[StructType]] into a CSV string with the specified schema. + * Throws an exception, in the case of an unsupported type. + * @group misc + * + * @param e a struct column. + * @param options options to control how the struct column is converted into a json string. + * accepts the same options and the json data source. + */ + def to_csv(e: Column, options: Map[String, String]): Column = withExpr { + StructToCsv(options, e.expr) + } + + /** + * Converts a column containing a [[StructType]] into a CSV string with the specified schema. + * Throws an exception, in the case of an unsupported type. + * @group misc + * + * @param e a struct column. + */ + def to_csv(e: Column): Column = to_csv(e, Map.empty[String, String]) + + /** + * A convenient function to wrap an expression and produce a Column. + */ + @inline private def withExpr(expr: Expression): Column = Column(expr) +} http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/bd143146/spark/spark-2.3/src/main/scala/org/apache/spark/sql/hive/HivemallUtils.scala ---------------------------------------------------------------------- diff --git a/spark/spark-2.3/src/main/scala/org/apache/spark/sql/hive/HivemallUtils.scala b/spark/spark-2.3/src/main/scala/org/apache/spark/sql/hive/HivemallUtils.scala new file mode 100644 index 0000000..70cf00b --- /dev/null +++ b/spark/spark-2.3/src/main/scala/org/apache/spark/sql/hive/HivemallUtils.scala @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.spark.sql.hive + +import org.apache.spark.ml.linalg.{BLAS, DenseVector, SparseVector, Vector, Vectors} +import org.apache.spark.sql.{DataFrame, Row} +import org.apache.spark.sql.expressions.UserDefinedFunction +import org.apache.spark.sql.functions._ +import org.apache.spark.sql.types._ + +object HivemallUtils { + + // # of maximum dimensions for feature vectors + private[this] val maxDims = 100000000 + + /** + * Check whether the given schema contains a column of the required data type. + * @param colName column name + * @param dataType required column data type + */ + private[this] def checkColumnType(schema: StructType, colName: String, dataType: DataType) + : Unit = { + val actualDataType = schema(colName).dataType + require(actualDataType.equals(dataType), + s"Column $colName must be of type $dataType but was actually $actualDataType.") + } + + def to_vector_func(dense: Boolean, dims: Int): Seq[String] => Vector = { + if (dense) { + // Dense features + i: Seq[String] => { + val features = new Array[Double](dims) + i.map { ft => + val s = ft.split(":").ensuring(_.size == 2) + features(s(0).toInt) = s(1).toDouble + } + Vectors.dense(features) + } + } else { + // Sparse features + i: Seq[String] => { + val features = i.map { ft => + // val s = ft.split(":").ensuring(_.size == 2) + val s = ft.split(":") + (s(0).toInt, s(1).toDouble) + } + Vectors.sparse(dims, features) + } + } + } + + def to_hivemall_features_func(): Vector => Array[String] = { + case dv: DenseVector => + dv.values.zipWithIndex.map { + case (value, index) => s"$index:$value" + } + case sv: SparseVector => + sv.values.zip(sv.indices).map { + case (value, index) => s"$index:$value" + } + case v => + throw new IllegalArgumentException(s"Do not support vector type ${v.getClass}") + } + + def append_bias_func(): Vector => Vector = { + case dv: DenseVector => + val inputValues = dv.values + val inputLength = inputValues.length + val outputValues = Array.ofDim[Double](inputLength + 1) + System.arraycopy(inputValues, 0, outputValues, 0, inputLength) + outputValues(inputLength) = 1.0 + Vectors.dense(outputValues) + case sv: SparseVector => + val inputValues = sv.values + val inputIndices = sv.indices + val inputValuesLength = inputValues.length + val dim = sv.size + val outputValues = Array.ofDim[Double](inputValuesLength + 1) + val outputIndices = Array.ofDim[Int](inputValuesLength + 1) + System.arraycopy(inputValues, 0, outputValues, 0, inputValuesLength) + System.arraycopy(inputIndices, 0, outputIndices, 0, inputValuesLength) + outputValues(inputValuesLength) = 1.0 + outputIndices(inputValuesLength) = dim + Vectors.sparse(dim + 1, outputIndices, outputValues) + case v => + throw new IllegalArgumentException(s"Do not support vector type ${v.getClass}") + } + + /** + * Transforms Hivemall features into a [[Vector]]. + */ + def to_vector(dense: Boolean = false, dims: Int = maxDims): UserDefinedFunction = { + udf(to_vector_func(dense, dims)) + } + + /** + * Transforms a [[Vector]] into Hivemall features. + */ + def to_hivemall_features: UserDefinedFunction = udf(to_hivemall_features_func) + + /** + * Returns a new [[Vector]] with `1.0` (bias) appended to the input [[Vector]]. + * @group ftvec + */ + def append_bias: UserDefinedFunction = udf(append_bias_func) + + /** + * Builds a [[Vector]]-based model from a table of Hivemall models + */ + def vectorized_model(df: DataFrame, dense: Boolean = false, dims: Int = maxDims) + : UserDefinedFunction = { + checkColumnType(df.schema, "feature", StringType) + checkColumnType(df.schema, "weight", DoubleType) + + import df.sqlContext.implicits._ + val intercept = df + .where($"feature" === "0") + .select($"weight") + .map { case Row(weight: Double) => weight} + .reduce(_ + _) + val weights = to_vector_func(dense, dims)( + df.select($"feature", $"weight") + .where($"feature" !== "0") + .map { case Row(label: String, feature: Double) => s"${label}:$feature"} + .collect.toSeq) + + udf((input: Vector) => BLAS.dot(input, weights) + intercept) + } +}
