Repository: incubator-systemml Updated Branches: refs/heads/master d36a0c1b0 -> d69f3441c
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/d69f3441/src/main/python/systemml/mllearn/estimators.py ---------------------------------------------------------------------- diff --git a/src/main/python/systemml/mllearn/estimators.py b/src/main/python/systemml/mllearn/estimators.py index deed4c2..ec225c4 100644 --- a/src/main/python/systemml/mllearn/estimators.py +++ b/src/main/python/systemml/mllearn/estimators.py @@ -276,15 +276,22 @@ class BaseSystemMLClassifier(BaseSystemMLEstimator): def decode(self, y): if self.le is not None: return self.le.inverse_transform(np.asarray(y - 1, dtype=int)) - else: + elif self.labelMap is not None: return [ self.labelMap[int(i)] for i in y ] + else: + return y def predict(self, X): - predictions = np.asarray(super(BaseSystemMLClassifier, self).predict(X)) - try: - return np.asarray(predictions, dtype='double') - except ValueError: - return np.asarray(predictions, dtype='str') + predictions = super(BaseSystemMLClassifier, self).predict(X) + from pyspark.sql.dataframe import DataFrame as df + if type(predictions) == df: + return predictions + else: + try: + return np.asarray(predictions, dtype='double') + except ValueError: + print(type(predictions)) + return np.asarray(predictions, dtype='str') def score(self, X, y): """ @@ -300,6 +307,55 @@ class BaseSystemMLClassifier(BaseSystemMLEstimator): return accuracy_score(y, predictions) else: return accuracy_score(np.asarray(y, dtype='str'), np.asarray(predictions, dtype='str')) + + def loadLabels(self, file_path): + createJavaObject(self.sc, 'dummy') + utilObj = self.sc._jvm.org.apache.sysml.api.ml.Utils() + if utilObj.checkIfFileExists(file_path): + df = self.sparkSession.read.csv(file_path, header=False).toPandas() + keys = np.asarray(df._c0, dtype='int') + values = np.asarray(df._c1, dtype='str') + self.labelMap = {} + self.le = None + for i in range(len(keys)): + self.labelMap[int(keys[i])] = values[i] + # self.encode(classes) # Giving incorrect results + + def 
load(self, weights=None, sep='/'): + """ + Load a pretrained model. + + Parameters + ---------- + weights: directory whether learned weights are stored (default: None) + sep: seperator to use (default: '/') + """ + self.weights = weights + self.model.load(self.sc._jsc, weights, sep) + self.loadLabels(weights + '/labels.txt') + + def save(self, outputDir, format='binary', sep='/'): + """ + Save a trained model. + + Parameters + ---------- + outputDir: Directory to save the model to + format: optional format (default: 'binary') + sep: seperator to use (default: '/') + """ + if self.model != None: + self.model.save(self.sc._jsc, outputDir, format, sep) + if self.le is not None: + labelMapping = dict(enumerate(list(self.le.classes_), 1)) + else: + labelMapping = self.labelMap + lStr = [ [ int(k), str(labelMapping[k]) ] for k in labelMapping ] + df = self.sparkSession.createDataFrame(lStr) + df.write.csv(outputDir + sep + 'labels.txt', mode='overwrite', header=False) + else: + raise Exception('Cannot save as you need to train the model first using fit') + return self class BaseSystemMLRegressor(BaseSystemMLEstimator): @@ -319,6 +375,34 @@ class BaseSystemMLRegressor(BaseSystemMLEstimator): y: NumPy ndarray, Pandas DataFrame, scipy sparse matrix """ return r2_score(y, self.predict(X), multioutput='variance_weighted') + + def load(self, weights=None, sep='/'): + """ + Load a pretrained model. + + Parameters + ---------- + weights: directory whether learned weights are stored (default: None) + sep: seperator to use (default: '/') + """ + self.weights = weights + self.model.load(self.sc._jsc, weights, sep) + + def save(self, outputDir, format='binary', sep='/'): + """ + Save a trained model. 
+ + Parameters + ---------- + outputDir: Directory to save the model to + format: optional format (default: 'binary') + sep: seperator to use (default: '/') + """ + if self.model != None: + self.model.save(outputDir, format, sep) + else: + raise Exception('Cannot save as you need to train the model first using fit') + return self class LogisticRegression(BaseSystemMLClassifier): @@ -411,11 +495,12 @@ class LogisticRegression(BaseSystemMLClassifier): self.estimator.setIcpt(icpt) self.transferUsingDF = transferUsingDF self.setOutputRawPredictionsToFalse = True + self.model = self.sc._jvm.org.apache.sysml.api.ml.LogisticRegressionModel(self.estimator) if penalty != 'l2': raise Exception('Only l2 penalty is supported') if solver != 'newton-cg': raise Exception('Only newton-cg solver supported') - + class LinearRegression(BaseSystemMLRegressor): """ @@ -481,6 +566,7 @@ class LinearRegression(BaseSystemMLRegressor): self.estimator.setIcpt(icpt) self.transferUsingDF = transferUsingDF self.setOutputRawPredictionsToFalse = False + self.model = self.sc._jvm.org.apache.sysml.api.ml.LinearRegressionModel(self.estimator) class SVM(BaseSystemMLClassifier): @@ -526,6 +612,7 @@ class SVM(BaseSystemMLClassifier): self.sc = sparkSession._sc self.uid = "svm" createJavaObject(self.sc, 'dummy') + self.is_multi_class = is_multi_class self.estimator = self.sc._jvm.org.apache.sysml.api.ml.SVM(self.uid, self.sc._jsc.sc(), is_multi_class) self.estimator.setMaxIter(max_iter) if C <= 0: @@ -537,7 +624,7 @@ class SVM(BaseSystemMLClassifier): self.estimator.setIcpt(icpt) self.transferUsingDF = transferUsingDF self.setOutputRawPredictionsToFalse = False - + self.model = self.sc._jvm.org.apache.sysml.api.ml.SVMModel(self.estimator, self.is_multi_class) class NaiveBayes(BaseSystemMLClassifier): """ @@ -583,6 +670,7 @@ class NaiveBayes(BaseSystemMLClassifier): self.estimator.setLaplace(laplace) self.transferUsingDF = transferUsingDF self.setOutputRawPredictionsToFalse = False + self.model = 
self.sc._jvm.org.apache.sysml.api.ml.NaiveBayesModel(self.estimator) class Caffe2DML(BaseSystemMLClassifier): """ @@ -592,8 +680,6 @@ class Caffe2DML(BaseSystemMLClassifier): -------- >>> from systemml.mllearn import Caffe2DML - >>> from pyspark.sql import SQLContext - >>> sqlCtx = SQLContext(sc) >>> from mlxtend.data import mnist_data >>> import numpy as np >>> from sklearn.utils import shuffle @@ -603,25 +689,23 @@ class Caffe2DML(BaseSystemMLClassifier): >>> import urllib >>> urllib.urlretrieve('https://raw.githubusercontent.com/niketanpansare/model_zoo/master/caffe/vision/lenet/mnist/lenet.proto', 'lenet.proto') >>> urllib.urlretrieve('https://raw.githubusercontent.com/niketanpansare/model_zoo/master/caffe/vision/lenet/mnist/lenet_solver.proto', 'lenet_solver.proto') - >>> caffe2DML = Caffe2DML(sqlCtx, 'lenet_solver.proto').set(max_iter=500) + >>> caffe2DML = Caffe2DML(spark, 'lenet_solver.proto').set(max_iter=500) >>> caffe2DML.fit(X, y) """ - def __init__(self, sqlCtx, solver, input_shape, weights=None, ignore_weights=None, transferUsingDF=False, tensorboard_log_dir=None): + def __init__(self, sparkSession, solver, input_shape, transferUsingDF=False, tensorboard_log_dir=None): """ Performs training/prediction for a given caffe network. 
Parameters ---------- - sqlCtx: PySpark SQLContext + sparkSession: PySpark SparkSession solver: caffe solver file path input_shape: 3-element list (number of channels, input height, input width) - weights: directory whether learned weights are stored (default: None) - ignore_weights: names of layers to not read from the weights directory (list of string, default:None) transferUsingDF: whether to pass the input dataset via PySpark DataFrame (default: False) tensorboard_log_dir: directory to store the event logs (default: None, we use a temporary directory) """ - self.sqlCtx = sqlCtx - self.sc = sqlCtx._sc + self.sparkSession = sparkSession + self.sc = sparkSession._sc createJavaObject(self.sc, 'dummy') self.uid = "Caffe2DML" self.model = None @@ -629,30 +713,30 @@ class Caffe2DML(BaseSystemMLClassifier): raise ValueError('Expected input_shape as list of 3 element') solver = self.sc._jvm.org.apache.sysml.api.dl.Utils.readCaffeSolver(solver) self.estimator = self.sc._jvm.org.apache.sysml.api.dl.Caffe2DML(self.sc._jsc.sc(), solver, str(input_shape[0]), str(input_shape[1]), str(input_shape[2])) - self.weights = weights - if weights is not None: - self.estimator.setInput("$weights", str(weights)) - self._loadLabelTxt() - if ignore_weights is not None: - self.estimator.setWeightsToIgnore(ignore_weights) self.transferUsingDF = transferUsingDF self.setOutputRawPredictionsToFalse = False self.visualize_called = False if tensorboard_log_dir is not None: self.estimator.setTensorBoardLogDir(tensorboard_log_dir) - - def _loadLabelTxt(self, format="binary", sep="/"): - if(self.weights is not None): - self.model = self.sc._jvm.org.apache.sysml.api.dl.Caffe2DMLModel(self.estimator) - df = self.sqlCtx.read.csv(self.weights + sep + 'labels.txt', header=False).toPandas() - keys = np.asarray(df._c0, dtype='int') - values = np.asarray(df._c1, dtype='str') - self.labelMap = {} - self.le = None - for i in range(len(keys)): - self.labelMap[int(keys[i])] = values[i] - # self.encode(classes) 
# Giving incorrect results - + + def load(self, weights=None, sep='/', ignore_weights=None): + """ + Load a pretrained model. + + Parameters + ---------- + weights: directory whether learned weights are stored (default: None) + sep: seperator to use (default: '/') + ignore_weights: names of layers to not read from the weights directory (list of string, default:None) + """ + self.weights = weights + self.estimator.setInput("$weights", str(weights)) + self.model = self.sc._jvm.org.apache.sysml.api.dl.Caffe2DMLModel(self.estimator) + self.model.load(self.sc._jsc, weights, sep) + self.loadLabels(weights + '/labels.txt') + if ignore_weights is not None: + self.estimator.setWeightsToIgnore(ignore_weights) + def set(self, num_classes=None, debug=None): """ Set input to Caffe2DML @@ -691,25 +775,4 @@ class Caffe2DML(BaseSystemMLClassifier): self.visualize_called = True return self - def save(self, outputDir, format='binary', sep='/'): - """ - Save a trained model. - - Parameters - ---------- - outputDir: Directory to save the model to - format: optional format (default: 'binary') - sep: seperator to use (default: '/') - """ - if self.model != None: - self.model.save(outputDir, format, sep) - if self.le is not None: - labelMapping = dict(enumerate(list(self.le.classes_), 1)) - else: - labelMapping = self.labelMap - lStr = [ [ int(k), str(labelMapping[k]) ] for k in labelMapping ] - df = self.sqlCtx.createDataFrame(lStr) - df.write.csv(outputDir + sep + 'labels.txt', mode='overwrite', header=False) - else: - raise Exception('Cannot save as you need to train the model first using fit') - return self + http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/d69f3441/src/main/scala/org/apache/sysml/api/dl/Caffe2DML.scala ---------------------------------------------------------------------- diff --git a/src/main/scala/org/apache/sysml/api/dl/Caffe2DML.scala b/src/main/scala/org/apache/sysml/api/dl/Caffe2DML.scala index fe6b159..7fb3e17 100644 --- 
a/src/main/scala/org/apache/sysml/api/dl/Caffe2DML.scala +++ b/src/main/scala/org/apache/sysml/api/dl/Caffe2DML.scala @@ -55,15 +55,35 @@ import org.apache.sysml.runtime.controlprogram.parfor.stat.InfrastructureAnalyze /*************************************************************************************** DESIGN OF CAFFE2DML: -1. Caffe2DML is designed to fit well into the mllearn framework. Hence, the key methods that needed to be implemented are: +1. Caffe2DML is designed to fit well into the mllearn framework. Hence, the key methods that were to be implemented are: - `getTrainingScript` for the Estimator class. - `getPredictionScript` for the Model class. +These methods should be the starting point of any developer to understand the DML generated for training and prediction respectively. + 2. To simplify the DML generation in getTrainingScript and getPredictionScript method, we use DMLGenerator interface. This interface generates DML string for common operations such as loops (such as if, for, while) as well as built-in functions (read, write), etc. Also, this interface helps in "code reading" of this class :) -3. Additionally, we created mapping classes for layer, solver and learning rate that maps the corresponding Caffe abstraction to the SystemML-NN library. +3. Here is an analogy for SystemML developers to think of various moving components of Caffe2DML: +- Like Dml.g4 in the org.apache.sysml.parser.dml package, caffe.proto in the src/main/proto/caffe directory +is used to generate classes to parse the input files. + +Dml.g4 ---> antlr ---> DmlLexer.java, DmlListener.java, DmlParser.java +caffe.proto ---> protoc ---> target/generated-sources/caffe/Caffe.java + +- Just like the classes generated by Dml.g4 are used to parse input DML file, +the target/generated-sources/caffe/Caffe.java class is used to parse the input caffe network/deploy prototxt and solver files. 
+ +- You can think of .caffemodel file as DML file with matrix values encoded in it (please see below example). +So it is possible to read .caffemodel file with the Caffe.java class. This is done in Utils.scala's readCaffeNet method. + +X = matrix("1.2 3.5 0.999 7.123", rows=2, cols=2) +... + +- Just like we convert the AST generated by antlr into our DMLProgram representation, we convert +caffe's abstraction into the below given mapping classes for layer, solver and learning rate. +These mapping classes map the corresponding Caffe abstraction to the SystemML-NN library. This greatly simplifies adding new layers into Caffe2DML: trait CaffeLayer { // Any layer that wants to reuse SystemML-NN has to override following methods that help in generating the DML for the given layer: @@ -87,6 +107,13 @@ trait Network { def getTopLayers(layerName:String): Set[String] def getLayerID(layerName:String): Int } + +5. One of the key design restrictions of Caffe2DML is that every layer is identified uniquely by its name. +This restriction simplifies the code significantly. +To shield from network files that violate this restriction, Caffe2DML performs rewrites in CaffeNetwork class (search for condition 1-5). + +6. Caffe2DML also expects the layers to be in sorted order. 
+ ***************************************************************************************/ object Caffe2DML { @@ -129,12 +156,12 @@ class Caffe2DML(val sc: SparkContext, val solverParam:Caffe.SolverParameter, } // Note: will update the y_mb as this will be called by Python mllearn def fit(X_mb: MatrixBlock, y_mb: MatrixBlock): Caffe2DMLModel = { - val ret = baseFit(X_mb, y_mb, sc) - new Caffe2DMLModel(ret, Utils.numClasses(net), sc, solver, net, lrPolicy, this) + mloutput = baseFit(X_mb, y_mb, sc) + new Caffe2DMLModel(this) } def fit(df: ScriptsUtils.SparkDataType): Caffe2DMLModel = { - val ret = baseFit(df, sc) - new Caffe2DMLModel(ret, Utils.numClasses(net), sc, solver, net, lrPolicy, this) + mloutput = baseFit(df, sc) + new Caffe2DMLModel(this) } // -------------------------------------------------------------- @@ -412,23 +439,14 @@ class Caffe2DMLModel(val mloutput: MLResults, } // -------------------------------------------------------------- - def save(outputDir:String, format:String="binary", sep:String="/"):Unit = { - if(mloutput == null) throw new DMLRuntimeException("Cannot save as you need to train the model first using fit") - val dmlScript = new StringBuilder - dmlScript.append("print(\"Saving the model to " + outputDir + "...\")\n") - net.getLayers.map(net.getCaffeLayer(_)).filter(_.weight != null).map(l => dmlScript.append(write(l.weight, outputDir + sep + l.param.getName + "_weight.mtx", format))) - net.getLayers.map(net.getCaffeLayer(_)).filter(_.bias != null).map(l => dmlScript.append(write(l.bias, outputDir + sep + l.param.getName + "_bias.mtx", format))) - - val script = dml(dmlScript.toString) - net.getLayers.map(net.getCaffeLayer(_)).filter(_.weight != null).map(l => script.in(l.weight, mloutput.getBinaryBlockMatrix(l.weight))) - net.getLayers.map(net.getCaffeLayer(_)).filter(_.bias != null).map(l => script.in(l.bias, mloutput.getBinaryBlockMatrix(l.bias))) - val ml = new MLContext(sc) - ml.execute(script) - } + def 
modelVariables():List[String] = { + net.getLayers.map(net.getCaffeLayer(_)).filter(_.weight != null).map(_.weight) ++ + net.getLayers.map(net.getCaffeLayer(_)).filter(_.bias != null).map(_.bias) + } // ================================================================================================ // The below method parses the provided network and solver file and generates DML script. - def getPredictionScript(mloutput: MLResults, isSingleNode:Boolean): (Script, String) = { + def getPredictionScript(isSingleNode:Boolean): (Script, String) = { val startPredictionTime = System.nanoTime() reset // Reset the state of DML generator for training script. @@ -496,11 +514,13 @@ class Caffe2DMLModel(val mloutput: MLResults, } // ================================================================================================ + def baseEstimator():BaseSystemMLEstimator = estimator + // Prediction def transform(X: MatrixBlock): MatrixBlock = { - baseTransform(X, mloutput, sc, "Prob") + baseTransform(X, sc, "Prob") } def transform(df: ScriptsUtils.SparkDataType): DataFrame = { - baseTransform(df, mloutput, sc, "Prob") + baseTransform(df, sc, "Prob") } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/d69f3441/src/main/scala/org/apache/sysml/api/dl/CaffeLayer.scala ---------------------------------------------------------------------- diff --git a/src/main/scala/org/apache/sysml/api/dl/CaffeLayer.scala b/src/main/scala/org/apache/sysml/api/dl/CaffeLayer.scala index 0d1740e..3fdbdb1 100644 --- a/src/main/scala/org/apache/sysml/api/dl/CaffeLayer.scala +++ b/src/main/scala/org/apache/sysml/api/dl/CaffeLayer.scala @@ -88,7 +88,9 @@ trait CaffeLayer extends BaseDMLGenerator { def dWeight():String = throw new DMLRuntimeException("dWeight is not implemented in super class") def dBias():String = throw new DMLRuntimeException("dBias is not implemented in super class") def weight():String = null; + def weightShape():Array[Int]; def bias():String 
= null; + def biasShape():Array[Int]; def shouldUpdateWeight():Boolean = if(weight != null) true else false def shouldUpdateBias():Boolean = if(bias != null) true else false // -------------------------------------------------------------------------------------- @@ -136,13 +138,13 @@ trait IsLossLayer extends CaffeLayer { } trait HasWeight extends CaffeLayer { - override def weight = "W" + id - override def dWeight = "dW" + id + override def weight = param.getName + "_weight" + override def dWeight = param.getName + "_dWeight" } trait HasBias extends CaffeLayer { - override def bias = "b" + id - override def dBias = "db" + id + override def bias = param.getName + "_bias" + override def dBias = param.getName + "_dBias" } class Data(val param:LayerParameter, val id:Int, val net:CaffeNetwork, val numChannels:String, val height:String, val width:String) extends CaffeLayer { @@ -152,13 +154,21 @@ class Data(val param:LayerParameter, val id:Int, val net:CaffeNetwork, val numCh if(param.hasTransformParam && param.getTransformParam.hasScale) { dmlScript.append("X_full = X_full * " + param.getTransformParam.getScale + "\n") } - dmlScript.append("BATCH_SIZE = " + param.getDataParam.getBatchSize + "\n") + if(param.hasDataParam && param.getDataParam.hasBatchSize) { + dmlScript.append("BATCH_SIZE = " + param.getDataParam.getBatchSize + "\n") + } + else { + Caffe2DML.LOG.debug("Using default batch size of 64 as batch size is not set with DataParam") + dmlScript.append("BATCH_SIZE = 64\n") + } } var dataOutputShape = ("$num_channels", "$height", "$width") override def forward(dmlScript:StringBuilder, isPrediction:Boolean) = { } override def out = "Xb" override def backward(dmlScript:StringBuilder, outSuffix:String) = { } override def outputShape = (numChannels, height, width) + override def weightShape():Array[Int] = null + override def biasShape():Array[Int] = null // ------------------------------------------------- } @@ -303,6 +313,8 @@ class BatchNorm(val 
param:LayerParameter, val id:Int, val net:CaffeNetwork) exte private def withSuffix(str:String):String = if(update_mean_var) str else str + "_ignore" override def weight = "ema_mean" + id + override def weightShape():Array[Int] = Array(numChannels.toInt, 1) + override def biasShape():Array[Int] = Array(numChannels.toInt, 1) override def bias = "ema_var" + id def cache_mean(): String = "cache_mean" + id def cache_var():String = "cache_mean" + id @@ -337,6 +349,8 @@ class Scale(val param:LayerParameter, val id:Int, val net:CaffeNetwork) extends // TODO: Generalize this !! def forward(dmlScript: StringBuilder, isPrediction: Boolean): Unit = assign(dmlScript, out, X) override def backward(dmlScript: StringBuilder, outSuffix:String): Unit = assignDoutToDX(dmlScript, outSuffix) + override def weightShape():Array[Int] = Array(bottomLayerOutputShape._1.toInt, 1) + override def biasShape():Array[Int] = Array(bottomLayerOutputShape._1.toInt, 1) } // ------------------------------------------------------------------ @@ -354,7 +368,8 @@ class Elementwise(val param:LayerParameter, val id:Int, val net:CaffeNetwork) ex _out } var _out:(String, String, String) = null - + override def weightShape():Array[Int] = null + override def biasShape():Array[Int] = null } class Concat(val param:LayerParameter, val id:Int, val net:CaffeNetwork) extends CaffeLayer { @@ -466,6 +481,8 @@ class Concat(val param:LayerParameter, val id:Int, val net:CaffeNetwork) extends _out } var _out:(String, String, String) = null + override def weightShape():Array[Int] = null + override def biasShape():Array[Int] = null } class SoftmaxWithLoss(val param:LayerParameter, val id:Int, val net:CaffeNetwork) extends CaffeLayer with IsLossLayer { @@ -506,6 +523,8 @@ class SoftmaxWithLoss(val param:LayerParameter, val id:Int, val net:CaffeNetwork else throw new LanguageException("More than 2 bottom layers is not supported") } + override def weightShape():Array[Int] = null + override def biasShape():Array[Int] = null // 
------------------------------------------------- } @@ -540,9 +559,72 @@ class ReLU(val param:LayerParameter, val id:Int, val net:CaffeNetwork) extends C * - dX: Gradient wrt `X`, of same shape as `X`. */ override def backward(dmlScript:StringBuilder, outSuffix:String) = invokeBackward(dmlScript, outSuffix, List[String]("dOut" + id), dout, X) + override def weightShape():Array[Int] = null + override def biasShape():Array[Int] = null + // ------------------------------------------------- +} + +class Softmax(val param:LayerParameter, val id:Int, val net:CaffeNetwork) extends CaffeLayer { + // ------------------------------------------------- + override def sourceFileName = "softmax" + override def init(dmlScript:StringBuilder) = { } + /* + * Computes the forward pass for a softmax classifier. The inputs + * are interpreted as unnormalized, log-probabilities for each of + * N examples, and the softmax function transforms them to normalized + * probabilities. + * + * This can be interpreted as a generalization of the sigmoid + * function to multiple classes. + * + * `probs_ij = e^scores_ij / sum(e^scores_i)` + * + * Inputs: + * - scores: Inputs, of shape (N, D). + * + * Outputs: + * - probs: Outputs, of shape (N, D). + */ + override def forward(dmlScript:StringBuilder, isPrediction:Boolean) = invokeForward(dmlScript, List[String](out), X) + /* + * Computes the backward pass for a softmax classifier. + * + * Note that dscores_ij has multiple source branches: + * + * ``` + * dprobs_ij/dscores_ij = probs_ij * (1 - probs_ij) + * dprobs_ik/dscores_ij = -probs_ik * probs_ij, for all k != j + * + * dloss/dscores_ij = + * (dloss/dprobs_ij * dprobs_ij/dscores_ij) + * + sum_{k!=j}(dloss/dprobs_ik * dprobs_ik/dscores_ij) + * ``` + * + * Inputs: + * - dprobs: Gradient wrt `probs` from upstream, of shape (N, D). + * - scores: Inputs, of shape (N, D). + * + * Outputs: + * - dscores: Gradient wrt `scores`, of shape (N, D). 
+ */ + override def backward(dmlScript:StringBuilder, outSuffix:String) = invokeBackward(dmlScript, outSuffix, List[String]("dOut" + id), dout, X) + override def weightShape():Array[Int] = null + override def biasShape():Array[Int] = null // ------------------------------------------------- } + +class Threshold(val param:LayerParameter, val id:Int, val net:CaffeNetwork) extends CaffeLayer { + override def sourceFileName = null + override def init(dmlScript:StringBuilder) = { } + val threshold = if(param.getThresholdParam.hasThreshold) param.getThresholdParam.getThreshold else 0 + override def forward(dmlScript:StringBuilder, isPrediction:Boolean) = assign(dmlScript, out, X + " > " + threshold) + override def backward(dmlScript:StringBuilder, outSuffix:String) = throw new DMLRuntimeException("Backward operation for Threshold layer is not supported.") + override def weightShape():Array[Int] = null + override def biasShape():Array[Int] = null +} + + class Dropout(val param:LayerParameter, val id:Int, val net:CaffeNetwork) extends CaffeLayer { // ------------------------------------------------- override def sourceFileName = "dropout" @@ -591,6 +673,8 @@ class Dropout(val param:LayerParameter, val id:Int, val net:CaffeNetwork) extend // dropout ratio def p = if(param.getDropoutParam.hasDropoutRatio()) param.getDropoutParam.getDropoutRatio.toString else "0.5" def seed = "-1" + override def weightShape():Array[Int] = null + override def biasShape():Array[Int] = null } class InnerProduct(val param:LayerParameter, val id:Int, val net:CaffeNetwork) extends CaffeLayer with HasWeight with HasBias { @@ -656,8 +740,11 @@ class InnerProduct(val param:LayerParameter, val id:Int, val net:CaffeNetwork) e def numFeatures = int_mult(bottomLayerOutputShape._1, bottomLayerOutputShape._2, bottomLayerOutputShape._3) // n * c_o * 1 * 1 override def outputShape = ( param.getInnerProductParam.getNumOutput.toString, "1", "1" ) + override def weightShape():Array[Int] = 
Array(numFeatures.toInt, numNeurons.toInt) + override def biasShape():Array[Int] = Array(1, numNeurons.toInt) } + class MaxPooling(val param:LayerParameter, val id:Int, val net:CaffeNetwork) extends CaffeLayer { // ------------------------------------------------- override def sourceFileName = "max_pool2d_builtin" @@ -748,6 +835,8 @@ class MaxPooling(val param:LayerParameter, val id:Int, val net:CaffeNetwork) ext def pad_w = if(poolingParam.hasPadW) poolingParam.getPadW.toString else if(poolingParam.hasPad) poolingParam.getPad.toString else "0" + override def weightShape():Array[Int] = null + override def biasShape():Array[Int] = null } class Convolution(val param:LayerParameter, val id:Int, val net:CaffeNetwork) extends CaffeLayer with HasWeight with HasBias { @@ -861,6 +950,8 @@ class Convolution(val param:LayerParameter, val id:Int, val net:CaffeNetwork) ex def Wout = ConvolutionUtils.getConv2dOutputMap(bottomLayerOutputShape._3, kernel_w, stride_w, pad_w) // ------------------------------------------------- def convParam = param.getConvolutionParam + override def weightShape():Array[Int] = Array(numKernels.toInt, int_mult(numChannels, kernel_h, kernel_w).toInt) + override def biasShape():Array[Int] = Array(numKernels.toInt, 1) // num_output (c_o): the number of filters def numKernels = convParam.getNumOutput.toString // kernel_size (or kernel_h and kernel_w): specifies height and width of each filter @@ -910,6 +1001,9 @@ class DeConvolution(val param:LayerParameter, val id:Int, val net:CaffeNetwork) override def init(dmlScript: StringBuilder): Unit = invokeInit(dmlScript, List[String](weight, bias), numKernels, numChannels, kernel_h, kernel_w) + override def weightShape():Array[Int] = Array(numKernels.toInt, int_mult(numChannels, kernel_h, kernel_w).toInt) + override def biasShape():Array[Int] = Array(numKernels.toInt, 1) + /* * Computes the forward pass for a 2D spatial transpose convolutional * layer with F filters. 
The input data has N examples, each @@ -1017,4 +1111,4 @@ class DeConvolution(val param:LayerParameter, val id:Int, val net:CaffeNetwork) def pad_w = if(convParam.hasPadW) convParam.getPadW.toString else if(convParam.getPadCount > 0) convParam.getPad(0).toString else "0" -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/d69f3441/src/main/scala/org/apache/sysml/api/dl/CaffeNetwork.scala ---------------------------------------------------------------------- diff --git a/src/main/scala/org/apache/sysml/api/dl/CaffeNetwork.scala b/src/main/scala/org/apache/sysml/api/dl/CaffeNetwork.scala index c106cb7..5c2dc77 100644 --- a/src/main/scala/org/apache/sysml/api/dl/CaffeNetwork.scala +++ b/src/main/scala/org/apache/sysml/api/dl/CaffeNetwork.scala @@ -44,22 +44,50 @@ object CaffeNetwork { } class CaffeNetwork(netFilePath:String, val currentPhase:Phase, - val numChannels:String, val height:String, val width:String + var numChannels:String, var height:String, var width:String ) extends Network { private def isIncludedInCurrentPhase(l:LayerParameter): Boolean = { - if(l.getIncludeCount == 0) true else l.getIncludeList.filter(r => r.hasPhase() && r.getPhase != currentPhase).length == 0 + if(currentPhase == null) return true // while deployment + else if(l.getIncludeCount == 0) true + else l.getIncludeList.filter(r => r.hasPhase() && r.getPhase != currentPhase).length == 0 } private var id = 1 - + def this(deployFilePath:String) { + this(deployFilePath, null, null, null, null) + } // -------------------------------------------------------------------------------- - private var _caffeLayerParams:List[LayerParameter] = Utils.readCaffeNet(netFilePath).getLayerList.filter(l => isIncludedInCurrentPhase(l)).toList + private var _net:NetParameter = Utils.readCaffeNet(netFilePath) + private var _caffeLayerParams:List[LayerParameter] = _net.getLayerList.filter(l => isIncludedInCurrentPhase(l)).toList + // This method is used if the user 
doesnot provide number of channels, height and width + private def setCHW(inputShapes:java.util.List[caffe.Caffe.BlobShape]):Unit = { + if(inputShapes.size != 1) + throw new DMLRuntimeException("Expected only one input shape") + val inputShape = inputShapes.get(0) + if(inputShape.getDimCount != 4) + throw new DMLRuntimeException("Expected the input shape of dimension 4") + numChannels = inputShape.getDim(1).toString + height = inputShape.getDim(2).toString + width = inputShape.getDim(3).toString + } + if(numChannels == null && height == null && width == null) { + val inputLayer:List[LayerParameter] = _caffeLayerParams.filter(_.getType.toLowerCase.equals("input")) + if(inputLayer.size == 1) { + setCHW(inputLayer(0).getInputParam.getShapeList) + } + else if(inputLayer.size == 0) { + throw new DMLRuntimeException("Input shape (number of channels, height, width) is unknown. Hint: If you are using deprecated input/input_shape API, we recommend you use Input layer.") + } + else { + throw new DMLRuntimeException("Multiple Input layer is not supported") + } + } // -------------------------------------------------------------------------------- private var _layerNames: List[String] = _caffeLayerParams.map(l => l.getName).toList CaffeNetwork.LOG.debug("Layers in current phase:" + _layerNames) // Condition 1: assert that each name is unique - private val _duplicateLayerNames =_layerNames.diff(_layerNames.distinct) + private val _duplicateLayerNames = _layerNames.diff(_layerNames.distinct) if(_duplicateLayerNames.size != 0) throw new LanguageException("Duplicate layer names is not supported:" + _duplicateLayerNames) // Condition 2: only 1 top name, except Data layer @@ -126,12 +154,16 @@ class CaffeNetwork(netFilePath:String, val currentPhase:Phase, else l }) + // Used while reading caffemodel + val replacedLayerNames = new HashMap[String, String](); + // Condition 5: Deal with incorrect naming // Example: layer { name: foo, bottom: arbitrary, top: bar } ... 
Rename the layer to bar private def isIncorrectNamingLayer(l:LayerParameter): Boolean = l.getTopCount == 1 && !l.getTop(0).equalsIgnoreCase(l.getName) _caffeLayerParams = _caffeLayerParams.map(l => { if(isIncorrectNamingLayer(l)) { val builder = l.toBuilder(); + replacedLayerNames.put(l.getName, l.getTop(0)) builder.setName(l.getTop(0)) builder.build() } @@ -161,7 +193,15 @@ class CaffeNetwork(netFilePath:String, val currentPhase:Phase, private def throwException(layerName:String) = throw new LanguageException("Layer with name " + layerName + " not found") def getLayers(): List[String] = _layerNames - def getCaffeLayer(layerName:String):CaffeLayer = if(checkKey(_layers, layerName)) _layers.get(layerName).get else throwException(layerName) + def getCaffeLayer(layerName:String):CaffeLayer = { + if(checkKey(_layers, layerName)) _layers.get(layerName).get + else { + if(replacedLayerNames.contains(layerName) && checkKey(_layers, replacedLayerNames.get(layerName))) { + _layers.get(replacedLayerNames.get(layerName)).get + } + else throwException(layerName) + } + } def getBottomLayers(layerName:String): Set[String] = if(checkKey(_bottomLayers, layerName)) _bottomLayers.get(layerName).get else throwException(layerName) def getTopLayers(layerName:String): Set[String] = if(checkKey(_topLayers, layerName)) _topLayers.get(layerName).get else throwException(layerName) def getLayerID(layerName:String): Int = if(checkKey(_layerIDs, layerName)) _layerIDs.get(layerName).get else throwException(layerName) @@ -183,11 +223,14 @@ class CaffeNetwork(netFilePath:String, val currentPhase:Phase, case "softmaxwithloss" => new SoftmaxWithLoss(param, id, this) case "dropout" => new Dropout(param, id, this) case "data" => new Data(param, id, this, numChannels, height, width) + case "input" => new Data(param, id, this, numChannels, height, width) case "batchnorm" => new BatchNorm(param, id, this) case "scale" => new Scale(param, id, this) case "eltwise" => new Elementwise(param, id, this) case 
"concat" => new Concat(param, id, this) case "deconvolution" => new DeConvolution(param, id, this) + case "threshold" => new Threshold(param, id, this) + case "softmax" => new Softmax(param, id, this) case _ => throw new LanguageException("Layer of type " + param.getType + " is not supported") } } http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/d69f3441/src/main/scala/org/apache/sysml/api/dl/Utils.scala ---------------------------------------------------------------------- diff --git a/src/main/scala/org/apache/sysml/api/dl/Utils.scala b/src/main/scala/org/apache/sysml/api/dl/Utils.scala index 5181c9b..5c7222c 100644 --- a/src/main/scala/org/apache/sysml/api/dl/Utils.scala +++ b/src/main/scala/org/apache/sysml/api/dl/Utils.scala @@ -34,6 +34,11 @@ import java.io.InputStreamReader; import org.apache.sysml.runtime.DMLRuntimeException import java.io.StringReader import java.io.BufferedReader +import com.google.protobuf.CodedInputStream +import org.apache.sysml.runtime.matrix.data.MatrixBlock +import org.apache.sysml.api.mlcontext.MLContext +import org.apache.spark.SparkContext +import org.apache.spark.api.java.JavaSparkContext object Utils { // --------------------------------------------------------------------------------------------- @@ -80,12 +85,144 @@ object Utils { // -------------------------------------------------------------- // Caffe utility functions def readCaffeNet(netFilePath:String):NetParameter = { + // Load network val reader:InputStreamReader = getInputStreamReader(netFilePath); val builder:NetParameter.Builder = NetParameter.newBuilder(); TextFormat.merge(reader, builder); return builder.build(); } + class CopyFloatToDoubleArray(data:java.util.List[java.lang.Float], rows:Int, cols:Int, transpose:Boolean, arr:Array[Double]) extends Thread { + override def run(): Unit = { + if(transpose) { + var iter = 0 + for(i <- 0 until cols) { + for(j <- 0 until rows) { + arr(j*cols + i) = data.get(iter).doubleValue() + iter += 1 + } + } + } + 
else { + for(i <- 0 until data.size()) { + arr(i) = data.get(i).doubleValue() + } + } + } + } + + def allocateMatrixBlock(data:java.util.List[java.lang.Float], rows:Int, cols:Int, transpose:Boolean):(MatrixBlock,CopyFloatToDoubleArray) = { + val mb = new MatrixBlock(rows, cols, false) + mb.allocateDenseBlock() + val arr = mb.getDenseBlock + val thread = new CopyFloatToDoubleArray(data, rows, cols, transpose, arr) + thread.start + return (mb, thread) + } + def validateShape(shape:Array[Int], data:java.util.List[java.lang.Float], layerName:String): Unit = { + if(shape == null) + throw new DMLRuntimeException("Unexpected weight for layer: " + layerName) + else if(shape.length != 2) + throw new DMLRuntimeException("Expected shape to be of length 2:" + layerName) + else if(shape(0)*shape(1) != data.size()) + throw new DMLRuntimeException("Incorrect size of blob from caffemodel for the layer " + layerName + ". Expected of size " + shape(0)*shape(1) + ", but found " + data.size()) + } + + def saveCaffeModelFile(sc:JavaSparkContext, deployFilePath:String, + caffeModelFilePath:String, outputDirectory:String, format:String):Unit = { + saveCaffeModelFile(sc.sc, deployFilePath, caffeModelFilePath, outputDirectory, format) + } + + def saveCaffeModelFile(sc:SparkContext, deployFilePath:String, caffeModelFilePath:String, outputDirectory:String, format:String):Unit = { + val inputVariables = new java.util.HashMap[String, MatrixBlock]() + readCaffeNet(new CaffeNetwork(deployFilePath), deployFilePath, caffeModelFilePath, inputVariables) + val ml = new MLContext(sc) + val dmlScript = new StringBuilder + if(inputVariables.keys.size == 0) + throw new DMLRuntimeException("No weights found in the file " + caffeModelFilePath) + for(input <- inputVariables.keys) { + dmlScript.append("write(" + input + ", \"" + input + ".mtx\", format=\"" + format + "\");\n") + } + if(Caffe2DML.LOG.isDebugEnabled()) + Caffe2DML.LOG.debug("Executing the script:" + dmlScript.toString) + val script = 
org.apache.sysml.api.mlcontext.ScriptFactory.dml(dmlScript.toString()).in(inputVariables) + ml.execute(script) + } + + def readCaffeNet(net:CaffeNetwork, netFilePath:String, weightsFilePath:String, inputVariables:java.util.HashMap[String, MatrixBlock]):NetParameter = { + // Load network + val reader:InputStreamReader = getInputStreamReader(netFilePath); + val builder:NetParameter.Builder = NetParameter.newBuilder(); + TextFormat.merge(reader, builder); + // Load weights + val inputStream = CodedInputStream.newInstance(new FileInputStream(weightsFilePath)) + inputStream.setSizeLimit(Integer.MAX_VALUE) + builder.mergeFrom(inputStream) + val net1 = builder.build(); + + val asyncThreads = new java.util.ArrayList[CopyFloatToDoubleArray]() + for(layer <- net1.getLayerList) { + if(layer.getBlobsCount == 0) { + // No weight or bias + Caffe2DML.LOG.debug("The layer:" + layer.getName + " has no blobs") + } + else if(layer.getBlobsCount == 2) { + // Both weight and bias + val caffe2DMLLayer = net.getCaffeLayer(layer.getName) + val transpose = caffe2DMLLayer.isInstanceOf[InnerProduct] + + // weight + val data = layer.getBlobs(0).getDataList + val shape = caffe2DMLLayer.weightShape() + if(shape == null) + throw new DMLRuntimeException("Didnot expect weights for the layer " + layer.getName) + validateShape(shape, data, layer.getName) + val ret1 = allocateMatrixBlock(data, shape(0), shape(1), transpose) + asyncThreads.add(ret1._2) + inputVariables.put(caffe2DMLLayer.weight, ret1._1) + + // bias + val biasData = layer.getBlobs(1).getDataList + val biasShape = caffe2DMLLayer.biasShape() + if(biasShape == null) + throw new DMLRuntimeException("Didnot expect bias for the layer " + layer.getName) + validateShape(biasShape, biasData, layer.getName) + val ret2 = allocateMatrixBlock(biasData, biasShape(0), biasShape(1), transpose) + asyncThreads.add(ret2._2) + inputVariables.put(caffe2DMLLayer.bias, ret2._1) + Caffe2DML.LOG.debug("Read weights/bias for layer:" + layer.getName) + } + else 
if(layer.getBlobsCount == 1) { + // Special case: convolution/deconvolution without bias + // TODO: Extend nn layers to handle this situation + Generalize this to other layers, for example: InnerProduct + val caffe2DMLLayer = net.getCaffeLayer(layer.getName) + val convParam = if((caffe2DMLLayer.isInstanceOf[Convolution] || caffe2DMLLayer.isInstanceOf[DeConvolution]) && caffe2DMLLayer.param.hasConvolutionParam()) caffe2DMLLayer.param.getConvolutionParam else null + if(convParam == null) + throw new DMLRuntimeException("Layer with blob count " + layer.getBlobsCount + " is not supported for the layer " + layer.getName) + + val data = layer.getBlobs(0).getDataList + val shape = caffe2DMLLayer.weightShape() + validateShape(shape, data, layer.getName) + val ret1 = allocateMatrixBlock(data, shape(0), shape(1), false) + asyncThreads.add(ret1._2) + inputVariables.put(caffe2DMLLayer.weight, ret1._1) + inputVariables.put(caffe2DMLLayer.bias, new MatrixBlock(convParam.getNumOutput, 1, false)) + Caffe2DML.LOG.debug("Read only weight for layer:" + layer.getName) + } + else { + throw new DMLRuntimeException("Layer with blob count " + layer.getBlobsCount + " is not supported for the layer " + layer.getName) + } + } + + // Wait for the copy to be finished + for(t <- asyncThreads) { + t.join() + } + + // Return the NetParameter without + return readCaffeNet(netFilePath) + } + def readCaffeSolver(solverFilePath:String):SolverParameter = { val reader = getInputStreamReader(solverFilePath); val builder = SolverParameter.newBuilder(); @@ -112,4 +249,12 @@ object Utils { } } // -------------------------------------------------------------- +} + +class Utils { + def saveCaffeModelFile(sc:JavaSparkContext, deployFilePath:String, + caffeModelFilePath:String, outputDirectory:String, format:String):Unit = { + Utils.saveCaffeModelFile(sc, deployFilePath, caffeModelFilePath, outputDirectory, format) + } + } \ No newline at end of file 
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/d69f3441/src/main/scala/org/apache/sysml/api/ml/BaseSystemMLClassifier.scala ---------------------------------------------------------------------- diff --git a/src/main/scala/org/apache/sysml/api/ml/BaseSystemMLClassifier.scala b/src/main/scala/org/apache/sysml/api/ml/BaseSystemMLClassifier.scala index f0af799..e601a7d 100644 --- a/src/main/scala/org/apache/sysml/api/ml/BaseSystemMLClassifier.scala +++ b/src/main/scala/org/apache/sysml/api/ml/BaseSystemMLClassifier.scala @@ -19,6 +19,7 @@ package org.apache.sysml.api.ml +import org.apache.spark.api.java.JavaSparkContext import org.apache.spark.rdd.RDD import java.io.File import org.apache.spark.SparkContext @@ -95,7 +96,7 @@ trait BaseSystemMLEstimatorOrModel { trait BaseSystemMLEstimator extends BaseSystemMLEstimatorOrModel { def transformSchema(schema: StructType): StructType = schema - + var mloutput:MLResults = null // Returns the script and variables for X and y def getTrainingScript(isSingleNode:Boolean):(Script, String, String) @@ -120,7 +121,37 @@ trait BaseSystemMLEstimatorModel extends BaseSystemMLEstimatorOrModel { def transformSchema(schema: StructType): StructType = schema // Returns the script and variable for X - def getPredictionScript(mloutput: MLResults, isSingleNode:Boolean): (Script, String) + def getPredictionScript(isSingleNode:Boolean): (Script, String) + def baseEstimator():BaseSystemMLEstimator + def modelVariables():List[String] + // self.model.load(self.sc._jsc, weights, format, sep) + def load(sc:JavaSparkContext, outputDir:String, sep:String):Unit = { + val dmlScript = new StringBuilder + dmlScript.append("print(\"Loading the model from " + outputDir + "...\")\n") + for(varName <- modelVariables) { + dmlScript.append(varName + " = read(\"" + outputDir + sep + varName + ".mtx\")\n") + } + val script = dml(dmlScript.toString) + for(varName <- modelVariables) { + script.out(varName) + } + val ml = new MLContext(sc) + 
baseEstimator.mloutput = ml.execute(script) + } + def save(sc:JavaSparkContext, outputDir:String, format:String="binary", sep:String="/"):Unit = { + if(baseEstimator.mloutput == null) throw new DMLRuntimeException("Cannot save as you need to train the model first using fit") + val dmlScript = new StringBuilder + dmlScript.append("print(\"Saving the model to " + outputDir + "...\")\n") + for(varName <- modelVariables) { + dmlScript.append("write(" + varName + ", \"" + outputDir + sep + varName + ".mtx\", format=\"" + format + "\")\n") + } + val script = dml(dmlScript.toString) + for(varName <- modelVariables) { + script.in(varName, baseEstimator.mloutput.getBinaryBlockMatrix(varName)) + } + val ml = new MLContext(sc) + ml.execute(script) + } } trait BaseSystemMLClassifier extends BaseSystemMLEstimator { @@ -150,11 +181,11 @@ trait BaseSystemMLClassifier extends BaseSystemMLEstimator { trait BaseSystemMLClassifierModel extends BaseSystemMLEstimatorModel { - def baseTransform(X: MatrixBlock, mloutput: MLResults, sc: SparkContext, probVar:String): MatrixBlock = { + def baseTransform(X: MatrixBlock, sc: SparkContext, probVar:String): MatrixBlock = { val isSingleNode = true val ml = new MLContext(sc) updateML(ml) - val script = getPredictionScript(mloutput, isSingleNode) + val script = getPredictionScript(isSingleNode) // Uncomment for debugging // ml.setExplainLevel(ExplainLevel.RECOMPILE_RUNTIME) val modelPredict = ml.execute(script._1.in(script._2, X, new MatrixMetadata(X.getNumRows, X.getNumColumns, X.getNonZeros))) @@ -167,14 +198,14 @@ trait BaseSystemMLClassifierModel extends BaseSystemMLEstimatorModel { return ret } - def baseTransform(df: ScriptsUtils.SparkDataType, mloutput: MLResults, sc: SparkContext, + def baseTransform(df: ScriptsUtils.SparkDataType, sc: SparkContext, probVar:String, outputProb:Boolean=true): DataFrame = { val isSingleNode = false val ml = new MLContext(sc) updateML(ml) val mcXin = new MatrixCharacteristics() val Xin = 
RDDConverterUtils.dataFrameToBinaryBlock(df.rdd.sparkContext, df.asInstanceOf[DataFrame].select("features"), mcXin, false, true) - val script = getPredictionScript(mloutput, isSingleNode) + val script = getPredictionScript(isSingleNode) val Xin_bin = new BinaryBlockMatrix(Xin, mcXin) val modelPredict = ml.execute(script._1.in(script._2, Xin_bin)) val predLabelOut = PredictionUtils.computePredictedClassLabelsFromProbability(modelPredict, isSingleNode, sc, probVar) http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/d69f3441/src/main/scala/org/apache/sysml/api/ml/BaseSystemMLRegressor.scala ---------------------------------------------------------------------- diff --git a/src/main/scala/org/apache/sysml/api/ml/BaseSystemMLRegressor.scala b/src/main/scala/org/apache/sysml/api/ml/BaseSystemMLRegressor.scala index 5dd23e0..9e2a34a 100644 --- a/src/main/scala/org/apache/sysml/api/ml/BaseSystemMLRegressor.scala +++ b/src/main/scala/org/apache/sysml/api/ml/BaseSystemMLRegressor.scala @@ -60,11 +60,11 @@ trait BaseSystemMLRegressor extends BaseSystemMLEstimator { trait BaseSystemMLRegressorModel extends BaseSystemMLEstimatorModel { - def baseTransform(X: MatrixBlock, mloutput: MLResults, sc: SparkContext, predictionVar:String): MatrixBlock = { + def baseTransform(X: MatrixBlock, sc: SparkContext, predictionVar:String): MatrixBlock = { val isSingleNode = true val ml = new MLContext(sc) updateML(ml) - val script = getPredictionScript(mloutput, isSingleNode) + val script = getPredictionScript(isSingleNode) val modelPredict = ml.execute(script._1.in(script._2, X)) val ret = modelPredict.getBinaryBlockMatrix(predictionVar).getMatrixBlock @@ -74,13 +74,13 @@ trait BaseSystemMLRegressorModel extends BaseSystemMLEstimatorModel { return ret } - def baseTransform(df: ScriptsUtils.SparkDataType, mloutput: MLResults, sc: SparkContext, predictionVar:String): DataFrame = { + def baseTransform(df: ScriptsUtils.SparkDataType, sc: SparkContext, predictionVar:String): DataFrame = 
{ val isSingleNode = false val ml = new MLContext(sc) updateML(ml) val mcXin = new MatrixCharacteristics() val Xin = RDDConverterUtils.dataFrameToBinaryBlock(df.rdd.sparkContext, df.asInstanceOf[DataFrame], mcXin, false, true) - val script = getPredictionScript(mloutput, isSingleNode) + val script = getPredictionScript(isSingleNode) val Xin_bin = new BinaryBlockMatrix(Xin, mcXin) val modelPredict = ml.execute(script._1.in(script._2, Xin_bin)) val predictedDF = modelPredict.getDataFrame(predictionVar).select(RDDConverterUtils.DF_ID_COLUMN, "C1").withColumnRenamed("C1", "prediction") http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/d69f3441/src/main/scala/org/apache/sysml/api/ml/LinearRegression.scala ---------------------------------------------------------------------- diff --git a/src/main/scala/org/apache/sysml/api/ml/LinearRegression.scala b/src/main/scala/org/apache/sysml/api/ml/LinearRegression.scala index 76bc0a3..463d81a 100644 --- a/src/main/scala/org/apache/sysml/api/ml/LinearRegression.scala +++ b/src/main/scala/org/apache/sysml/api/ml/LinearRegression.scala @@ -48,6 +48,7 @@ class LinearRegression(override val uid: String, val sc: SparkContext, val solve def setRegParam(value: Double) = set(regParam, value) def setTol(value: Double) = set(tol, value) + override def copy(extra: ParamMap): Estimator[LinearRegressionModel] = { val that = new LinearRegression(uid, sc, solver) copyValues(that, extra) @@ -72,26 +73,38 @@ class LinearRegression(override val uid: String, val sc: SparkContext, val solve (script, "X", "y") } - def fit(X_mb: MatrixBlock, y_mb: MatrixBlock): LinearRegressionModel = - new LinearRegressionModel("lr")(baseFit(X_mb, y_mb, sc), sc) + def fit(X_mb: MatrixBlock, y_mb: MatrixBlock): LinearRegressionModel = { + mloutput = baseFit(X_mb, y_mb, sc) + new LinearRegressionModel(this) + } - def fit(df: ScriptsUtils.SparkDataType): LinearRegressionModel = - new LinearRegressionModel("lr")(baseFit(df, sc), sc) + def fit(df: 
ScriptsUtils.SparkDataType): LinearRegressionModel = { + mloutput = baseFit(df, sc) + new LinearRegressionModel(this) + } } -class LinearRegressionModel(override val uid: String)(val mloutput: MLResults, val sc: SparkContext) extends Model[LinearRegressionModel] with HasIcpt +class LinearRegressionModel(override val uid: String)(estimator:LinearRegression, val sc: SparkContext) extends Model[LinearRegressionModel] with HasIcpt with HasRegParam with HasTol with HasMaxOuterIter with BaseSystemMLRegressorModel { override def copy(extra: ParamMap): LinearRegressionModel = { - val that = new LinearRegressionModel(uid)(mloutput, sc) + val that = new LinearRegressionModel(uid)(estimator, sc) copyValues(that, extra) } - def getPredictionScript(mloutput: MLResults, isSingleNode:Boolean): (Script, String) = - PredictionUtils.getGLMPredictionScript(mloutput.getBinaryBlockMatrix("beta_out"), isSingleNode) + def baseEstimator():BaseSystemMLEstimator = estimator + + def this(estimator:LinearRegression) = { + this("model")(estimator, estimator.sc) + } + + def getPredictionScript(isSingleNode:Boolean): (Script, String) = + PredictionUtils.getGLMPredictionScript(estimator.mloutput.getBinaryBlockMatrix("beta_out"), isSingleNode) + + def modelVariables():List[String] = List[String]("beta_out") - def transform(df: ScriptsUtils.SparkDataType): DataFrame = baseTransform(df, mloutput, sc, "means") + def transform(df: ScriptsUtils.SparkDataType): DataFrame = baseTransform(df, sc, "means") - def transform(X: MatrixBlock): MatrixBlock = baseTransform(X, mloutput, sc, "means") + def transform(X: MatrixBlock): MatrixBlock = baseTransform(X, sc, "means") } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/d69f3441/src/main/scala/org/apache/sysml/api/ml/LogisticRegression.scala ---------------------------------------------------------------------- diff --git a/src/main/scala/org/apache/sysml/api/ml/LogisticRegression.scala 
b/src/main/scala/org/apache/sysml/api/ml/LogisticRegression.scala index 9f3d844..f4b5afe 100644 --- a/src/main/scala/org/apache/sysml/api/ml/LogisticRegression.scala +++ b/src/main/scala/org/apache/sysml/api/ml/LogisticRegression.scala @@ -54,15 +54,16 @@ class LogisticRegression(override val uid: String, val sc: SparkContext) extends copyValues(that, extra) } + // Note: will update the y_mb as this will be called by Python mllearn def fit(X_mb: MatrixBlock, y_mb: MatrixBlock): LogisticRegressionModel = { - val ret = baseFit(X_mb, y_mb, sc) - new LogisticRegressionModel("log")(ret, sc) + mloutput = baseFit(X_mb, y_mb, sc) + new LogisticRegressionModel(this) } def fit(df: ScriptsUtils.SparkDataType): LogisticRegressionModel = { - val ret = baseFit(df, sc) - new LogisticRegressionModel("log")(ret, sc) + mloutput = baseFit(df, sc) + new LogisticRegressionModel(this) } @@ -89,21 +90,26 @@ object LogisticRegressionModel { */ class LogisticRegressionModel(override val uid: String)( - val mloutput: MLResults, val sc: SparkContext) + estimator: LogisticRegression, val sc: SparkContext) extends Model[LogisticRegressionModel] with HasIcpt with HasRegParam with HasTol with HasMaxOuterIter with HasMaxInnerIter with BaseSystemMLClassifierModel { override def copy(extra: ParamMap): LogisticRegressionModel = { - val that = new LogisticRegressionModel(uid)(mloutput, sc) + val that = new LogisticRegressionModel(uid)(estimator, sc) copyValues(that, extra) } var outputRawPredictions = true def setOutputRawPredictions(outRawPred:Boolean): Unit = { outputRawPredictions = outRawPred } + def this(estimator:LogisticRegression) = { + this("model")(estimator, estimator.sc) + } + def getPredictionScript(isSingleNode:Boolean): (Script, String) = + PredictionUtils.getGLMPredictionScript(estimator.mloutput.getBinaryBlockMatrix("B_out"), isSingleNode, 3) + + def baseEstimator():BaseSystemMLEstimator = estimator + def modelVariables():List[String] = List[String]("B_out") - def 
getPredictionScript(mloutput: MLResults, isSingleNode:Boolean): (Script, String) = - PredictionUtils.getGLMPredictionScript(mloutput.getBinaryBlockMatrix("B_out"), isSingleNode, 3) - - def transform(X: MatrixBlock): MatrixBlock = baseTransform(X, mloutput, sc, "means") - def transform(df: ScriptsUtils.SparkDataType): DataFrame = baseTransform(df, mloutput, sc, "means") + def transform(X: MatrixBlock): MatrixBlock = baseTransform(X, sc, "means") + def transform(df: ScriptsUtils.SparkDataType): DataFrame = baseTransform(df, sc, "means") } /** http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/d69f3441/src/main/scala/org/apache/sysml/api/ml/NaiveBayes.scala ---------------------------------------------------------------------- diff --git a/src/main/scala/org/apache/sysml/api/ml/NaiveBayes.scala b/src/main/scala/org/apache/sysml/api/ml/NaiveBayes.scala index 9161a8f..b2e967b 100644 --- a/src/main/scala/org/apache/sysml/api/ml/NaiveBayes.scala +++ b/src/main/scala/org/apache/sysml/api/ml/NaiveBayes.scala @@ -46,13 +46,13 @@ class NaiveBayes(override val uid: String, val sc: SparkContext) extends Estimat // Note: will update the y_mb as this will be called by Python mllearn def fit(X_mb: MatrixBlock, y_mb: MatrixBlock): NaiveBayesModel = { - val ret = baseFit(X_mb, y_mb, sc) - new NaiveBayesModel("naive")(ret, sc) + mloutput = baseFit(X_mb, y_mb, sc) + new NaiveBayesModel(this) } def fit(df: ScriptsUtils.SparkDataType): NaiveBayesModel = { - val ret = baseFit(df, sc) - new NaiveBayesModel("naive")(ret, sc) + mloutput = baseFit(df, sc) + new NaiveBayesModel(this) } def getTrainingScript(isSingleNode:Boolean):(Script, String, String) = { @@ -74,15 +74,20 @@ object NaiveBayesModel { } class NaiveBayesModel(override val uid: String) - (val mloutput: MLResults, val sc: SparkContext) + (estimator:NaiveBayes, val sc: SparkContext) extends Model[NaiveBayesModel] with HasLaplace with BaseSystemMLClassifierModel { + def this(estimator:NaiveBayes) = { + 
this("model")(estimator, estimator.sc) + } + override def copy(extra: ParamMap): NaiveBayesModel = { - val that = new NaiveBayesModel(uid)(mloutput, sc) + val that = new NaiveBayesModel(uid)(estimator, sc) copyValues(that, extra) } - def getPredictionScript(mloutput: MLResults, isSingleNode:Boolean): (Script, String) = { + def modelVariables():List[String] = List[String]("classPrior", "classConditionals") + def getPredictionScript(isSingleNode:Boolean): (Script, String) = { val script = dml(ScriptsUtils.getDMLScript(NaiveBayesModel.scriptPath)) .in("$X", " ") .in("$prior", " ") @@ -90,8 +95,8 @@ class NaiveBayesModel(override val uid: String) .in("$probabilities", " ") .out("probs") - val classPrior = mloutput.getBinaryBlockMatrix("classPrior") - val classConditionals = mloutput.getBinaryBlockMatrix("classConditionals") + val classPrior = estimator.mloutput.getBinaryBlockMatrix("classPrior") + val classConditionals = estimator.mloutput.getBinaryBlockMatrix("classConditionals") val ret = if(isSingleNode) { script.in("prior", classPrior.getMatrixBlock, classPrior.getMatrixMetadata) .in("conditionals", classConditionals.getMatrixBlock, classConditionals.getMatrixMetadata) @@ -103,7 +108,8 @@ class NaiveBayesModel(override val uid: String) (ret, "D") } - def transform(X: MatrixBlock): MatrixBlock = baseTransform(X, mloutput, sc, "probs") - def transform(df: ScriptsUtils.SparkDataType): DataFrame = baseTransform(df, mloutput, sc, "probs") + def baseEstimator():BaseSystemMLEstimator = estimator + def transform(X: MatrixBlock): MatrixBlock = baseTransform(X, sc, "probs") + def transform(df: ScriptsUtils.SparkDataType): DataFrame = baseTransform(df, sc, "probs") } http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/d69f3441/src/main/scala/org/apache/sysml/api/ml/SVM.scala ---------------------------------------------------------------------- diff --git a/src/main/scala/org/apache/sysml/api/ml/SVM.scala b/src/main/scala/org/apache/sysml/api/ml/SVM.scala index 
db8ce3a..d706101 100644 --- a/src/main/scala/org/apache/sysml/api/ml/SVM.scala +++ b/src/main/scala/org/apache/sysml/api/ml/SVM.scala @@ -67,13 +67,13 @@ class SVM (override val uid: String, val sc: SparkContext, val isMultiClass:Bool // Note: will update the y_mb as this will be called by Python mllearn def fit(X_mb: MatrixBlock, y_mb: MatrixBlock): SVMModel = { - val ret = baseFit(X_mb, y_mb, sc) - new SVMModel("svm")(ret, sc, isMultiClass) + mloutput = baseFit(X_mb, y_mb, sc) + new SVMModel(this, isMultiClass) } def fit(df: ScriptsUtils.SparkDataType): SVMModel = { - val ret = baseFit(df, sc) - new SVMModel("svm")(ret, sc, isMultiClass) + mloutput = baseFit(df, sc) + new SVMModel(this, isMultiClass) } } @@ -83,20 +83,27 @@ object SVMModel { final val predictionScriptPathMulticlass = "scripts" + File.separator + "algorithms" + File.separator + "m-svm-predict.dml" } -class SVMModel (override val uid: String)(val mloutput: MLResults, val sc: SparkContext, val isMultiClass:Boolean) +class SVMModel (override val uid: String)(estimator:SVM, val sc: SparkContext, val isMultiClass:Boolean) extends Model[SVMModel] with BaseSystemMLClassifierModel { override def copy(extra: ParamMap): SVMModel = { - val that = new SVMModel(uid)(mloutput, sc, isMultiClass) + val that = new SVMModel(uid)(estimator, sc, isMultiClass) copyValues(that, extra) } - def getPredictionScript(mloutput: MLResults, isSingleNode:Boolean): (Script, String) = { + def this(estimator:SVM, isMultiClass:Boolean) = { + this("model")(estimator, estimator.sc, isMultiClass) + } + + def baseEstimator():BaseSystemMLEstimator = estimator + def modelVariables():List[String] = List[String]("w") + + def getPredictionScript(isSingleNode:Boolean): (Script, String) = { val script = dml(ScriptsUtils.getDMLScript(if(isMultiClass) SVMModel.predictionScriptPathMulticlass else SVMModel.predictionScriptPathBinary)) .in("$X", " ") .in("$model", " ") .out("scores") - val w = mloutput.getBinaryBlockMatrix("w") + val w = 
estimator.mloutput.getBinaryBlockMatrix("w") val wVar = if(isMultiClass) "W" else "w" val ret = if(isSingleNode) { @@ -108,6 +115,6 @@ class SVMModel (override val uid: String)(val mloutput: MLResults, val sc: Spark (ret, "X") } - def transform(X: MatrixBlock): MatrixBlock = baseTransform(X, mloutput, sc, "scores") - def transform(df: ScriptsUtils.SparkDataType): DataFrame = baseTransform(df, mloutput, sc, "scores") + def transform(X: MatrixBlock): MatrixBlock = baseTransform(X, sc, "scores") + def transform(df: ScriptsUtils.SparkDataType): DataFrame = baseTransform(df, sc, "scores") } http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/d69f3441/src/main/scala/org/apache/sysml/api/ml/Utils.scala ---------------------------------------------------------------------- diff --git a/src/main/scala/org/apache/sysml/api/ml/Utils.scala b/src/main/scala/org/apache/sysml/api/ml/Utils.scala new file mode 100644 index 0000000..da3edf5 --- /dev/null +++ b/src/main/scala/org/apache/sysml/api/ml/Utils.scala @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.sysml.api.ml + +class Utils { + def checkIfFileExists(filePath:String):Boolean = { + return org.apache.sysml.runtime.util.MapReduceTool.existsFileOnHDFS(filePath) + } +} \ No newline at end of file
