Repository: incubator-systemml Updated Branches: refs/heads/master b62a67c0e -> f02f7c018
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.sysml.api.ml

import org.apache.spark.rdd.RDD
import java.io.File
import org.apache.spark.SparkContext
import org.apache.spark.ml.{ Model, Estimator }
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.types.StructType
import org.apache.spark.ml.param.{ Params, Param, ParamMap, DoubleParam }
import org.apache.sysml.runtime.matrix.MatrixCharacteristics
import org.apache.sysml.runtime.matrix.data.MatrixBlock
import org.apache.sysml.runtime.DMLRuntimeException
import org.apache.sysml.runtime.instructions.spark.utils.{ RDDConverterUtilsExt => RDDConverterUtils }
import org.apache.sysml.api.mlcontext._
import org.apache.sysml.api.mlcontext.ScriptFactory._

object NaiveBayes {
  // Location of the DML training script, resolved relative to the SystemML
  // scripts directory (see ScriptsUtils.getDMLScript).
  final val scriptPath = "scripts" + File.separator + "algorithms" + File.separator + "naive-bayes.dml"
}

/**
 * Spark ML Estimator wrapping the SystemML naive-bayes.dml algorithm script.
 * Training is delegated to the BaseSystemMLClassifier mixin; this class only
 * wires script inputs/outputs and constructs the fitted NaiveBayesModel.
 */
class NaiveBayes(override val uid: String, val sc: SparkContext) extends Estimator[NaiveBayesModel] with HasLaplace with BaseSystemMLClassifier {
  // Standard Spark ML copy: new estimator with the same uid, params copied via extra.
  override def copy(extra: ParamMap): Estimator[NaiveBayesModel] = {
    val that = new NaiveBayes(uid, sc)
    copyValues(that, extra)
  }
  // Setter for the Laplace-smoothing parameter declared in HasLaplace.
  def setLaplace(value: Double) = set(laplace, value)

  // Note: will update the y_mb as this will be called by Python mllearn
  // (label recoding happens in place on the caller's MatrixBlock).
  def fit(X_mb: MatrixBlock, y_mb: MatrixBlock): NaiveBayesModel = {
    // fit(...) comes from BaseSystemMLClassifier and returns
    // (MLResults, labelMapping) — presumably; confirm against the trait.
    val ret = fit(X_mb, y_mb, sc)
    new NaiveBayesModel("naive")(ret._1, ret._2, sc)
  }

  /** DataFrame-based fit; expects a "label" column (handled by the base trait). */
  def fit(df: ScriptsUtils.SparkDataType): NaiveBayesModel = {
    val ret = fit(df, sc)
    new NaiveBayesModel("naive")(ret._1, ret._2, sc)
  }

  /**
   * Builds the training Script for naive-bayes.dml.
   * The " " placeholders satisfy the script's required $-arguments whose file
   * paths are unused when running through MLContext.
   * @return (script, name of the X input variable "D", name of the Y input variable "C")
   */
  def getTrainingScript(isSingleNode:Boolean):(Script, String, String) = {
    val script = dml(ScriptsUtils.getDMLScript(NaiveBayes.scriptPath))
      .in("$X", " ")
      .in("$Y", " ")
      .in("$prior", " ")
      .in("$conditionals", " ")
      .in("$accuracy", " ")
      .in("$laplace", toDouble(getLaplace))
      .out("classPrior", "classConditionals")
    (script, "D", "C")
  }
}

object NaiveBayesModel {
  // Location of the DML prediction script used by getPredictionScript below.
  final val scriptPath = "scripts" + File.separator + "algorithms" + File.separator + "naive-bayes-predict.dml"
}

/**
 * Fitted naive-Bayes model. Holds the training outputs (class prior and
 * class-conditional matrices) inside `mloutput` and the label-index -> original
 * label mapping used to translate predictions back to user labels.
 */
class NaiveBayesModel(override val uid: String)
  (val mloutput: MLResults, val labelMapping: java.util.HashMap[Int, String], val sc: SparkContext)
  extends Model[NaiveBayesModel] with HasLaplace with BaseSystemMLClassifierModel {

  // Spark ML copy: shares mloutput/labelMapping references (both treated as read-only here).
  override def copy(extra: ParamMap): NaiveBayesModel = {
    val that = new NaiveBayesModel(uid)(mloutput, labelMapping, sc)
    copyValues(that, extra)
  }

  /**
   * Builds the prediction Script for naive-bayes-predict.dml, binding the
   * trained "prior"/"conditionals" matrices either as local MatrixBlocks
   * (single node) or as binary-block RDDs (distributed).
   * @return (script, name of the X input variable "D")
   */
  def getPredictionScript(mloutput: MLResults, isSingleNode:Boolean): (Script, String) = {
    val script = dml(ScriptsUtils.getDMLScript(NaiveBayesModel.scriptPath))
      .in("$X", " ")
      .in("$prior", " ")
      .in("$conditionals", " ")
      .in("$probabilities", " ")
      .out("probs")

    val classPrior = mloutput.getBinaryBlockMatrix("classPrior")
    val classConditionals = mloutput.getBinaryBlockMatrix("classConditionals")
    val ret = if(isSingleNode) {
      script.in("prior", classPrior.getMatrixBlock, classPrior.getMatrixMetadata)
        .in("conditionals", classConditionals.getMatrixBlock, classConditionals.getMatrixMetadata)
    }
    else {
      script.in("prior", classPrior.getBinaryBlocks, classPrior.getMatrixMetadata)
        .in("conditionals", classConditionals.getBinaryBlocks, classConditionals.getMatrixMetadata)
    }
    (ret, "D")
  }

  // Prediction entry points; "probs" is the per-class probability output of the DML script.
  def transform(X: MatrixBlock): MatrixBlock = transform(X, mloutput, labelMapping, sc, "probs")
  def transform(df: ScriptsUtils.SparkDataType): DataFrame = transform(df, mloutput, labelMapping, sc, "probs")
}
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.sysml.api.ml

import org.apache.spark.sql.functions.udf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.DataFrame
import org.apache.spark.SparkContext
import org.apache.sysml.runtime.matrix.data.MatrixBlock
import org.apache.sysml.runtime.DMLRuntimeException
import org.apache.sysml.runtime.matrix.MatrixCharacteristics
import org.apache.sysml.runtime.instructions.spark.utils.{ RDDConverterUtilsExt => RDDConverterUtils }
import org.apache.sysml.api.mlcontext.MLResults
import org.apache.sysml.api.mlcontext.ScriptFactory._
import org.apache.sysml.api.mlcontext.Script
import org.apache.sysml.api.mlcontext.BinaryBlockMatrix

/**
 * Shared helpers for the SystemML Spark ML wrappers: label-string <-> index
 * recoding, GLM prediction-script construction, and post-processing of
 * probability outputs into class-label predictions.
 */
object PredictionUtils {

  /**
   * Builds the GLM prediction script, binding the trained coefficient matrix
   * B_full either as a local MatrixBlock (single node) or as a distributed
   * BinaryBlockMatrix.
   * @param dfam GLM distribution family passed through as $dfam (default 1)
   * @return (script, name of the feature-matrix input variable "X")
   */
  def getGLMPredictionScript(B_full: BinaryBlockMatrix, isSingleNode:Boolean, dfam:java.lang.Integer=1): (Script, String) = {
    val script = dml(ScriptsUtils.getDMLScript(LogisticRegressionModel.scriptPath))
      .in("$X", " ")
      .in("$B", " ")
      .in("$dfam", dfam)
      .out("means")
    val ret = if(isSingleNode) {
      script.in("B_full", B_full.getMatrixBlock, B_full.getMatrixMetadata)
    }
    else {
      script.in("B_full", B_full)
    }
    (ret, "X")
  }

  /**
   * Builds a 1-based label encoding from the distinct values of df's "label"
   * column. Fills `revLabelMapping` (index -> original label string) as a side
   * effect and returns the label column re-encoded as an RDD of index strings.
   * NOTE(review): distinct ordering is not deterministic across runs, so the
   * assigned indices can vary between invocations — confirm callers don't
   * depend on a stable ordering.
   */
  def fillLabelMapping(df: ScriptsUtils.SparkDataType, revLabelMapping: java.util.HashMap[Int, String]): RDD[String] = {
    val temp = df.select("label").distinct.rdd.map(_.apply(0).toString).collect()
    val labelMapping = new java.util.HashMap[String, Int]
    for(i <- 0 until temp.length) {
      // indices are 1-based to match DML's one-based label convention
      labelMapping.put(temp(i), i+1)
      revLabelMapping.put(i+1, temp(i))
    }
    df.select("label").rdd.map( x => labelMapping.get(x.apply(0).toString).toString )
  }

  /**
   * In-place variant for a local label column vector: rewrites y_mb's dense
   * values to 1-based label indices (MUTATES the caller's MatrixBlock) and
   * fills `revLabelMapping` with the reverse mapping.
   * @throws RuntimeException if y_mb is not a single-column vector
   * @throws DMLRuntimeException if y_mb is in sparse format (unsupported here)
   */
  def fillLabelMapping(y_mb: MatrixBlock, revLabelMapping: java.util.HashMap[Int, String]): Unit = {
    val labelMapping = new java.util.HashMap[String, Int]
    if(y_mb.getNumColumns != 1) {
      throw new RuntimeException("Expected a column vector for y")
    }
    if(y_mb.isInSparseFormat()) {
      throw new DMLRuntimeException("Sparse block is not implemented for fit")
    }
    else {
      val denseBlock = y_mb.getDenseBlock()
      var id:Int = 1
      for(i <- 0 until denseBlock.length) {
        val v = denseBlock(i).toString()
        // assign ids in first-seen order, starting from 1
        if(!labelMapping.containsKey(v)) {
          labelMapping.put(v, id)
          revLabelMapping.put(id, v)
          id += 1
        }
        denseBlock.update(i, labelMapping.get(v))
      }
    }
  }

  /**
   * Serializable holder for the index -> label mapping, exposing a Spark UDF
   * that maps a predicted (double) index back to the original label.
   * The UDF's return type is chosen eagerly: if every mapped label parses as a
   * Double the numeric UDF is used, otherwise the string UDF.
   */
  class LabelMappingData(val labelMapping: java.util.HashMap[Int, String]) extends Serializable {
    // index -> original label string; throws on unknown index
    def mapLabelStr(x:Double):String = {
      if(labelMapping.containsKey(x.toInt))
        labelMapping.get(x.toInt)
      else
        throw new RuntimeException("Incorrect label mapping")
    }
    // index -> original label parsed as Double; throws on unknown index
    def mapLabelDouble(x:Double):Double = {
      if(labelMapping.containsKey(x.toInt))
        labelMapping.get(x.toInt).toDouble
      else
        throw new RuntimeException("Incorrect label mapping")
    }
    // Probe: try parsing every label as Double; fall back to the string UDF
    // on the first NumberFormatException. (Exception-as-control-flow, but
    // evaluated once per instance.)
    val mapLabel_udf = {
      try {
        val it = labelMapping.values().iterator()
        while(it.hasNext()) {
          it.next().toDouble
        }
        udf(mapLabelDouble _)
      } catch {
        case e: Exception => udf(mapLabelStr _)
      }
    }
  }

  /**
   * Maps predicted label indices back to original labels.
   * Single-node path: rewrites X's column vector IN PLACE and returns null
   * (callers on this path use the mutated X, not the return value).
   * Distributed path: returns df with `labelColName` mapped through the UDF
   * and renamed to "prediction".
   */
  def updateLabels(isSingleNode:Boolean, df:DataFrame, X: MatrixBlock, labelColName:String, labelMapping: java.util.HashMap[Int, String]): DataFrame = {
    if(isSingleNode) {
      if(X.isInSparseFormat()) {
        throw new RuntimeException("Since predicted label is a column vector, expected it to be in dense format")
      }
      for(i <- 0 until X.getNumRows) {
        val v:Int = X.getValue(i, 0).toInt
        if(labelMapping.containsKey(v)) {
          // NOTE: original labels on this path must be numeric (toDouble)
          X.setValue(i, 0, labelMapping.get(v).toDouble)
        }
        else {
          throw new RuntimeException("No mapping found for " + v + " in " + labelMapping.toString())
        }
      }
      return null
    }
    else {
      val serObj = new LabelMappingData(labelMapping)
      return df.withColumn(labelColName, serObj.mapLabel_udf(df(labelColName)))
        .withColumnRenamed(labelColName, "prediction")
    }
  }

  /** Inner-joins two DataFrames on their "ID" columns, keeping a single ID column. */
  def joinUsingID(df1:DataFrame, df2:DataFrame):DataFrame = {
    val tempDF1 = df1.withColumnRenamed("ID", "ID1")
    tempDF1.join(df2, tempDF1.col("ID1").equalTo(df2.col("ID"))).drop("ID1")
  }

  /**
   * Converts a per-class probability matrix into predicted class labels by
   * running a small DML script (rowIndexMax, one-based). Spins up a fresh
   * MLContext per call — NOTE(review): possibly expensive; confirm intended.
   */
  def computePredictedClassLabelsFromProbability(mlscoreoutput:MLResults, isSingleNode:Boolean, sc:SparkContext, inProbVar:String): MLResults = {
    val ml = new org.apache.sysml.api.mlcontext.MLContext(sc)
    val script = dml(
        """
        Prob = read("temp1");
        Prediction = rowIndexMax(Prob); # assuming one-based label mapping
        write(Prediction, "tempOut", "csv");
        """).out("Prediction")
    val probVar = mlscoreoutput.getBinaryBlockMatrix(inProbVar)
    if(isSingleNode) {
      ml.execute(script.in("Prob", probVar.getMatrixBlock, probVar.getMatrixMetadata))
    }
    else {
      ml.execute(script.in("Prob", probVar))
    }
  }
}
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.sysml.api.ml

import org.apache.spark.rdd.RDD
import java.io.File
import org.apache.spark.SparkContext
import org.apache.spark.ml.{ Model, Estimator }
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.types.StructType
import org.apache.spark.ml.param.{ Params, Param, ParamMap, DoubleParam }
import org.apache.sysml.runtime.matrix.MatrixCharacteristics
import org.apache.sysml.runtime.matrix.data.MatrixBlock
import org.apache.sysml.runtime.DMLRuntimeException
import org.apache.sysml.runtime.instructions.spark.utils.{ RDDConverterUtilsExt => RDDConverterUtils }
import org.apache.sysml.api.mlcontext._
import org.apache.sysml.api.mlcontext.ScriptFactory._

object SVM {
  // Binary (l2-svm) and multiclass (m-svm) DML training scripts.
  final val scriptPathBinary = "scripts" + File.separator + "algorithms" + File.separator + "l2-svm.dml"
  final val scriptPathMulticlass = "scripts" + File.separator + "algorithms" + File.separator + "m-svm.dml"
}

/**
 * Spark ML Estimator wrapping the SystemML l2-svm.dml (binary) or m-svm.dml
 * (multiclass) training script, selected by `isMultiClass`.
 */
class SVM (override val uid: String, val sc: SparkContext, val isMultiClass:Boolean=false) extends Estimator[SVMModel] with HasIcpt
  with HasRegParam with HasTol with HasMaxOuterIter with BaseSystemMLClassifier {

  // Param setters (params declared in the Has* traits).
  def setIcpt(value: Int) = set(icpt, value)
  def setMaxIter(value: Int) = set(maxOuterIter, value)
  def setRegParam(value: Double) = set(regParam, value)
  def setTol(value: Double) = set(tol, value)

  override def copy(extra: ParamMap): Estimator[SVMModel] = {
    val that = new SVM(uid, sc, isMultiClass)
    copyValues(that, extra)
  }

  /**
   * Builds the training Script. The " " placeholders satisfy required
   * $-file-arguments that are unused under MLContext.
   * @return (script, X input variable name, Y input variable name)
   */
  def getTrainingScript(isSingleNode:Boolean):(Script, String, String) = {
    val script = dml(ScriptsUtils.getDMLScript(if(isMultiClass) SVM.scriptPathMulticlass else SVM.scriptPathBinary))
      .in("$X", " ")
      .in("$Y", " ")
      .in("$model", " ")
      .in("$Log", " ")
      .in("$icpt", toDouble(getIcpt))
      .in("$reg", toDouble(getRegParam))
      .in("$tol", toDouble(getTol))
      // NOTE(review): getter is spelled "getMaxOuterIte" — looks like a typo
      // for getMaxOuterIter; confirm against the HasMaxOuterIter trait, which
      // may itself declare this spelling.
      .in("$maxiter", toDouble(getMaxOuterIte))
      .out("w")
    (script, "X", "Y")
  }

  // Note: will update the y_mb as this will be called by Python mllearn
  // (label recoding happens in place on the caller's MatrixBlock).
  def fit(X_mb: MatrixBlock, y_mb: MatrixBlock): SVMModel = {
    val ret = fit(X_mb, y_mb, sc)
    new SVMModel("svm")(ret._1, sc, isMultiClass, ret._2)
  }

  /** DataFrame-based fit; expects a "label" column (handled by the base trait). */
  def fit(df: ScriptsUtils.SparkDataType): SVMModel = {
    val ret = fit(df, sc)
    new SVMModel("svm")(ret._1, sc, isMultiClass, ret._2)
  }

}

object SVMModel {
  // Prediction scripts matching the two training variants above.
  final val predictionScriptPathBinary = "scripts" + File.separator + "algorithms" + File.separator + "l2-svm-predict.dml"
  final val predictionScriptPathMulticlass = "scripts" + File.separator + "algorithms" + File.separator + "m-svm-predict.dml"
}

/**
 * Fitted SVM model. Holds the trained weight matrix inside `mloutput`
 * (variable "w") and the label-index -> original label mapping.
 */
class SVMModel (override val uid: String)(val mloutput: MLResults, val sc: SparkContext, val isMultiClass:Boolean,
    val labelMapping: java.util.HashMap[Int, String]) extends Model[SVMModel] with BaseSystemMLClassifierModel {
  override def copy(extra: ParamMap): SVMModel = {
    val that = new SVMModel(uid)(mloutput, sc, isMultiClass, labelMapping)
    copyValues(that, extra)
  }

  /**
   * Builds the prediction Script, binding the trained weights under the
   * variable name the chosen DML script expects ("W" for m-svm-predict,
   * "w" for l2-svm-predict).
   * @return (script, feature-matrix input variable name "X")
   */
  def getPredictionScript(mloutput: MLResults, isSingleNode:Boolean): (Script, String) = {
    val script = dml(ScriptsUtils.getDMLScript(if(isMultiClass) SVMModel.predictionScriptPathMulticlass else SVMModel.predictionScriptPathBinary))
      .in("$X", " ")
      .in("$model", " ")
      .out("scores")

    val w = mloutput.getBinaryBlockMatrix("w")
    val wVar = if(isMultiClass) "W" else "w"

    val ret = if(isSingleNode) {
      script.in(wVar, w.getMatrixBlock, w.getMatrixMetadata)
    }
    else {
      script.in(wVar, w)
    }
    (ret, "X")
  }

  // Prediction entry points; "scores" is the raw score output of the DML script.
  def transform(X: MatrixBlock): MatrixBlock = transform(X, mloutput, labelMapping, sc, "scores")
  def transform(df: ScriptsUtils.SparkDataType): DataFrame = transform(df, mloutput, labelMapping, sc, "scores")
}

// NOTE(review): the original patch also contained a partially visible hunk for
// ScriptsUtils.scala, adding to object ScriptsUtils:
//   type SparkDataType = org.apache.spark.sql.DataFrame // org.apache.spark.sql.Dataset[_]
// That file is not fully visible here, so it is left unedited.
