Repository: incubator-systemml
Updated Branches:
  refs/heads/master d36a0c1b0 -> d69f3441c


http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/d69f3441/src/main/python/systemml/mllearn/estimators.py
----------------------------------------------------------------------
diff --git a/src/main/python/systemml/mllearn/estimators.py 
b/src/main/python/systemml/mllearn/estimators.py
index deed4c2..ec225c4 100644
--- a/src/main/python/systemml/mllearn/estimators.py
+++ b/src/main/python/systemml/mllearn/estimators.py
@@ -276,15 +276,22 @@ class BaseSystemMLClassifier(BaseSystemMLEstimator):
     def decode(self, y):
         if self.le is not None:
             return self.le.inverse_transform(np.asarray(y - 1, dtype=int))
-        else:
+        elif self.labelMap is not None:
             return [ self.labelMap[int(i)] for i in y ]
+        else:
+            return y
         
     def predict(self, X):
-        predictions = np.asarray(super(BaseSystemMLClassifier, 
self).predict(X))
-        try:
-            return np.asarray(predictions, dtype='double')
-        except ValueError:
-            return np.asarray(predictions, dtype='str')
+        predictions = super(BaseSystemMLClassifier, self).predict(X)
+        from pyspark.sql.dataframe import DataFrame as df
+        if type(predictions) == df:
+            return predictions
+        else:
+            try:
+                return np.asarray(predictions, dtype='double')
+            except ValueError:
+                print(type(predictions))
+                return np.asarray(predictions, dtype='str')
             
     def score(self, X, y):
         """
@@ -300,6 +307,55 @@ class BaseSystemMLClassifier(BaseSystemMLEstimator):
             return accuracy_score(y, predictions)
         else:
             return accuracy_score(np.asarray(y, dtype='str'), 
np.asarray(predictions, dtype='str'))
+            
+    def loadLabels(self, file_path):
+        createJavaObject(self.sc, 'dummy')
+        utilObj = self.sc._jvm.org.apache.sysml.api.ml.Utils()
+        if utilObj.checkIfFileExists(file_path):
+            df = self.sparkSession.read.csv(file_path, header=False).toPandas()
+            keys = np.asarray(df._c0, dtype='int')
+            values = np.asarray(df._c1, dtype='str')
+            self.labelMap = {}
+            self.le = None
+            for i in range(len(keys)):
+                self.labelMap[int(keys[i])] = values[i]
+            # self.encode(classes) # Giving incorrect results
+        
+    def load(self, weights=None, sep='/'):
+        """
+        Load a pretrained model. 
+
+        Parameters
+        ----------
+        weights: directory whether learned weights are stored (default: None)
+        sep: seperator to use (default: '/')
+        """
+        self.weights = weights
+        self.model.load(self.sc._jsc, weights, sep)
+        self.loadLabels(weights + '/labels.txt')
+        
+    def save(self, outputDir, format='binary', sep='/'):
+        """
+        Save a trained model.
+        
+        Parameters
+        ----------
+        outputDir: Directory to save the model to
+        format: optional format (default: 'binary')
+        sep: seperator to use (default: '/')
+        """
+        if self.model != None:
+            self.model.save(self.sc._jsc, outputDir, format, sep)
+            if self.le is not None:
+                labelMapping = dict(enumerate(list(self.le.classes_), 1))
+            else:
+                labelMapping = self.labelMap
+            lStr = [ [ int(k), str(labelMapping[k]) ] for k in labelMapping ]
+            df = self.sparkSession.createDataFrame(lStr)
+            df.write.csv(outputDir + sep + 'labels.txt', mode='overwrite', 
header=False)
+        else:
+            raise Exception('Cannot save as you need to train the model first 
using fit')
+        return self
 
 class BaseSystemMLRegressor(BaseSystemMLEstimator):
 
@@ -319,6 +375,34 @@ class BaseSystemMLRegressor(BaseSystemMLEstimator):
         y: NumPy ndarray, Pandas DataFrame, scipy sparse matrix
         """
         return r2_score(y, self.predict(X), multioutput='variance_weighted')
+        
+    def load(self, weights=None, sep='/'):
+        """
+        Load a pretrained model. 
+
+        Parameters
+        ----------
+        weights: directory whether learned weights are stored (default: None)
+        sep: seperator to use (default: '/')
+        """
+        self.weights = weights
+        self.model.load(self.sc._jsc, weights, sep)
+
+    def save(self, outputDir, format='binary', sep='/'):
+        """
+        Save a trained model.
+        
+        Parameters
+        ----------
+        outputDir: Directory to save the model to
+        format: optional format (default: 'binary')
+        sep: seperator to use (default: '/')
+        """
+        if self.model != None:
+            self.model.save(outputDir, format, sep)
+        else:
+            raise Exception('Cannot save as you need to train the model first 
using fit')
+        return self
 
 
 class LogisticRegression(BaseSystemMLClassifier):
@@ -411,11 +495,12 @@ class LogisticRegression(BaseSystemMLClassifier):
         self.estimator.setIcpt(icpt)
         self.transferUsingDF = transferUsingDF
         self.setOutputRawPredictionsToFalse = True
+        self.model = 
self.sc._jvm.org.apache.sysml.api.ml.LogisticRegressionModel(self.estimator)
         if penalty != 'l2':
             raise Exception('Only l2 penalty is supported')
         if solver != 'newton-cg':
             raise Exception('Only newton-cg solver supported')
-
+        
 
 class LinearRegression(BaseSystemMLRegressor):
     """
@@ -481,6 +566,7 @@ class LinearRegression(BaseSystemMLRegressor):
         self.estimator.setIcpt(icpt)
         self.transferUsingDF = transferUsingDF
         self.setOutputRawPredictionsToFalse = False
+        self.model = 
self.sc._jvm.org.apache.sysml.api.ml.LinearRegressionModel(self.estimator)
 
 
 class SVM(BaseSystemMLClassifier):
@@ -526,6 +612,7 @@ class SVM(BaseSystemMLClassifier):
         self.sc = sparkSession._sc
         self.uid = "svm"
         createJavaObject(self.sc, 'dummy')
+        self.is_multi_class = is_multi_class
         self.estimator = self.sc._jvm.org.apache.sysml.api.ml.SVM(self.uid, 
self.sc._jsc.sc(), is_multi_class)
         self.estimator.setMaxIter(max_iter)
         if C <= 0:
@@ -537,7 +624,7 @@ class SVM(BaseSystemMLClassifier):
         self.estimator.setIcpt(icpt)
         self.transferUsingDF = transferUsingDF
         self.setOutputRawPredictionsToFalse = False
-
+        self.model = 
self.sc._jvm.org.apache.sysml.api.ml.SVMModel(self.estimator, 
self.is_multi_class)
 
 class NaiveBayes(BaseSystemMLClassifier):
     """
@@ -583,6 +670,7 @@ class NaiveBayes(BaseSystemMLClassifier):
         self.estimator.setLaplace(laplace)
         self.transferUsingDF = transferUsingDF
         self.setOutputRawPredictionsToFalse = False
+        self.model = 
self.sc._jvm.org.apache.sysml.api.ml.NaiveBayesModel(self.estimator)
 
 class Caffe2DML(BaseSystemMLClassifier):
     """
@@ -592,8 +680,6 @@ class Caffe2DML(BaseSystemMLClassifier):
     --------
     
     >>> from systemml.mllearn import Caffe2DML
-    >>> from pyspark.sql import SQLContext
-    >>> sqlCtx = SQLContext(sc)
     >>> from mlxtend.data import mnist_data
     >>> import numpy as np
     >>> from sklearn.utils import shuffle
@@ -603,25 +689,23 @@ class Caffe2DML(BaseSystemMLClassifier):
     >>> import urllib
     >>> 
urllib.urlretrieve('https://raw.githubusercontent.com/niketanpansare/model_zoo/master/caffe/vision/lenet/mnist/lenet.proto',
 'lenet.proto')
     >>> 
urllib.urlretrieve('https://raw.githubusercontent.com/niketanpansare/model_zoo/master/caffe/vision/lenet/mnist/lenet_solver.proto',
 'lenet_solver.proto')
-    >>> caffe2DML = Caffe2DML(sqlCtx, 'lenet_solver.proto').set(max_iter=500)
+    >>> caffe2DML = Caffe2DML(spark, 'lenet_solver.proto').set(max_iter=500)
     >>> caffe2DML.fit(X, y)
     """
-    def __init__(self, sqlCtx, solver, input_shape, weights=None, 
ignore_weights=None, transferUsingDF=False, tensorboard_log_dir=None):
+    def __init__(self, sparkSession, solver, input_shape, 
transferUsingDF=False, tensorboard_log_dir=None):
         """
         Performs training/prediction for a given caffe network. 
 
         Parameters
         ----------
-        sqlCtx: PySpark SQLContext
+        sparkSession: PySpark SparkSession
         solver: caffe solver file path
         input_shape: 3-element list (number of channels, input height, input 
width)
-        weights: directory whether learned weights are stored (default: None)
-        ignore_weights: names of layers to not read from the weights directory 
(list of string, default:None)
         transferUsingDF: whether to pass the input dataset via PySpark 
DataFrame (default: False)
         tensorboard_log_dir: directory to store the event logs (default: None, 
we use a temporary directory)
         """
-        self.sqlCtx = sqlCtx
-        self.sc = sqlCtx._sc
+        self.sparkSession = sparkSession
+        self.sc = sparkSession._sc
         createJavaObject(self.sc, 'dummy')
         self.uid = "Caffe2DML"
         self.model = None
@@ -629,30 +713,30 @@ class Caffe2DML(BaseSystemMLClassifier):
             raise ValueError('Expected input_shape as list of 3 element')
         solver = 
self.sc._jvm.org.apache.sysml.api.dl.Utils.readCaffeSolver(solver)
         self.estimator = 
self.sc._jvm.org.apache.sysml.api.dl.Caffe2DML(self.sc._jsc.sc(), solver, 
str(input_shape[0]), str(input_shape[1]), str(input_shape[2]))
-        self.weights = weights
-        if weights is not None:
-            self.estimator.setInput("$weights", str(weights))
-            self._loadLabelTxt()
-            if ignore_weights is not None:
-                self.estimator.setWeightsToIgnore(ignore_weights)
         self.transferUsingDF = transferUsingDF
         self.setOutputRawPredictionsToFalse = False
         self.visualize_called = False
         if tensorboard_log_dir is not None:
             self.estimator.setTensorBoardLogDir(tensorboard_log_dir)
-    
-    def _loadLabelTxt(self, format="binary", sep="/"):
-        if(self.weights is not None):
-            self.model = 
self.sc._jvm.org.apache.sysml.api.dl.Caffe2DMLModel(self.estimator)
-            df = self.sqlCtx.read.csv(self.weights + sep + 'labels.txt', 
header=False).toPandas()
-            keys = np.asarray(df._c0, dtype='int')
-            values = np.asarray(df._c1, dtype='str')
-            self.labelMap = {}
-            self.le = None
-            for i in range(len(keys)):
-                self.labelMap[int(keys[i])] = values[i]
-            # self.encode(classes) # Giving incorrect results
-    
+
+    def load(self, weights=None, sep='/', ignore_weights=None):
+        """
+        Load a pretrained model. 
+
+        Parameters
+        ----------
+        weights: directory whether learned weights are stored (default: None)
+        sep: seperator to use (default: '/')
+        ignore_weights: names of layers to not read from the weights directory 
(list of string, default:None)
+        """
+        self.weights = weights
+        self.estimator.setInput("$weights", str(weights))
+        self.model = 
self.sc._jvm.org.apache.sysml.api.dl.Caffe2DMLModel(self.estimator)
+        self.model.load(self.sc._jsc, weights, sep)
+        self.loadLabels(weights + '/labels.txt')
+        if ignore_weights is not None:
+            self.estimator.setWeightsToIgnore(ignore_weights)
+            
     def set(self, num_classes=None, debug=None):
         """
         Set input to Caffe2DML
@@ -691,25 +775,4 @@ class Caffe2DML(BaseSystemMLClassifier):
         self.visualize_called = True
         return self
     
-    def save(self, outputDir, format='binary', sep='/'):
-        """
-        Save a trained model.
-        
-        Parameters
-        ----------
-        outputDir: Directory to save the model to
-        format: optional format (default: 'binary')
-        sep: seperator to use (default: '/')
-        """
-        if self.model != None:
-            self.model.save(outputDir, format, sep)
-            if self.le is not None:
-                labelMapping = dict(enumerate(list(self.le.classes_), 1))
-            else:
-                labelMapping = self.labelMap
-            lStr = [ [ int(k), str(labelMapping[k]) ] for k in labelMapping ]
-            df = self.sqlCtx.createDataFrame(lStr)
-            df.write.csv(outputDir + sep + 'labels.txt', mode='overwrite', 
header=False)
-        else:
-            raise Exception('Cannot save as you need to train the model first 
using fit')
-        return self
+    

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/d69f3441/src/main/scala/org/apache/sysml/api/dl/Caffe2DML.scala
----------------------------------------------------------------------
diff --git a/src/main/scala/org/apache/sysml/api/dl/Caffe2DML.scala 
b/src/main/scala/org/apache/sysml/api/dl/Caffe2DML.scala
index fe6b159..7fb3e17 100644
--- a/src/main/scala/org/apache/sysml/api/dl/Caffe2DML.scala
+++ b/src/main/scala/org/apache/sysml/api/dl/Caffe2DML.scala
@@ -55,15 +55,35 @@ import 
org.apache.sysml.runtime.controlprogram.parfor.stat.InfrastructureAnalyze
 
/***************************************************************************************
 DESIGN OF CAFFE2DML:
 
-1. Caffe2DML is designed to fit well into the mllearn framework. Hence, the 
key methods that needed to be implemented are:
+1. Caffe2DML is designed to fit well into the mllearn framework. Hence, the 
key methods that were to be implemented are:
 - `getTrainingScript` for the Estimator class. 
 - `getPredictionScript` for the Model class.
 
+These methods should be the starting point of any developer to understand the 
DML generated for training and prediction respectively.
+
 2. To simplify the DML generation in getTrainingScript and getPredictionScript 
method, we use DMLGenerator interface.
 This interface generates DML string for common operations such as loops (such 
as if, for, while) as well as built-in functions (read, write), etc.
 Also, this interface helps in "code reading" of this class :)
 
-3. Additionally, we created mapping classes for layer, solver and learning 
rate that maps the corresponding Caffe abstraction to the SystemML-NN library.
+3. Here is an analogy for SystemML developers to think of various moving 
components of Caffe2DML:
+- Like Dml.g4 in the org.apache.sysml.parser.dml package, caffe.proto in the 
src/main/proto/caffe directory
+is used to generate classes to parse the input files.
+
+Dml.g4      ---> antlr  ---> DmlLexer.java, DmlListener.java, DmlParser.java
+caffe.proto ---> protoc ---> target/generated-sources/caffe/Caffe.java
+
+- Just like the classes generated by Dml.g4 are used to parse input DML file,
+the target/generated-sources/caffe/Caffe.java class is used to parse the input 
caffe network/deploy prototxt and solver files.
+
+- You can think of .caffemodel file as DML file with matrix values encoded in 
it (please see below example). 
+So it is possible to read .caffemodel file with the Caffe.java class. This is 
done in Utils.scala's readCaffeNet method.
+
+X = matrix("1.2 3.5 0.999 7.123", rows=2, cols=2)
+...
+
+- Just like we convert the AST generated by antlr into our DMLProgram 
representation, we convert
+caffe's abstraction into the below given mapping classes for layer, solver and 
learning rate.
+These mapping classes maps the corresponding Caffe abstraction to the 
SystemML-NN library.
 This greatly simplifies adding new layers into Caffe2DML:
 trait CaffeLayer {
   // Any layer that wants to reuse SystemML-NN has to override following 
methods that help in generating the DML for the given layer:
@@ -87,6 +107,13 @@ trait Network {
   def getTopLayers(layerName:String): Set[String]
   def getLayerID(layerName:String): Int
 }
+
+5. One of the key design restriction of Caffe2DML is that every layer is 
identified uniquely by its name.
+This restriction simplifies the code significantly.
+To shield from network files that violates this restriction, Caffe2DML 
performs rewrites in CaffeNetwork class (search for condition 1-5).
+
+6. Caffe2DML also expects the layers to be in sorted order.
+
 
***************************************************************************************/
 
 object Caffe2DML  {
@@ -129,12 +156,12 @@ class Caffe2DML(val sc: SparkContext, val 
solverParam:Caffe.SolverParameter,
   }
   // Note: will update the y_mb as this will be called by Python mllearn
   def fit(X_mb: MatrixBlock, y_mb: MatrixBlock): Caffe2DMLModel = {
-    val ret = baseFit(X_mb, y_mb, sc)
-    new Caffe2DMLModel(ret, Utils.numClasses(net), sc, solver, net, lrPolicy, 
this)
+    mloutput = baseFit(X_mb, y_mb, sc)
+    new Caffe2DMLModel(this)
   }
   def fit(df: ScriptsUtils.SparkDataType): Caffe2DMLModel = {
-    val ret = baseFit(df, sc)
-    new Caffe2DMLModel(ret, Utils.numClasses(net), sc, solver, net, lrPolicy, 
this)
+    mloutput = baseFit(df, sc)
+    new Caffe2DMLModel(this)
   }
        // --------------------------------------------------------------
   
@@ -412,23 +439,14 @@ class Caffe2DMLModel(val mloutput: MLResults,
   }
   // --------------------------------------------------------------
   
-  def save(outputDir:String, format:String="binary", sep:String="/"):Unit = {
-         if(mloutput == null) throw new DMLRuntimeException("Cannot save as 
you need to train the model first using fit")
-         val dmlScript = new StringBuilder
-         dmlScript.append("print(\"Saving the model to " + outputDir + 
"...\")\n")
-         net.getLayers.map(net.getCaffeLayer(_)).filter(_.weight != 
null).map(l => dmlScript.append(write(l.weight, outputDir + sep + 
l.param.getName + "_weight.mtx", format)))
-         net.getLayers.map(net.getCaffeLayer(_)).filter(_.bias != null).map(l 
=> dmlScript.append(write(l.bias, outputDir + sep + l.param.getName + 
"_bias.mtx", format)))
-         
-         val script = dml(dmlScript.toString)
-         net.getLayers.map(net.getCaffeLayer(_)).filter(_.weight != 
null).map(l => script.in(l.weight, mloutput.getBinaryBlockMatrix(l.weight)))
-         net.getLayers.map(net.getCaffeLayer(_)).filter(_.bias != null).map(l 
=> script.in(l.bias, mloutput.getBinaryBlockMatrix(l.bias)))
-         val ml = new MLContext(sc)
-         ml.execute(script)
-       }
+  def modelVariables():List[String] = {
+    net.getLayers.map(net.getCaffeLayer(_)).filter(_.weight != 
null).map(_.weight) ++
+    net.getLayers.map(net.getCaffeLayer(_)).filter(_.bias != null).map(_.bias)
+  }
     
   // 
================================================================================================
   // The below method parses the provided network and solver file and 
generates DML script.
-  def getPredictionScript(mloutput: MLResults, isSingleNode:Boolean): (Script, 
String)  = {
+  def getPredictionScript(isSingleNode:Boolean): (Script, String)  = {
     val startPredictionTime = System.nanoTime()
     
          reset                                  // Reset the state of DML 
generator for training script.
@@ -496,11 +514,13 @@ class Caffe2DMLModel(val mloutput: MLResults,
   }
   // 
================================================================================================
   
+  def baseEstimator():BaseSystemMLEstimator = estimator
+  
   // Prediction
   def transform(X: MatrixBlock): MatrixBlock = {
-         baseTransform(X, mloutput, sc, "Prob")
+         baseTransform(X, sc, "Prob")
   }
   def transform(df: ScriptsUtils.SparkDataType): DataFrame = {
-         baseTransform(df, mloutput, sc, "Prob")
+         baseTransform(df, sc, "Prob")
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/d69f3441/src/main/scala/org/apache/sysml/api/dl/CaffeLayer.scala
----------------------------------------------------------------------
diff --git a/src/main/scala/org/apache/sysml/api/dl/CaffeLayer.scala 
b/src/main/scala/org/apache/sysml/api/dl/CaffeLayer.scala
index 0d1740e..3fdbdb1 100644
--- a/src/main/scala/org/apache/sysml/api/dl/CaffeLayer.scala
+++ b/src/main/scala/org/apache/sysml/api/dl/CaffeLayer.scala
@@ -88,7 +88,9 @@ trait CaffeLayer extends BaseDMLGenerator {
   def dWeight():String = throw new DMLRuntimeException("dWeight is not 
implemented in super class")
   def dBias():String = throw new DMLRuntimeException("dBias is not implemented 
in super class")
   def weight():String = null;
+  def weightShape():Array[Int];
   def bias():String = null;
+  def biasShape():Array[Int];
   def shouldUpdateWeight():Boolean = if(weight != null) true else false
   def shouldUpdateBias():Boolean = if(bias != null) true else false
   // 
--------------------------------------------------------------------------------------
@@ -136,13 +138,13 @@ trait IsLossLayer extends CaffeLayer {
 }
 
 trait HasWeight extends CaffeLayer {
-  override def weight = "W" + id
-  override def dWeight = "dW" + id
+  override def weight = param.getName + "_weight"
+  override def dWeight = param.getName + "_dWeight"
 }
 
 trait HasBias extends CaffeLayer {
-  override def bias = "b" + id
-  override def dBias = "db" + id
+  override def bias = param.getName + "_bias"
+  override def dBias = param.getName + "_dBias"
 }
 
 class Data(val param:LayerParameter, val id:Int, val net:CaffeNetwork, val 
numChannels:String, val height:String, val width:String) extends CaffeLayer {
@@ -152,13 +154,21 @@ class Data(val param:LayerParameter, val id:Int, val 
net:CaffeNetwork, val numCh
     if(param.hasTransformParam && param.getTransformParam.hasScale) {
       dmlScript.append("X_full = X_full * " + param.getTransformParam.getScale 
+ "\n")
     }
-    dmlScript.append("BATCH_SIZE = " + param.getDataParam.getBatchSize + "\n")
+    if(param.hasDataParam && param.getDataParam.hasBatchSize) {
+      dmlScript.append("BATCH_SIZE = " + param.getDataParam.getBatchSize + 
"\n")
+    }
+    else {
+      Caffe2DML.LOG.debug("Using default batch size of 64 as batch size is not 
set with DataParam")
+      dmlScript.append("BATCH_SIZE = 64\n")
+    }
   }
   var dataOutputShape = ("$num_channels", "$height", "$width")
   override def forward(dmlScript:StringBuilder, isPrediction:Boolean) = { }
   override def out = "Xb"
   override def backward(dmlScript:StringBuilder, outSuffix:String) = { }
   override def outputShape = (numChannels, height, width)
+  override def weightShape():Array[Int] = null
+  override def biasShape():Array[Int] = null
   // -------------------------------------------------
 }
 
@@ -303,6 +313,8 @@ class BatchNorm(val param:LayerParameter, val id:Int, val 
net:CaffeNetwork) exte
   
   private def withSuffix(str:String):String = if(update_mean_var) str else str 
+ "_ignore"
   override def weight = "ema_mean" + id
+  override def weightShape():Array[Int] = Array(numChannels.toInt, 1)
+  override def biasShape():Array[Int] = Array(numChannels.toInt, 1)
   override def bias = "ema_var" + id
   def cache_mean(): String = "cache_mean" + id
   def cache_var():String = "cache_mean" + id
@@ -337,6 +349,8 @@ class Scale(val param:LayerParameter, val id:Int, val 
net:CaffeNetwork) extends
   // TODO: Generalize this !!
   def forward(dmlScript: StringBuilder, isPrediction: Boolean): Unit = 
assign(dmlScript, out, X)
   override def backward(dmlScript: StringBuilder, outSuffix:String): Unit = 
assignDoutToDX(dmlScript, outSuffix)
+  override def weightShape():Array[Int] = 
Array(bottomLayerOutputShape._1.toInt, 1)
+  override def biasShape():Array[Int] = Array(bottomLayerOutputShape._1.toInt, 
1)
 }
 // ------------------------------------------------------------------
 
@@ -354,7 +368,8 @@ class Elementwise(val param:LayerParameter, val id:Int, val 
net:CaffeNetwork) ex
     _out
   }
   var _out:(String, String, String) = null
-  
+  override def weightShape():Array[Int] = null
+  override def biasShape():Array[Int] = null
 }
 
 class Concat(val param:LayerParameter, val id:Int, val net:CaffeNetwork) 
extends CaffeLayer {
@@ -466,6 +481,8 @@ class Concat(val param:LayerParameter, val id:Int, val 
net:CaffeNetwork) extends
     _out
   }
   var _out:(String, String, String) = null
+  override def weightShape():Array[Int] = null
+  override def biasShape():Array[Int] = null
 }
 
 class SoftmaxWithLoss(val param:LayerParameter, val id:Int, val 
net:CaffeNetwork) extends CaffeLayer with IsLossLayer {
@@ -506,6 +523,8 @@ class SoftmaxWithLoss(val param:LayerParameter, val id:Int, 
val net:CaffeNetwork
          else 
                  throw new LanguageException("More than 2 bottom layers is not 
supported")
   }
+  override def weightShape():Array[Int] = null
+  override def biasShape():Array[Int] = null
   // -------------------------------------------------
 }
 
@@ -540,9 +559,72 @@ class ReLU(val param:LayerParameter, val id:Int, val 
net:CaffeNetwork) extends C
    *  - dX: Gradient wrt `X`, of same shape as `X`.
    */
   override def backward(dmlScript:StringBuilder, outSuffix:String) = 
invokeBackward(dmlScript, outSuffix, List[String]("dOut" + id), dout, X)
+  override def weightShape():Array[Int] = null
+  override def biasShape():Array[Int] = null
+  // -------------------------------------------------
+}
+
+class Softmax(val param:LayerParameter, val id:Int, val net:CaffeNetwork) 
extends CaffeLayer {
+  // -------------------------------------------------
+  override def sourceFileName = "softmax"
+  override def init(dmlScript:StringBuilder) = { }
+  /*
+   * Computes the forward pass for a softmax classifier.  The inputs
+   * are interpreted as unnormalized, log-probabilities for each of
+   * N examples, and the softmax function transforms them to normalized
+   * probabilities.
+   *
+   * This can be interpreted as a generalization of the sigmoid
+   * function to multiple classes.
+   *
+   *   `probs_ij = e^scores_ij / sum(e^scores_i)`
+   *
+   * Inputs:
+   *  - scores: Inputs, of shape (N, D).
+   *
+   * Outputs:
+   *  - probs: Outputs, of shape (N, D).
+   */
+  override def forward(dmlScript:StringBuilder, isPrediction:Boolean) = 
invokeForward(dmlScript, List[String](out), X)
+  /*
+   * Computes the backward pass for a softmax classifier.
+   *
+   * Note that dscores_ij has multiple source branches:
+   *
+   *   ```
+   *   dprobs_ij/dscores_ij = probs_ij * (1 - probs_ij)
+   *   dprobs_ik/dscores_ij = -probs_ik * probs_ij, for all k != j
+   *
+   *   dloss/dscores_ij =
+   *      (dloss/dprobs_ij * dprobs_ij/dscores_ij)
+   *      + sum_{k!=j}(dloss/dprobs_ik * dprobs_ik/dscores_ij)
+   *   ```
+   *
+   * Inputs:
+   *  - dprobs: Gradient wrt `probs` from upstream, of shape (N, D).
+   *  - scores: Inputs, of shape (N, D).
+   *
+   * Outputs:
+   *  - dscores: Gradient wrt `scores`, of shape (N, D).
+   */
+  override def backward(dmlScript:StringBuilder, outSuffix:String) = 
invokeBackward(dmlScript, outSuffix, List[String]("dOut" + id), dout, X)
+  override def weightShape():Array[Int] = null
+  override def biasShape():Array[Int] = null
   // -------------------------------------------------
 }
 
+
+class Threshold(val param:LayerParameter, val id:Int, val net:CaffeNetwork) 
extends CaffeLayer {
+  override def sourceFileName = null
+  override def init(dmlScript:StringBuilder) = { }
+  val threshold = if(param.getThresholdParam.hasThreshold) 
param.getThresholdParam.getThreshold else 0
+  override def forward(dmlScript:StringBuilder, isPrediction:Boolean) = 
assign(dmlScript, out, X + " > " + threshold)
+  override def backward(dmlScript:StringBuilder, outSuffix:String) = throw new 
DMLRuntimeException("Backward operation for Threshold layer is not supported.")
+  override def weightShape():Array[Int] = null
+  override def biasShape():Array[Int] = null
+}
+
+
 class Dropout(val param:LayerParameter, val id:Int, val net:CaffeNetwork) 
extends CaffeLayer {
   // -------------------------------------------------
   override def sourceFileName = "dropout"
@@ -591,6 +673,8 @@ class Dropout(val param:LayerParameter, val id:Int, val 
net:CaffeNetwork) extend
   // dropout ratio
   def p = if(param.getDropoutParam.hasDropoutRatio()) 
param.getDropoutParam.getDropoutRatio.toString else "0.5"
   def seed = "-1"
+  override def weightShape():Array[Int] = null
+  override def biasShape():Array[Int] = null
 }
 
 class InnerProduct(val param:LayerParameter, val id:Int, val net:CaffeNetwork) 
extends CaffeLayer with HasWeight with HasBias {
@@ -656,8 +740,11 @@ class InnerProduct(val param:LayerParameter, val id:Int, 
val net:CaffeNetwork) e
   def numFeatures = int_mult(bottomLayerOutputShape._1, 
bottomLayerOutputShape._2, bottomLayerOutputShape._3)
   // n * c_o * 1 * 1
   override def outputShape = ( 
param.getInnerProductParam.getNumOutput.toString, "1", "1" )
+  override def weightShape():Array[Int] = Array(numFeatures.toInt, 
numNeurons.toInt)
+  override def biasShape():Array[Int] = Array(1, numNeurons.toInt)
 }
 
+
 class MaxPooling(val param:LayerParameter, val id:Int, val net:CaffeNetwork) 
extends CaffeLayer {
   // -------------------------------------------------
   override def sourceFileName = "max_pool2d_builtin"
@@ -748,6 +835,8 @@ class MaxPooling(val param:LayerParameter, val id:Int, val 
net:CaffeNetwork) ext
   def pad_w =   if(poolingParam.hasPadW) poolingParam.getPadW.toString 
                    else if(poolingParam.hasPad) poolingParam.getPad.toString
                    else "0"
+  override def weightShape():Array[Int] = null
+  override def biasShape():Array[Int] = null
 }
 
 class Convolution(val param:LayerParameter, val id:Int, val net:CaffeNetwork) 
extends CaffeLayer with HasWeight with HasBias {
@@ -861,6 +950,8 @@ class Convolution(val param:LayerParameter, val id:Int, val 
net:CaffeNetwork) ex
   def Wout =  ConvolutionUtils.getConv2dOutputMap(bottomLayerOutputShape._3, 
kernel_w, stride_w, pad_w)
   // -------------------------------------------------
   def convParam = param.getConvolutionParam
+  override def weightShape():Array[Int] = Array(numKernels.toInt, 
int_mult(numChannels, kernel_h, kernel_w).toInt)
+  override def biasShape():Array[Int] = Array(numKernels.toInt, 1)
   // num_output (c_o): the number of filters
   def numKernels = convParam.getNumOutput.toString
   // kernel_size (or kernel_h and kernel_w): specifies height and width of 
each filter
@@ -910,6 +1001,9 @@ class DeConvolution(val param:LayerParameter, val id:Int, 
val net:CaffeNetwork)
   override def init(dmlScript: StringBuilder): Unit = 
     invokeInit(dmlScript, List[String](weight, bias), numKernels, numChannels, 
kernel_h, kernel_w)
     
+  override def weightShape():Array[Int] = Array(numKernels.toInt, 
int_mult(numChannels, kernel_h, kernel_w).toInt)
+  override def biasShape():Array[Int] = Array(numKernels.toInt, 1)
+    
   /*
    * Computes the forward pass for a 2D spatial transpose convolutional
    * layer with F filters.  The input data has N examples, each
@@ -1017,4 +1111,4 @@ class DeConvolution(val param:LayerParameter, val id:Int, 
val net:CaffeNetwork)
   def pad_w =   if(convParam.hasPadW) convParam.getPadW.toString 
                    else if(convParam.getPadCount > 0)  
convParam.getPad(0).toString 
                    else "0"
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/d69f3441/src/main/scala/org/apache/sysml/api/dl/CaffeNetwork.scala
----------------------------------------------------------------------
diff --git a/src/main/scala/org/apache/sysml/api/dl/CaffeNetwork.scala 
b/src/main/scala/org/apache/sysml/api/dl/CaffeNetwork.scala
index c106cb7..5c2dc77 100644
--- a/src/main/scala/org/apache/sysml/api/dl/CaffeNetwork.scala
+++ b/src/main/scala/org/apache/sysml/api/dl/CaffeNetwork.scala
@@ -44,22 +44,50 @@ object CaffeNetwork {
 }
 
 class CaffeNetwork(netFilePath:String, val currentPhase:Phase, 
-     val numChannels:String, val height:String, val width:String
+     var numChannels:String, var height:String, var width:String
     ) extends Network {
   private def isIncludedInCurrentPhase(l:LayerParameter): Boolean = {
-    if(l.getIncludeCount == 0) true else l.getIncludeList.filter(r => 
r.hasPhase() && r.getPhase != currentPhase).length == 0
+    if(currentPhase == null) return true // while deployment
+    else if(l.getIncludeCount == 0) true 
+    else l.getIncludeList.filter(r => r.hasPhase() && r.getPhase != 
currentPhase).length == 0
   }
   private var id = 1
-  
+  def this(deployFilePath:String) {
+    this(deployFilePath, null, null, null, null)
+  }
   // 
--------------------------------------------------------------------------------
-  private var _caffeLayerParams:List[LayerParameter] = 
Utils.readCaffeNet(netFilePath).getLayerList.filter(l => 
isIncludedInCurrentPhase(l)).toList
+  private var _net:NetParameter = Utils.readCaffeNet(netFilePath)
+  private var _caffeLayerParams:List[LayerParameter] = 
_net.getLayerList.filter(l => isIncludedInCurrentPhase(l)).toList
+  // This method is used if the user doesnot provide number of channels, 
height and width
+  private def setCHW(inputShapes:java.util.List[caffe.Caffe.BlobShape]):Unit = 
{
+    if(inputShapes.size != 1)
+        throw new DMLRuntimeException("Expected only one input shape")
+    val inputShape = inputShapes.get(0)
+    if(inputShape.getDimCount != 4)
+      throw new DMLRuntimeException("Expected the input shape of dimension 4")
+    numChannels = inputShape.getDim(1).toString
+    height = inputShape.getDim(2).toString
+    width = inputShape.getDim(3).toString
+  }
+  if(numChannels == null && height == null && width == null) {
+    val inputLayer:List[LayerParameter] = 
_caffeLayerParams.filter(_.getType.toLowerCase.equals("input"))
+    if(inputLayer.size == 1) {
+      setCHW(inputLayer(0).getInputParam.getShapeList)
+    }
+    else if(inputLayer.size == 0) {
+      throw new DMLRuntimeException("Input shape (number of channels, height, 
width) is unknown. Hint: If you are using deprecated input/input_shape API, we 
recommend you use Input layer.")
+    }
+    else {
+      throw new DMLRuntimeException("Multiple Input layer is not supported")
+    }
+  }
   // 
--------------------------------------------------------------------------------
   
   private var _layerNames: List[String] = _caffeLayerParams.map(l => 
l.getName).toList
   CaffeNetwork.LOG.debug("Layers in current phase:" + _layerNames)
   
   // Condition 1: assert that each name is unique
-  private val _duplicateLayerNames =_layerNames.diff(_layerNames.distinct)
+  private val _duplicateLayerNames = _layerNames.diff(_layerNames.distinct)
   if(_duplicateLayerNames.size != 0) throw new LanguageException("Duplicate 
layer names is not supported:" + _duplicateLayerNames)
   
   // Condition 2: only 1 top name, except Data layer
@@ -126,12 +154,16 @@ class CaffeNetwork(netFilePath:String, val 
currentPhase:Phase,
     else l
   })
   
+  // Used while reading caffemodel
+  val replacedLayerNames = new HashMap[String, String]();
+  
   // Condition 5: Deal with incorrect naming
   // Example: layer { name: foo, bottom: arbitrary, top: bar } ... Rename the 
layer to bar
   private def isIncorrectNamingLayer(l:LayerParameter): Boolean = 
l.getTopCount == 1 && !l.getTop(0).equalsIgnoreCase(l.getName)
   _caffeLayerParams = _caffeLayerParams.map(l => {
     if(isIncorrectNamingLayer(l)) {
       val builder = l.toBuilder();
+      replacedLayerNames.put(l.getName, l.getTop(0))
       builder.setName(l.getTop(0))
       builder.build()
     }
@@ -161,7 +193,15 @@ class CaffeNetwork(netFilePath:String, val 
currentPhase:Phase,
   
   private def throwException(layerName:String) = throw new 
LanguageException("Layer with name " + layerName + " not found")                
              
   def getLayers(): List[String] =  _layerNames
-  def getCaffeLayer(layerName:String):CaffeLayer = if(checkKey(_layers, 
layerName)) _layers.get(layerName).get else throwException(layerName)
+  def getCaffeLayer(layerName:String):CaffeLayer = {
+    if(checkKey(_layers, layerName)) _layers.get(layerName).get
+    else {
+      if(replacedLayerNames.contains(layerName) && checkKey(_layers, 
replacedLayerNames.get(layerName))) {
+        _layers.get(replacedLayerNames.get(layerName)).get
+      }
+      else throwException(layerName)
+    }
+  }
   def getBottomLayers(layerName:String): Set[String] =  
if(checkKey(_bottomLayers, layerName)) _bottomLayers.get(layerName).get else 
throwException(layerName)
   def getTopLayers(layerName:String): Set[String] = if(checkKey(_topLayers, 
layerName)) _topLayers.get(layerName).get else throwException(layerName)
   def getLayerID(layerName:String): Int = if(checkKey(_layerIDs, layerName))  
_layerIDs.get(layerName).get else throwException(layerName)
@@ -183,11 +223,14 @@ class CaffeNetwork(netFilePath:String, val 
currentPhase:Phase,
       case "softmaxwithloss" => new SoftmaxWithLoss(param, id, this)
       case "dropout" => new Dropout(param, id, this)
       case "data" => new Data(param, id, this, numChannels, height, width)
+      case "input" => new Data(param, id, this, numChannels, height, width)
       case "batchnorm" => new BatchNorm(param, id, this)
       case "scale" => new Scale(param, id, this)
       case "eltwise" => new Elementwise(param, id, this)
       case "concat" => new Concat(param, id, this)
       case "deconvolution" => new DeConvolution(param, id, this)
+      case "threshold" => new Threshold(param, id, this)
+      case "softmax" => new Softmax(param, id, this)
       case _ => throw new LanguageException("Layer of type " + param.getType + 
" is not supported")
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/d69f3441/src/main/scala/org/apache/sysml/api/dl/Utils.scala
----------------------------------------------------------------------
diff --git a/src/main/scala/org/apache/sysml/api/dl/Utils.scala 
b/src/main/scala/org/apache/sysml/api/dl/Utils.scala
index 5181c9b..5c7222c 100644
--- a/src/main/scala/org/apache/sysml/api/dl/Utils.scala
+++ b/src/main/scala/org/apache/sysml/api/dl/Utils.scala
@@ -34,6 +34,11 @@ import java.io.InputStreamReader;
 import org.apache.sysml.runtime.DMLRuntimeException
 import java.io.StringReader
 import java.io.BufferedReader
+import com.google.protobuf.CodedInputStream
+import org.apache.sysml.runtime.matrix.data.MatrixBlock
+import org.apache.sysml.api.mlcontext.MLContext
+import org.apache.spark.SparkContext
+import org.apache.spark.api.java.JavaSparkContext
 
 object Utils {
   // 
---------------------------------------------------------------------------------------------
@@ -80,12 +85,144 @@ object Utils {
        // --------------------------------------------------------------
        // Caffe utility functions
        def readCaffeNet(netFilePath:String):NetParameter = {
+         // Load network
                val reader:InputStreamReader = 
getInputStreamReader(netFilePath); 
        val builder:NetParameter.Builder =  NetParameter.newBuilder();
        TextFormat.merge(reader, builder);
        return builder.build();
        }
        
+       class CopyFloatToDoubleArray(data:java.util.List[java.lang.Float], 
rows:Int, cols:Int, transpose:Boolean, arr:Array[Double]) extends Thread {
+         override def run(): Unit = {
+           if(transpose) {
+        var iter = 0
+        for(i <- 0 until cols) {
+          for(j <- 0 until rows) {
+            arr(j*cols + i) = data.get(iter).doubleValue()
+            iter += 1
+          }
+        }
+      }
+      else {
+        for(i <- 0 until data.size()) {
+          arr(i) = data.get(i).doubleValue()
+        }
+      }
+         }
+       }
+       
+       def allocateMatrixBlock(data:java.util.List[java.lang.Float], rows:Int, 
cols:Int, transpose:Boolean):(MatrixBlock,CopyFloatToDoubleArray) = {
+         val mb =  new MatrixBlock(rows, cols, false)
+    mb.allocateDenseBlock()
+    val arr = mb.getDenseBlock
+    val thread = new CopyFloatToDoubleArray(data, rows, cols, transpose, arr)
+         thread.start
+         return (mb, thread)
+       }
+       def validateShape(shape:Array[Int], 
data:java.util.List[java.lang.Float], layerName:String): Unit = {
+         if(shape == null) 
+      throw new DMLRuntimeException("Unexpected weight for layer: " + 
layerName)
+    else if(shape.length != 2) 
+      throw new DMLRuntimeException("Expected shape to be of length 2:" + 
layerName)
+    else if(shape(0)*shape(1) != data.size())
+      throw new DMLRuntimeException("Incorrect size of blob from caffemodel 
for the layer " + layerName + ". Expected of size " + shape(0)*shape(1) + ", 
but found " + data.size())
+       }
+       
+       def saveCaffeModelFile(sc:JavaSparkContext, deployFilePath:String, 
+           caffeModelFilePath:String, outputDirectory:String, 
format:String):Unit = {
+         saveCaffeModelFile(sc.sc, deployFilePath, caffeModelFilePath, 
outputDirectory, format)
+       }
+       
+       def saveCaffeModelFile(sc:SparkContext, deployFilePath:String, 
caffeModelFilePath:String, outputDirectory:String, format:String):Unit = {
+         val inputVariables = new java.util.HashMap[String, MatrixBlock]()
+         readCaffeNet(new CaffeNetwork(deployFilePath), deployFilePath, 
caffeModelFilePath, inputVariables)
+         val ml = new MLContext(sc)
+         val dmlScript = new StringBuilder
+         if(inputVariables.keys.size == 0)
+           throw new DMLRuntimeException("No weights found in the file " + 
caffeModelFilePath)
+         for(input <- inputVariables.keys) {
+           dmlScript.append("write(" + input + ", \"" + input + ".mtx\", 
format=\"" + format + "\");\n")
+         }
+         if(Caffe2DML.LOG.isDebugEnabled())
+           Caffe2DML.LOG.debug("Executing the script:" + dmlScript.toString)
+         val script = 
org.apache.sysml.api.mlcontext.ScriptFactory.dml(dmlScript.toString()).in(inputVariables)
+         ml.execute(script)
+       }
+       
+       def readCaffeNet(net:CaffeNetwork, netFilePath:String, 
weightsFilePath:String, inputVariables:java.util.HashMap[String, 
MatrixBlock]):NetParameter = {
+         // Load network
+               val reader:InputStreamReader = 
getInputStreamReader(netFilePath); 
+       val builder:NetParameter.Builder =  NetParameter.newBuilder();
+       TextFormat.merge(reader, builder);
+       // Load weights
+         val inputStream = CodedInputStream.newInstance(new 
FileInputStream(weightsFilePath))
+         inputStream.setSizeLimit(Integer.MAX_VALUE)
+         builder.mergeFrom(inputStream)
+         val net1 = builder.build();
+         
+         val asyncThreads = new java.util.ArrayList[CopyFloatToDoubleArray]()
+         for(layer <- net1.getLayerList) {
+           if(layer.getBlobsCount == 0) {
+             // No weight or bias
+             Caffe2DML.LOG.debug("The layer:" + layer.getName + " has no 
blobs")
+           }
+           else if(layer.getBlobsCount == 2) {
+             // Both weight and bias
+             val caffe2DMLLayer = net.getCaffeLayer(layer.getName)
+             val transpose = caffe2DMLLayer.isInstanceOf[InnerProduct]
+             
+             // weight
+             val data = layer.getBlobs(0).getDataList
+             val shape = caffe2DMLLayer.weightShape()
+             if(shape == null)
+               throw new DMLRuntimeException("Didnot expect weights for the 
layer " + layer.getName)
+             validateShape(shape, data, layer.getName)
+             val ret1 = allocateMatrixBlock(data, shape(0), shape(1), 
transpose)
+             asyncThreads.add(ret1._2)
+             inputVariables.put(caffe2DMLLayer.weight, ret1._1)
+             
+             // bias
+             val biasData = layer.getBlobs(1).getDataList
+             val biasShape = caffe2DMLLayer.biasShape()
+             if(biasShape == null)
+               throw new DMLRuntimeException("Didnot expect bias for the layer 
" + layer.getName)
+             validateShape(biasShape, biasData, layer.getName)
+             val ret2 = allocateMatrixBlock(biasData, biasShape(0), 
biasShape(1), transpose)
+             asyncThreads.add(ret2._2)
+             inputVariables.put(caffe2DMLLayer.bias, ret2._1)
+             Caffe2DML.LOG.debug("Read weights/bias for layer:" + 
layer.getName)
+           }
+           else if(layer.getBlobsCount == 1) {
+             // Special case: convolution/deconvolution without bias
+             // TODO: Extend nn layers to handle this situation + Generalize 
this to other layers, for example: InnerProduct
+             val caffe2DMLLayer = net.getCaffeLayer(layer.getName)
+             val convParam = if((caffe2DMLLayer.isInstanceOf[Convolution] || 
caffe2DMLLayer.isInstanceOf[DeConvolution]) && 
caffe2DMLLayer.param.hasConvolutionParam())  
caffe2DMLLayer.param.getConvolutionParam else null  
+             if(convParam == null)
+               throw new DMLRuntimeException("Layer with blob count " + 
layer.getBlobsCount + " is not supported for the layer " + layer.getName)
+            
+             val data = layer.getBlobs(0).getDataList
+             val shape = caffe2DMLLayer.weightShape()
+             validateShape(shape, data, layer.getName)
+             val ret1 = allocateMatrixBlock(data, shape(0), shape(1), false)
+             asyncThreads.add(ret1._2)
+             inputVariables.put(caffe2DMLLayer.weight, ret1._1)
+             inputVariables.put(caffe2DMLLayer.bias, new 
MatrixBlock(convParam.getNumOutput, 1, false))
+             Caffe2DML.LOG.debug("Read only weight for layer:" + layer.getName)
+           }
+           else {
+             throw new DMLRuntimeException("Layer with blob count " + 
layer.getBlobsCount + " is not supported for the layer " + layer.getName)
+           }
+         }
+         
+         // Wait for the copy to be finished
+         for(t <- asyncThreads) {
+           t.join()
+         }
+         
+         // Return the NetParameter without
+         return readCaffeNet(netFilePath)
+       }
+       
        def readCaffeSolver(solverFilePath:String):SolverParameter = {
                val reader = getInputStreamReader(solverFilePath);
                val builder =  SolverParameter.newBuilder();
@@ -112,4 +249,12 @@ object Utils {
                }
        }
        // --------------------------------------------------------------
+}
+
+class Utils {
+  def saveCaffeModelFile(sc:JavaSparkContext, deployFilePath:String, 
+           caffeModelFilePath:String, outputDirectory:String, 
format:String):Unit = {
+    Utils.saveCaffeModelFile(sc, deployFilePath, caffeModelFilePath, 
outputDirectory, format)
+  }
+  
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/d69f3441/src/main/scala/org/apache/sysml/api/ml/BaseSystemMLClassifier.scala
----------------------------------------------------------------------
diff --git 
a/src/main/scala/org/apache/sysml/api/ml/BaseSystemMLClassifier.scala 
b/src/main/scala/org/apache/sysml/api/ml/BaseSystemMLClassifier.scala
index f0af799..e601a7d 100644
--- a/src/main/scala/org/apache/sysml/api/ml/BaseSystemMLClassifier.scala
+++ b/src/main/scala/org/apache/sysml/api/ml/BaseSystemMLClassifier.scala
@@ -19,6 +19,7 @@
 
 package org.apache.sysml.api.ml
 
+import org.apache.spark.api.java.JavaSparkContext
 import org.apache.spark.rdd.RDD
 import java.io.File
 import org.apache.spark.SparkContext
@@ -95,7 +96,7 @@ trait BaseSystemMLEstimatorOrModel {
 
 trait BaseSystemMLEstimator extends BaseSystemMLEstimatorOrModel {
   def transformSchema(schema: StructType): StructType = schema
-  
+  var mloutput:MLResults = null
   // Returns the script and variables for X and y
   def getTrainingScript(isSingleNode:Boolean):(Script, String, String)
   
@@ -120,7 +121,37 @@ trait BaseSystemMLEstimatorModel extends 
BaseSystemMLEstimatorOrModel {
   def transformSchema(schema: StructType): StructType = schema
   
   // Returns the script and variable for X
-  def getPredictionScript(mloutput: MLResults, isSingleNode:Boolean): (Script, 
String)
+  def getPredictionScript(isSingleNode:Boolean): (Script, String)
+  def baseEstimator():BaseSystemMLEstimator
+  def modelVariables():List[String]
+  // self.model.load(self.sc._jsc, weights, format, sep)
+  def load(sc:JavaSparkContext, outputDir:String, sep:String):Unit = {
+       val dmlScript = new StringBuilder
+       dmlScript.append("print(\"Loading the model from " + outputDir + 
"...\")\n")
+               for(varName <- modelVariables) {
+                       dmlScript.append(varName + " = read(\"" + outputDir + 
sep + varName + ".mtx\")\n")
+               }
+       val script = dml(dmlScript.toString)
+               for(varName <- modelVariables) {
+                       script.out(varName)
+               }
+         val ml = new MLContext(sc)
+         baseEstimator.mloutput = ml.execute(script)
+  }
+  def save(sc:JavaSparkContext, outputDir:String, format:String="binary", 
sep:String="/"):Unit = {
+         if(baseEstimator.mloutput == null) throw new 
DMLRuntimeException("Cannot save as you need to train the model first using 
fit")
+         val dmlScript = new StringBuilder
+         dmlScript.append("print(\"Saving the model to " + outputDir + 
"...\")\n")
+         for(varName <- modelVariables) {
+               dmlScript.append("write(" + varName + ", \"" + outputDir + sep 
+ varName + ".mtx\", format=\"" + format + "\")\n")
+         }
+         val script = dml(dmlScript.toString)
+               for(varName <- modelVariables) {
+                       script.in(varName, 
baseEstimator.mloutput.getBinaryBlockMatrix(varName))
+               }
+         val ml = new MLContext(sc)
+         ml.execute(script)
+       }
 }
 
 trait BaseSystemMLClassifier extends BaseSystemMLEstimator {
@@ -150,11 +181,11 @@ trait BaseSystemMLClassifier extends 
BaseSystemMLEstimator {
 
 trait BaseSystemMLClassifierModel extends BaseSystemMLEstimatorModel {
 
-  def baseTransform(X: MatrixBlock, mloutput: MLResults, sc: SparkContext, 
probVar:String): MatrixBlock = {
+  def baseTransform(X: MatrixBlock, sc: SparkContext, probVar:String): 
MatrixBlock = {
     val isSingleNode = true
     val ml = new MLContext(sc)
     updateML(ml)
-    val script = getPredictionScript(mloutput, isSingleNode)
+    val script = getPredictionScript(isSingleNode)
     // Uncomment for debugging
     // ml.setExplainLevel(ExplainLevel.RECOMPILE_RUNTIME)
     val modelPredict = ml.execute(script._1.in(script._2, X, new 
MatrixMetadata(X.getNumRows, X.getNumColumns, X.getNonZeros)))
@@ -167,14 +198,14 @@ trait BaseSystemMLClassifierModel extends 
BaseSystemMLEstimatorModel {
     return ret
   }
 
-  def baseTransform(df: ScriptsUtils.SparkDataType, mloutput: MLResults, sc: 
SparkContext, 
+  def baseTransform(df: ScriptsUtils.SparkDataType, sc: SparkContext, 
       probVar:String, outputProb:Boolean=true): DataFrame = {
     val isSingleNode = false
     val ml = new MLContext(sc)
     updateML(ml)
     val mcXin = new MatrixCharacteristics()
     val Xin = RDDConverterUtils.dataFrameToBinaryBlock(df.rdd.sparkContext, 
df.asInstanceOf[DataFrame].select("features"), mcXin, false, true)
-    val script = getPredictionScript(mloutput, isSingleNode)
+    val script = getPredictionScript(isSingleNode)
     val Xin_bin = new BinaryBlockMatrix(Xin, mcXin)
     val modelPredict = ml.execute(script._1.in(script._2, Xin_bin))
     val predLabelOut = 
PredictionUtils.computePredictedClassLabelsFromProbability(modelPredict, 
isSingleNode, sc, probVar)

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/d69f3441/src/main/scala/org/apache/sysml/api/ml/BaseSystemMLRegressor.scala
----------------------------------------------------------------------
diff --git a/src/main/scala/org/apache/sysml/api/ml/BaseSystemMLRegressor.scala 
b/src/main/scala/org/apache/sysml/api/ml/BaseSystemMLRegressor.scala
index 5dd23e0..9e2a34a 100644
--- a/src/main/scala/org/apache/sysml/api/ml/BaseSystemMLRegressor.scala
+++ b/src/main/scala/org/apache/sysml/api/ml/BaseSystemMLRegressor.scala
@@ -60,11 +60,11 @@ trait BaseSystemMLRegressor extends BaseSystemMLEstimator {
 
 trait BaseSystemMLRegressorModel extends BaseSystemMLEstimatorModel {
   
-  def baseTransform(X: MatrixBlock, mloutput: MLResults, sc: SparkContext, 
predictionVar:String): MatrixBlock = {
+  def baseTransform(X: MatrixBlock, sc: SparkContext, predictionVar:String): 
MatrixBlock = {
     val isSingleNode = true
     val ml = new MLContext(sc)
     updateML(ml)
-    val script = getPredictionScript(mloutput, isSingleNode)
+    val script = getPredictionScript(isSingleNode)
     val modelPredict = ml.execute(script._1.in(script._2, X))
     val ret = modelPredict.getBinaryBlockMatrix(predictionVar).getMatrixBlock
               
@@ -74,13 +74,13 @@ trait BaseSystemMLRegressorModel extends 
BaseSystemMLEstimatorModel {
     return ret
   }
   
-  def baseTransform(df: ScriptsUtils.SparkDataType, mloutput: MLResults, sc: 
SparkContext, predictionVar:String): DataFrame = {
+  def baseTransform(df: ScriptsUtils.SparkDataType, sc: SparkContext, 
predictionVar:String): DataFrame = {
     val isSingleNode = false
     val ml = new MLContext(sc)
     updateML(ml)
     val mcXin = new MatrixCharacteristics()
     val Xin = RDDConverterUtils.dataFrameToBinaryBlock(df.rdd.sparkContext, 
df.asInstanceOf[DataFrame], mcXin, false, true)
-    val script = getPredictionScript(mloutput, isSingleNode)
+    val script = getPredictionScript(isSingleNode)
     val Xin_bin = new BinaryBlockMatrix(Xin, mcXin)
     val modelPredict = ml.execute(script._1.in(script._2, Xin_bin))
     val predictedDF = 
modelPredict.getDataFrame(predictionVar).select(RDDConverterUtils.DF_ID_COLUMN, 
"C1").withColumnRenamed("C1", "prediction")

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/d69f3441/src/main/scala/org/apache/sysml/api/ml/LinearRegression.scala
----------------------------------------------------------------------
diff --git a/src/main/scala/org/apache/sysml/api/ml/LinearRegression.scala 
b/src/main/scala/org/apache/sysml/api/ml/LinearRegression.scala
index 76bc0a3..463d81a 100644
--- a/src/main/scala/org/apache/sysml/api/ml/LinearRegression.scala
+++ b/src/main/scala/org/apache/sysml/api/ml/LinearRegression.scala
@@ -48,6 +48,7 @@ class LinearRegression(override val uid: String, val sc: 
SparkContext, val solve
   def setRegParam(value: Double) = set(regParam, value)
   def setTol(value: Double) = set(tol, value)
   
+
   override def copy(extra: ParamMap): Estimator[LinearRegressionModel] = {
     val that = new LinearRegression(uid, sc, solver)
     copyValues(that, extra)
@@ -72,26 +73,38 @@ class LinearRegression(override val uid: String, val sc: 
SparkContext, val solve
     (script, "X", "y")
   }
   
-  def fit(X_mb: MatrixBlock, y_mb: MatrixBlock): LinearRegressionModel = 
-    new LinearRegressionModel("lr")(baseFit(X_mb, y_mb, sc), sc)
+  def fit(X_mb: MatrixBlock, y_mb: MatrixBlock): LinearRegressionModel =  {
+    mloutput = baseFit(X_mb, y_mb, sc)
+    new LinearRegressionModel(this)
+  }
     
-  def fit(df: ScriptsUtils.SparkDataType): LinearRegressionModel = 
-    new LinearRegressionModel("lr")(baseFit(df, sc), sc)
+  def fit(df: ScriptsUtils.SparkDataType): LinearRegressionModel = { 
+    mloutput = baseFit(df, sc)
+    new LinearRegressionModel(this)
+  }
   
 }
 
-class LinearRegressionModel(override val uid: String)(val mloutput: MLResults, 
val sc: SparkContext) extends Model[LinearRegressionModel] with HasIcpt
+class LinearRegressionModel(override val uid: 
String)(estimator:LinearRegression, val sc: SparkContext) extends 
Model[LinearRegressionModel] with HasIcpt
     with HasRegParam with HasTol with HasMaxOuterIter with 
BaseSystemMLRegressorModel {
   override def copy(extra: ParamMap): LinearRegressionModel = {
-    val that = new LinearRegressionModel(uid)(mloutput, sc)
+    val that = new LinearRegressionModel(uid)(estimator, sc)
     copyValues(that, extra)
   }
   
-  def getPredictionScript(mloutput: MLResults, isSingleNode:Boolean): (Script, 
String) =
-    
PredictionUtils.getGLMPredictionScript(mloutput.getBinaryBlockMatrix("beta_out"),
 isSingleNode)
+  def baseEstimator():BaseSystemMLEstimator = estimator
+  
+  def this(estimator:LinearRegression) =  {
+       this("model")(estimator, estimator.sc)
+  }
+  
+  def getPredictionScript(isSingleNode:Boolean): (Script, String) =
+    
PredictionUtils.getGLMPredictionScript(estimator.mloutput.getBinaryBlockMatrix("beta_out"),
 isSingleNode)
+  
+  def modelVariables():List[String] = List[String]("beta_out")
   
-  def transform(df: ScriptsUtils.SparkDataType): DataFrame = baseTransform(df, 
mloutput, sc, "means")
+  def transform(df: ScriptsUtils.SparkDataType): DataFrame = baseTransform(df, 
sc, "means")
   
-  def transform(X: MatrixBlock): MatrixBlock =  baseTransform(X, mloutput, sc, 
"means")
+  def transform(X: MatrixBlock): MatrixBlock =  baseTransform(X, sc, "means")
   
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/d69f3441/src/main/scala/org/apache/sysml/api/ml/LogisticRegression.scala
----------------------------------------------------------------------
diff --git a/src/main/scala/org/apache/sysml/api/ml/LogisticRegression.scala 
b/src/main/scala/org/apache/sysml/api/ml/LogisticRegression.scala
index 9f3d844..f4b5afe 100644
--- a/src/main/scala/org/apache/sysml/api/ml/LogisticRegression.scala
+++ b/src/main/scala/org/apache/sysml/api/ml/LogisticRegression.scala
@@ -54,15 +54,16 @@ class LogisticRegression(override val uid: String, val sc: 
SparkContext) extends
     copyValues(that, extra)
   }
   
+
   // Note: will update the y_mb as this will be called by Python mllearn
   def fit(X_mb: MatrixBlock, y_mb: MatrixBlock): LogisticRegressionModel = {
-    val ret = baseFit(X_mb, y_mb, sc)
-    new LogisticRegressionModel("log")(ret, sc)
+    mloutput = baseFit(X_mb, y_mb, sc)
+    new LogisticRegressionModel(this)
   }
   
   def fit(df: ScriptsUtils.SparkDataType): LogisticRegressionModel = {
-    val ret = baseFit(df, sc)
-    new LogisticRegressionModel("log")(ret, sc)
+    mloutput = baseFit(df, sc)
+    new LogisticRegressionModel(this)
   }
   
   
@@ -89,21 +90,26 @@ object LogisticRegressionModel {
  */
 
 class LogisticRegressionModel(override val uid: String)(
-    val mloutput: MLResults, val sc: SparkContext) 
+    estimator: LogisticRegression, val sc: SparkContext) 
     extends Model[LogisticRegressionModel] with HasIcpt
     with HasRegParam with HasTol with HasMaxOuterIter with HasMaxInnerIter 
with BaseSystemMLClassifierModel {
   override def copy(extra: ParamMap): LogisticRegressionModel = {
-    val that = new LogisticRegressionModel(uid)(mloutput, sc)
+    val that = new LogisticRegressionModel(uid)(estimator, sc)
     copyValues(that, extra)
   }
   var outputRawPredictions = true
   def setOutputRawPredictions(outRawPred:Boolean): Unit = { 
outputRawPredictions = outRawPred }
+  def this(estimator:LogisticRegression) =  {
+       this("model")(estimator, estimator.sc)
+  }
+  def getPredictionScript(isSingleNode:Boolean): (Script, String) =
+    
PredictionUtils.getGLMPredictionScript(estimator.mloutput.getBinaryBlockMatrix("B_out"),
 isSingleNode, 3)
+  
+  def baseEstimator():BaseSystemMLEstimator = estimator
+  def modelVariables():List[String] = List[String]("B_out")
   
-  def getPredictionScript(mloutput: MLResults, isSingleNode:Boolean): (Script, 
String) =
-    
PredictionUtils.getGLMPredictionScript(mloutput.getBinaryBlockMatrix("B_out"), 
isSingleNode, 3)
-   
-  def transform(X: MatrixBlock): MatrixBlock = baseTransform(X, mloutput, sc, 
"means")
-  def transform(df: ScriptsUtils.SparkDataType): DataFrame = baseTransform(df, 
mloutput, sc, "means")
+  def transform(X: MatrixBlock): MatrixBlock = baseTransform(X, sc, "means")
+  def transform(df: ScriptsUtils.SparkDataType): DataFrame = baseTransform(df, 
sc, "means")
 }
 
 /**

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/d69f3441/src/main/scala/org/apache/sysml/api/ml/NaiveBayes.scala
----------------------------------------------------------------------
diff --git a/src/main/scala/org/apache/sysml/api/ml/NaiveBayes.scala 
b/src/main/scala/org/apache/sysml/api/ml/NaiveBayes.scala
index 9161a8f..b2e967b 100644
--- a/src/main/scala/org/apache/sysml/api/ml/NaiveBayes.scala
+++ b/src/main/scala/org/apache/sysml/api/ml/NaiveBayes.scala
@@ -46,13 +46,13 @@ class NaiveBayes(override val uid: String, val sc: 
SparkContext) extends Estimat
   
   // Note: will update the y_mb as this will be called by Python mllearn
   def fit(X_mb: MatrixBlock, y_mb: MatrixBlock): NaiveBayesModel = {
-    val ret = baseFit(X_mb, y_mb, sc)
-    new NaiveBayesModel("naive")(ret, sc)
+    mloutput = baseFit(X_mb, y_mb, sc)
+    new NaiveBayesModel(this)
   }
   
   def fit(df: ScriptsUtils.SparkDataType): NaiveBayesModel = {
-    val ret = baseFit(df, sc)
-    new NaiveBayesModel("naive")(ret, sc)
+    mloutput = baseFit(df, sc)
+    new NaiveBayesModel(this)
   }
   
   def getTrainingScript(isSingleNode:Boolean):(Script, String, String)  = {
@@ -74,15 +74,20 @@ object NaiveBayesModel {
 }
 
 class NaiveBayesModel(override val uid: String)
-  (val mloutput: MLResults, val sc: SparkContext) 
+  (estimator:NaiveBayes, val sc: SparkContext) 
   extends Model[NaiveBayesModel] with HasLaplace with 
BaseSystemMLClassifierModel {
   
+  def this(estimator:NaiveBayes) =  {
+    this("model")(estimator, estimator.sc)
+  }
+  
   override def copy(extra: ParamMap): NaiveBayesModel = {
-    val that = new NaiveBayesModel(uid)(mloutput, sc)
+    val that = new NaiveBayesModel(uid)(estimator, sc)
     copyValues(that, extra)
   }
   
-  def getPredictionScript(mloutput: MLResults, isSingleNode:Boolean): (Script, 
String)  = {
+  def modelVariables():List[String] = List[String]("classPrior", 
"classConditionals")
+  def getPredictionScript(isSingleNode:Boolean): (Script, String)  = {
     val script = dml(ScriptsUtils.getDMLScript(NaiveBayesModel.scriptPath))
       .in("$X", " ")
       .in("$prior", " ")
@@ -90,8 +95,8 @@ class NaiveBayesModel(override val uid: String)
       .in("$probabilities", " ")
       .out("probs")
     
-    val classPrior = mloutput.getBinaryBlockMatrix("classPrior")
-    val classConditionals = mloutput.getBinaryBlockMatrix("classConditionals")
+    val classPrior = estimator.mloutput.getBinaryBlockMatrix("classPrior")
+    val classConditionals = 
estimator.mloutput.getBinaryBlockMatrix("classConditionals")
     val ret = if(isSingleNode) {
       script.in("prior", classPrior.getMatrixBlock, 
classPrior.getMatrixMetadata)
             .in("conditionals", classConditionals.getMatrixBlock, 
classConditionals.getMatrixMetadata)
@@ -103,7 +108,8 @@ class NaiveBayesModel(override val uid: String)
     (ret, "D")
   }
   
-  def transform(X: MatrixBlock): MatrixBlock = baseTransform(X, mloutput, sc, 
"probs")
-  def transform(df: ScriptsUtils.SparkDataType): DataFrame = baseTransform(df, 
mloutput, sc, "probs")
+  def baseEstimator():BaseSystemMLEstimator = estimator
+  def transform(X: MatrixBlock): MatrixBlock = baseTransform(X, sc, "probs")
+  def transform(df: ScriptsUtils.SparkDataType): DataFrame = baseTransform(df, 
sc, "probs")
   
 }

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/d69f3441/src/main/scala/org/apache/sysml/api/ml/SVM.scala
----------------------------------------------------------------------
diff --git a/src/main/scala/org/apache/sysml/api/ml/SVM.scala 
b/src/main/scala/org/apache/sysml/api/ml/SVM.scala
index db8ce3a..d706101 100644
--- a/src/main/scala/org/apache/sysml/api/ml/SVM.scala
+++ b/src/main/scala/org/apache/sysml/api/ml/SVM.scala
@@ -67,13 +67,13 @@ class SVM (override val uid: String, val sc: SparkContext, 
val isMultiClass:Bool
   
   // Note: will update the y_mb as this will be called by Python mllearn
   def fit(X_mb: MatrixBlock, y_mb: MatrixBlock): SVMModel = {
-    val ret = baseFit(X_mb, y_mb, sc)
-    new SVMModel("svm")(ret, sc, isMultiClass)
+    mloutput = baseFit(X_mb, y_mb, sc)
+    new SVMModel(this, isMultiClass)
   }
   
   def fit(df: ScriptsUtils.SparkDataType): SVMModel = {
-    val ret = baseFit(df, sc)
-    new SVMModel("svm")(ret, sc, isMultiClass)
+    mloutput = baseFit(df, sc)
+    new SVMModel(this, isMultiClass)
   }
   
 }
@@ -83,20 +83,27 @@ object SVMModel {
   final val predictionScriptPathMulticlass = "scripts" + File.separator + 
"algorithms" + File.separator + "m-svm-predict.dml"
 }
 
-class SVMModel (override val uid: String)(val mloutput: MLResults, val sc: 
SparkContext, val isMultiClass:Boolean) 
+class SVMModel (override val uid: String)(estimator:SVM, val sc: SparkContext, 
val isMultiClass:Boolean) 
   extends Model[SVMModel] with BaseSystemMLClassifierModel {
   override def copy(extra: ParamMap): SVMModel = {
-    val that = new SVMModel(uid)(mloutput, sc, isMultiClass)
+    val that = new SVMModel(uid)(estimator, sc, isMultiClass)
     copyValues(that, extra)
   }
   
-  def getPredictionScript(mloutput: MLResults, isSingleNode:Boolean): (Script, 
String)  = {
+  def this(estimator:SVM, isMultiClass:Boolean) =  {
+       this("model")(estimator, estimator.sc, isMultiClass)
+  }
+  
+  def baseEstimator():BaseSystemMLEstimator = estimator
+  def modelVariables():List[String] = List[String]("w")
+  
+  def getPredictionScript(isSingleNode:Boolean): (Script, String)  = {
     val script = dml(ScriptsUtils.getDMLScript(if(isMultiClass) 
SVMModel.predictionScriptPathMulticlass else 
SVMModel.predictionScriptPathBinary))
       .in("$X", " ")
       .in("$model", " ")
       .out("scores")
     
-    val w = mloutput.getBinaryBlockMatrix("w")
+    val w = estimator.mloutput.getBinaryBlockMatrix("w")
     val wVar = if(isMultiClass) "W" else "w"
       
     val ret = if(isSingleNode) {
@@ -108,6 +115,6 @@ class SVMModel (override val uid: String)(val mloutput: 
MLResults, val sc: Spark
     (ret, "X")
   }
   
-  def transform(X: MatrixBlock): MatrixBlock = baseTransform(X, mloutput, sc, 
"scores")
-  def transform(df: ScriptsUtils.SparkDataType): DataFrame = baseTransform(df, 
mloutput, sc, "scores")
+  def transform(X: MatrixBlock): MatrixBlock = baseTransform(X, sc, "scores")
+  def transform(df: ScriptsUtils.SparkDataType): DataFrame = baseTransform(df, 
sc, "scores")
 }

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/d69f3441/src/main/scala/org/apache/sysml/api/ml/Utils.scala
----------------------------------------------------------------------
diff --git a/src/main/scala/org/apache/sysml/api/ml/Utils.scala 
b/src/main/scala/org/apache/sysml/api/ml/Utils.scala
new file mode 100644
index 0000000..da3edf5
--- /dev/null
+++ b/src/main/scala/org/apache/sysml/api/ml/Utils.scala
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sysml.api.ml
+
+class Utils {
+  def checkIfFileExists(filePath:String):Boolean = {
+    return 
org.apache.sysml.runtime.util.MapReduceTool.existsFileOnHDFS(filePath)
+  }
+}
\ No newline at end of file

Reply via email to