Repository: incubator-systemml
Updated Branches:
  refs/heads/master b62a67c0e -> f02f7c018


http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/f02f7c01/src/main/scala/org/apache/sysml/api/ml/NaiveBayes.scala
----------------------------------------------------------------------
diff --git a/src/main/scala/org/apache/sysml/api/ml/NaiveBayes.scala b/src/main/scala/org/apache/sysml/api/ml/NaiveBayes.scala
new file mode 100644
index 0000000..fd05f27
--- /dev/null
+++ b/src/main/scala/org/apache/sysml/api/ml/NaiveBayes.scala
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.api.ml
+
+import org.apache.spark.rdd.RDD
+import java.io.File
+import org.apache.spark.SparkContext
+import org.apache.spark.ml.{ Model, Estimator }
+import org.apache.spark.sql.DataFrame
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.ml.param.{ Params, Param, ParamMap, DoubleParam }
+import org.apache.sysml.runtime.matrix.MatrixCharacteristics
+import org.apache.sysml.runtime.matrix.data.MatrixBlock
+import org.apache.sysml.runtime.DMLRuntimeException
+import org.apache.sysml.runtime.instructions.spark.utils.{ RDDConverterUtilsExt => RDDConverterUtils }
+import org.apache.sysml.api.mlcontext._
+import org.apache.sysml.api.mlcontext.ScriptFactory._
+
+object NaiveBayes {
+  final val scriptPath = "scripts" + File.separator + "algorithms" + File.separator + "naive-bayes.dml"
+}
+
+class NaiveBayes(override val uid: String, val sc: SparkContext) extends Estimator[NaiveBayesModel] with HasLaplace with BaseSystemMLClassifier {
+  override def copy(extra: ParamMap): Estimator[NaiveBayesModel] = {
+    val that = new NaiveBayes(uid, sc)
+    copyValues(that, extra)
+  }
+  def setLaplace(value: Double) = set(laplace, value)
+  
+  // Note: updates y_mb in place, since this method is called from the Python mllearn API
+  def fit(X_mb: MatrixBlock, y_mb: MatrixBlock): NaiveBayesModel = {
+    val ret = fit(X_mb, y_mb, sc)
+    new NaiveBayesModel("naive")(ret._1, ret._2, sc)
+  }
+  
+  def fit(df: ScriptsUtils.SparkDataType): NaiveBayesModel = {
+    val ret = fit(df, sc)
+    new NaiveBayesModel("naive")(ret._1, ret._2, sc)
+  }
+  
+  def getTrainingScript(isSingleNode:Boolean):(Script, String, String)  = {
+    val script = dml(ScriptsUtils.getDMLScript(NaiveBayes.scriptPath))
+      .in("$X", " ")
+      .in("$Y", " ")
+      .in("$prior", " ")
+      .in("$conditionals", " ")
+      .in("$accuracy", " ")
+      .in("$laplace", toDouble(getLaplace))
+      .out("classPrior", "classConditionals")
+    (script, "D", "C")
+  }
+}
+
+
+object NaiveBayesModel {
+  final val scriptPath = "scripts" + File.separator + "algorithms" + File.separator + "naive-bayes-predict.dml"
+}
+
+class NaiveBayesModel(override val uid: String)
+  (val mloutput: MLResults, val labelMapping: java.util.HashMap[Int, String], val sc: SparkContext)
+  extends Model[NaiveBayesModel] with HasLaplace with BaseSystemMLClassifierModel {
+  
+  override def copy(extra: ParamMap): NaiveBayesModel = {
+    val that = new NaiveBayesModel(uid)(mloutput, labelMapping, sc)
+    copyValues(that, extra)
+  }
+  
+  def getPredictionScript(mloutput: MLResults, isSingleNode:Boolean): (Script, String)  = {
+    val script = dml(ScriptsUtils.getDMLScript(NaiveBayesModel.scriptPath))
+      .in("$X", " ")
+      .in("$prior", " ")
+      .in("$conditionals", " ")
+      .in("$probabilities", " ")
+      .out("probs")
+    
+    val classPrior = mloutput.getBinaryBlockMatrix("classPrior")
+    val classConditionals = mloutput.getBinaryBlockMatrix("classConditionals")
+    val ret = if(isSingleNode) {
+      script.in("prior", classPrior.getMatrixBlock, 
classPrior.getMatrixMetadata)
+            .in("conditionals", classConditionals.getMatrixBlock, 
classConditionals.getMatrixMetadata)
+    }
+    else {
+      script.in("prior", classPrior.getBinaryBlocks, 
classPrior.getMatrixMetadata)
+            .in("conditionals", classConditionals.getBinaryBlocks, 
classConditionals.getMatrixMetadata)
+    }
+    (ret, "D")
+  }
+  
+  def transform(X: MatrixBlock): MatrixBlock = transform(X, mloutput, labelMapping, sc, "probs")
+  def transform(df: ScriptsUtils.SparkDataType): DataFrame = transform(df, mloutput, labelMapping, sc, "probs")
+  
+}
\ No newline at end of file
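
A minimal usage sketch for the new NaiveBayes estimator (illustrative only, not part of this commit; it assumes a SparkContext `sc` and DataFrames `trainingDf`/`testDf` with a "label" column, as read by PredictionUtils.fillLabelMapping, plus a vector "features" column -- an assumption, since BaseSystemMLClassifier is not part of this diff):

  import org.apache.sysml.api.ml.NaiveBayes

  // Training runs scripts/algorithms/naive-bayes.dml; scoring runs
  // naive-bayes-predict.dml. Label encoding/decoding is handled internally.
  val nb = new NaiveBayes("nb", sc).setLaplace(1.0)
  val model = nb.fit(trainingDf)
  val predictions = model.transform(testDf)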

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/f02f7c01/src/main/scala/org/apache/sysml/api/ml/PredictionUtils.scala
----------------------------------------------------------------------
diff --git a/src/main/scala/org/apache/sysml/api/ml/PredictionUtils.scala b/src/main/scala/org/apache/sysml/api/ml/PredictionUtils.scala
new file mode 100644
index 0000000..8e3893d
--- /dev/null
+++ b/src/main/scala/org/apache/sysml/api/ml/PredictionUtils.scala
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.api.ml
+
+import org.apache.spark.sql.functions.udf
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.DataFrame
+import org.apache.spark.SparkContext
+import org.apache.sysml.runtime.matrix.data.MatrixBlock
+import org.apache.sysml.runtime.DMLRuntimeException
+import org.apache.sysml.runtime.matrix.MatrixCharacteristics
+import org.apache.sysml.runtime.instructions.spark.utils.{ RDDConverterUtilsExt => RDDConverterUtils }
+import org.apache.sysml.api.mlcontext.MLResults
+import org.apache.sysml.api.mlcontext.ScriptFactory._
+import org.apache.sysml.api.mlcontext.Script
+import org.apache.sysml.api.mlcontext.BinaryBlockMatrix
+
+object PredictionUtils {
+  
+  def getGLMPredictionScript(B_full: BinaryBlockMatrix, isSingleNode:Boolean, dfam:java.lang.Integer=1): (Script, String)  = {
+    val script = dml(ScriptsUtils.getDMLScript(LogisticRegressionModel.scriptPath))
+      .in("$X", " ")
+      .in("$B", " ")
+      .in("$dfam", dfam)
+      .out("means")
+    val ret = if(isSingleNode) {
+      script.in("B_full", B_full.getMatrixBlock, B_full.getMatrixMetadata)
+    }
+    else {
+      script.in("B_full", B_full)
+    }
+    (ret, "X")
+  }
+  
+  def fillLabelMapping(df: ScriptsUtils.SparkDataType, revLabelMapping: java.util.HashMap[Int, String]): RDD[String]  = {
+    val temp = df.select("label").distinct.rdd.map(_.apply(0).toString).collect()
+    val labelMapping = new java.util.HashMap[String, Int]
+    for(i <- 0 until temp.length) {
+      labelMapping.put(temp(i), i+1)
+      revLabelMapping.put(i+1, temp(i))
+    }
+    df.select("label").rdd.map( x => 
labelMapping.get(x.apply(0).toString).toString )
+  }
+  
+  def fillLabelMapping(y_mb: MatrixBlock, revLabelMapping: java.util.HashMap[Int, String]): Unit = {
+    val labelMapping = new java.util.HashMap[String, Int]
+    if(y_mb.getNumColumns != 1) {
+      throw new RuntimeException("Expected a column vector for y")
+    }
+    if(y_mb.isInSparseFormat()) {
+      throw new DMLRuntimeException("Sparse block is not implemented for fit")
+    }
+    else {
+      val denseBlock = y_mb.getDenseBlock()
+      var id:Int = 1
+      for(i <- 0 until denseBlock.length) {
+        val v = denseBlock(i).toString()
+        if(!labelMapping.containsKey(v)) {
+          labelMapping.put(v, id)
+          revLabelMapping.put(id, v)
+          id += 1
+        }
+        denseBlock.update(i, labelMapping.get(v))
+      }  
+    }
+  }
+  
+  class LabelMappingData(val labelMapping: java.util.HashMap[Int, String]) extends Serializable {
+   def mapLabelStr(x:Double):String = {
+     if(labelMapping.containsKey(x.toInt))
+       labelMapping.get(x.toInt)
+     else
+       throw new RuntimeException("Incorrect label mapping")
+   }
+   def mapLabelDouble(x:Double):Double = {
+     if(labelMapping.containsKey(x.toInt))
+       labelMapping.get(x.toInt).toDouble
+     else
+       throw new RuntimeException("Incorrect label mapping")
+   }
+   val mapLabel_udf =  {
+        try {
+          val it = labelMapping.values().iterator()
+          while(it.hasNext()) {
+            it.next().toDouble
+          }
+          udf(mapLabelDouble _)
+        } catch {
+          case e: Exception => udf(mapLabelStr _)
+        }
+      }
+  }  
+  def updateLabels(isSingleNode:Boolean, df:DataFrame, X: MatrixBlock, labelColName:String, labelMapping: java.util.HashMap[Int, String]): DataFrame = {
+    if(isSingleNode) {
+      if(X.isInSparseFormat()) {
+        throw new RuntimeException("Since predicted label is a column vector, 
expected it to be in dense format")
+      }
+      for(i <- 0 until X.getNumRows) {
+        val v:Int = X.getValue(i, 0).toInt
+        if(labelMapping.containsKey(v)) {
+          X.setValue(i, 0, labelMapping.get(v).toDouble)
+        }
+        else {
+          throw new RuntimeException("No mapping found for " + v + " in " + 
labelMapping.toString())
+        }
+      }
+      return null
+    }
+    else {
+      val serObj = new LabelMappingData(labelMapping)
+      return df.withColumn(labelColName, serObj.mapLabel_udf(df(labelColName)))
+               .withColumnRenamed(labelColName, "prediction")
+    }
+  }
+  
+  def joinUsingID(df1:DataFrame, df2:DataFrame):DataFrame = {
+    val tempDF1 = df1.withColumnRenamed("ID", "ID1")
+    tempDF1.join(df2, tempDF1.col("ID1").equalTo(df2.col("ID"))).drop("ID1")
+  }
+  
+  def computePredictedClassLabelsFromProbability(mlscoreoutput:MLResults, isSingleNode:Boolean, sc:SparkContext, inProbVar:String): MLResults = {
+    val ml = new org.apache.sysml.api.mlcontext.MLContext(sc)
+    val script = dml(
+        """
+        Prob = read("temp1");
+        Prediction = rowIndexMax(Prob); # assuming one-based label mapping
+        write(Prediction, "tempOut", "csv");
+        """).out("Prediction")
+    val probVar = mlscoreoutput.getBinaryBlockMatrix(inProbVar)
+    if(isSingleNode) {
+      ml.execute(script.in("Prob", probVar.getMatrixBlock, 
probVar.getMatrixMetadata))
+    }
+    else {
+      ml.execute(script.in("Prob", probVar))
+    }
+  }
+}
\ No newline at end of file
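
A short sketch of how the label-mapping helpers above fit together (illustrative; `df` is an assumed DataFrame with a "label" column):

  import org.apache.sysml.api.ml.PredictionUtils

  // Encode arbitrary (e.g. string) labels as 1-based integers for the DML
  // scripts, keeping the reverse map so predictions can later be decoded
  // by updateLabels.
  val revLabelMapping = new java.util.HashMap[Int, String]
  val encodedLabels = PredictionUtils.fillLabelMapping(df, revLabelMapping)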

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/f02f7c01/src/main/scala/org/apache/sysml/api/ml/SVM.scala
----------------------------------------------------------------------
diff --git a/src/main/scala/org/apache/sysml/api/ml/SVM.scala b/src/main/scala/org/apache/sysml/api/ml/SVM.scala
new file mode 100644
index 0000000..07a7283
--- /dev/null
+++ b/src/main/scala/org/apache/sysml/api/ml/SVM.scala
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.api.ml
+
+import org.apache.spark.rdd.RDD
+import java.io.File
+import org.apache.spark.SparkContext
+import org.apache.spark.ml.{ Model, Estimator }
+import org.apache.spark.sql.DataFrame
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.ml.param.{ Params, Param, ParamMap, DoubleParam }
+import org.apache.sysml.runtime.matrix.MatrixCharacteristics
+import org.apache.sysml.runtime.matrix.data.MatrixBlock
+import org.apache.sysml.runtime.DMLRuntimeException
+import org.apache.sysml.runtime.instructions.spark.utils.{ RDDConverterUtilsExt => RDDConverterUtils }
+import org.apache.sysml.api.mlcontext._
+import org.apache.sysml.api.mlcontext.ScriptFactory._
+
+object SVM {
+  final val scriptPathBinary = "scripts" + File.separator + "algorithms" + File.separator + "l2-svm.dml"
+  final val scriptPathMulticlass = "scripts" + File.separator + "algorithms" + File.separator + "m-svm.dml"
+}
+
+class SVM (override val uid: String, val sc: SparkContext, val isMultiClass:Boolean=false) extends Estimator[SVMModel] with HasIcpt
+    with HasRegParam with HasTol with HasMaxOuterIter with BaseSystemMLClassifier {
+
+  def setIcpt(value: Int) = set(icpt, value)
+  def setMaxIter(value: Int) = set(maxOuterIter, value)
+  def setRegParam(value: Double) = set(regParam, value)
+  def setTol(value: Double) = set(tol, value)
+  
+  override def copy(extra: ParamMap): Estimator[SVMModel] = {
+    val that = new SVM(uid, sc, isMultiClass)
+    copyValues(that, extra)
+  }
+  
+  def getTrainingScript(isSingleNode:Boolean):(Script, String, String)  = {
+    val script = dml(ScriptsUtils.getDMLScript(if(isMultiClass) SVM.scriptPathMulticlass else SVM.scriptPathBinary))
+      .in("$X", " ")
+      .in("$Y", " ")
+      .in("$model", " ")
+      .in("$Log", " ")
+      .in("$icpt", toDouble(getIcpt))
+      .in("$reg", toDouble(getRegParam))
+      .in("$tol", toDouble(getTol))
+      .in("$maxiter", toDouble(getMaxOuterIte))
+      .out("w")
+    (script, "X", "Y")
+  }
+  
+  // Note: updates y_mb in place, since this method is called from the Python mllearn API
+  def fit(X_mb: MatrixBlock, y_mb: MatrixBlock): SVMModel = {
+    val ret = fit(X_mb, y_mb, sc)
+    new SVMModel("svm")(ret._1, sc, isMultiClass, ret._2)
+  }
+  
+  def fit(df: ScriptsUtils.SparkDataType): SVMModel = {
+    val ret = fit(df, sc)
+    new SVMModel("svm")(ret._1, sc, isMultiClass, ret._2)
+  }
+  
+}
+
+object SVMModel {
+  final val predictionScriptPathBinary = "scripts" + File.separator + "algorithms" + File.separator + "l2-svm-predict.dml"
+  final val predictionScriptPathMulticlass = "scripts" + File.separator + "algorithms" + File.separator + "m-svm-predict.dml"
+}
+
+class SVMModel (override val uid: String)(val mloutput: MLResults, val sc: SparkContext, val isMultiClass:Boolean,
+    val labelMapping: java.util.HashMap[Int, String]) extends Model[SVMModel] with BaseSystemMLClassifierModel {
+  override def copy(extra: ParamMap): SVMModel = {
+    val that = new SVMModel(uid)(mloutput, sc, isMultiClass, labelMapping)
+    copyValues(that, extra)
+  }
+  
+  def getPredictionScript(mloutput: MLResults, isSingleNode:Boolean): (Script, String)  = {
+    val script = dml(ScriptsUtils.getDMLScript(if(isMultiClass) SVMModel.predictionScriptPathMulticlass else SVMModel.predictionScriptPathBinary))
+      .in("$X", " ")
+      .in("$model", " ")
+      .out("scores")
+    
+    val w = mloutput.getBinaryBlockMatrix("w")
+    val wVar = if(isMultiClass) "W" else "w"
+      
+    val ret = if(isSingleNode) {
+      script.in(wVar, w.getMatrixBlock, w.getMatrixMetadata)
+    }
+    else {
+      script.in(wVar, w)
+    }
+    (ret, "X")
+  }
+  
+  def transform(X: MatrixBlock): MatrixBlock = transform(X, mloutput, labelMapping, sc, "scores")
+  def transform(df: ScriptsUtils.SparkDataType): DataFrame = transform(df, mloutput, labelMapping, sc, "scores")
+}
\ No newline at end of file
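
A corresponding sketch for the SVM estimator (illustrative; `sc`, `trainingDf` and `testDf` are assumed as in the NaiveBayes example, and the hyper-parameter values are placeholders):

  import org.apache.sysml.api.ml.SVM

  // Multi-class training via scripts/algorithms/m-svm.dml; the setters feed
  // the script's $icpt, $reg, $tol and $maxiter arguments.
  val svm = new SVM("svm", sc, isMultiClass = true)
  svm.setIcpt(1).setRegParam(0.01).setTol(1e-9).setMaxIter(100)
  val model = svm.fit(trainingDf)
  val scores = model.transform(testDf)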

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/f02f7c01/src/main/scala/org/apache/sysml/api/ml/ScriptsUtils.scala
----------------------------------------------------------------------
diff --git a/src/main/scala/org/apache/sysml/api/ml/ScriptsUtils.scala b/src/main/scala/org/apache/sysml/api/ml/ScriptsUtils.scala
index fdf682d..10f9d33 100644
--- a/src/main/scala/org/apache/sysml/api/ml/ScriptsUtils.scala
+++ b/src/main/scala/org/apache/sysml/api/ml/ScriptsUtils.scala
@@ -26,6 +26,8 @@ import org.apache.sysml.runtime.DMLRuntimeException
 
 object ScriptsUtils {
   var systemmlHome = System.getenv("SYSTEMML_HOME")
+
+  type SparkDataType = org.apache.spark.sql.DataFrame // org.apache.spark.sql.Dataset[_]
 
   /**
    * set SystemML home

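The new SparkDataType alias centralizes the Spark data type used by the fit/transform signatures above. Per the inline comment, the intent is that a later move to Spark 2.x Datasets would become a one-line change here (hypothetical, not in this commit):

  // type SparkDataType = org.apache.spark.sql.Dataset[_]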