Repository: systemml Updated Branches: refs/heads/master d56c05ece -> 978d4de47
http://git-wip-us.apache.org/repos/asf/systemml/blob/978d4de4/src/main/java/org/apache/sysml/parser/common/CommonSyntacticValidator.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/parser/common/CommonSyntacticValidator.java b/src/main/java/org/apache/sysml/parser/common/CommonSyntacticValidator.java index 08b4ffa..3a7a347 100644 --- a/src/main/java/org/apache/sysml/parser/common/CommonSyntacticValidator.java +++ b/src/main/java/org/apache/sysml/parser/common/CommonSyntacticValidator.java @@ -152,7 +152,9 @@ public abstract class CommonSyntacticValidator { if (!sources.containsKey(namespace)) { sources.put(namespace, filePath); } - else { + else if (!sources.get(namespace).equals(filePath)) { + // Only throw an exception if the filepath is different + // If the filepath is the same, ignore the statement. This is useful for repeated definition of common DML files such as source("nn/util.dml") as util notifyErrorListeners("Namespace Conflict: '" + namespace + "' already defined as " + sources.get(namespace), ctx.start); } }

http://git-wip-us.apache.org/repos/asf/systemml/blob/978d4de4/src/main/python/systemml/mllearn/estimators.py ---------------------------------------------------------------------- diff --git a/src/main/python/systemml/mllearn/estimators.py b/src/main/python/systemml/mllearn/estimators.py index 30e66d4..17e5f37 100644 --- a/src/main/python/systemml/mllearn/estimators.py +++ b/src/main/python/systemml/mllearn/estimators.py @@ -217,6 +217,76 @@ class BaseSystemMLEstimator(Estimator): def transform(self, X): return self.predict(X) + def _convertPythonXToJavaObject(self, X): + """ + Converts the input Python object X to a Java-side object (either MatrixBlock or Java DataFrame) + + Parameters + ---------- + X: NumPy ndarray, Pandas DataFrame, scipy sparse matrix or PySpark DataFrame + """ + if isinstance(X, SUPPORTED_TYPES) and self.transferUsingDF: + pdfX = convertToPandasDF(X) + df = assemble(self.sparkSession, pdfX, pdfX.columns, self.features_col).select(self.features_col) + return df._jdf + elif isinstance(X, SUPPORTED_TYPES): + return convertToMatrixBlock(self.sc, X) + elif hasattr(X, '_jdf') and self.features_col in X.columns: + # No need to assemble as input DF is likely coming via MLPipeline + return X._jdf + elif hasattr(X, '_jdf'): + assembler = VectorAssembler(inputCols=X.columns, outputCol=self.features_col) + df = assembler.transform(X) + return df._jdf + else: + raise Exception('Unsupported input type') + + def _convertJavaOutputToPythonObject(self, X, output): + """ + Converts a Java-side output object (either MatrixBlock or Java DataFrame) to a Python object (based on the type of X). + + Parameters + ---------- + X: NumPy ndarray, Pandas DataFrame, scipy sparse matrix or PySpark DataFrame + output: a Java-side object (either MatrixBlock or Java DataFrame) + """ + if isinstance(X, SUPPORTED_TYPES) and self.transferUsingDF: + retDF = DataFrame(output, self.sparkSession) + retPDF = retDF.sort('__INDEX').select('prediction').toPandas() + return retPDF.as_matrix().flatten() if isinstance(X, np.ndarray) else retPDF + elif isinstance(X, SUPPORTED_TYPES): + return convertToNumPyArr(self.sc, output) + elif hasattr(X, '_jdf'): + retDF = DataFrame(output, self.sparkSession) + # Return DF + return retDF.sort('__INDEX') + else: + raise Exception('Unsupported input type') + + def predict_proba(self, X): + """ + Invokes the transform_probability method on the Estimator object on the JVM if X is one of the supported data types. + Returns predicted class probabilities for X. + + Parameters + ---------- + X: NumPy ndarray, Pandas DataFrame, scipy sparse matrix or PySpark DataFrame + """ + if hasattr(X, '_jdf'): + return self.predict(X) + elif self.transferUsingDF: + raise ValueError('The parameter transferUsingDF is not valid for the method predict_proba') + try: + if self.estimator is not None and self.model is not None: + self.estimator.copyProperties(self.model) + except AttributeError: + pass + try: + jX = self._convertPythonXToJavaObject(X) + return self._convertJavaOutputToPythonObject(X, self.model.transform_probability(jX)) + except Py4JError: + traceback.print_exc() + # Returns either a DataFrame or MatrixBlock after calling transform(X:MatrixBlock, y:MatrixBlock) on Model object on JVM def predict(self, X): """ @@ -231,40 +301,12 @@ class BaseSystemMLEstimator(Estimator): self.estimator.copyProperties(self.model) except AttributeError: pass - if isinstance(X, SUPPORTED_TYPES): - if self.transferUsingDF: - pdfX = convertToPandasDF(X) - df = assemble(self.sparkSession, pdfX, pdfX.columns, self.features_col).select(self.features_col) - retjDF = self.model.transform(df._jdf) - retDF = DataFrame(retjDF, self.sparkSession) - retPDF = retDF.sort('__INDEX').select('prediction').toPandas() - if isinstance(X, np.ndarray): - return self.decode(retPDF.as_matrix().flatten()) - else: - return self.decode(retPDF) - else: - try: - retNumPy = self.decode(convertToNumPyArr(self.sc, self.model.transform(convertToMatrixBlock(self.sc, X)))) - except Py4JError: - traceback.print_exc() - if isinstance(X, np.ndarray): - return retNumPy - else: - return retNumPy # TODO: Convert to Pandas - elif hasattr(X, '_jdf'): - if self.features_col in X.columns: - # No need to assemble as input DF is likely coming via MLPipeline - df = X - else: - assembler = VectorAssembler(inputCols=X.columns, outputCol=self.features_col) - df = assembler.transform(X) - retjDF = self.model.transform(df._jdf) - retDF = DataFrame(retjDF, self.sparkSession) - # Return DF - return retDF.sort('__INDEX') - else: - raise Exception('Unsupported input type') - + try: + jX = self._convertPythonXToJavaObject(X) + ret = self._convertJavaOutputToPythonObject(X, self.model.transform(jX)) + return self.decode(ret) if isinstance(X, SUPPORTED_TYPES) else ret + except Py4JError: + traceback.print_exc() class BaseSystemMLClassifier(BaseSystemMLEstimator): @@ -274,6 +316,10 @@ class BaseSystemMLClassifier(BaseSystemMLEstimator): return self.le.transform(y) + 1 def decode(self, y): + if not hasattr(self, 'le'): + self.le = None + if not hasattr(self, 'labelMap'): + self.labelMap = None if self.le is not None: return self.le.inverse_transform(np.asarray(y - 1, dtype=int)) elif self.labelMap is not None: @@ -316,18 +362,17 @@ class BaseSystemMLClassifier(BaseSystemMLEstimator): keys = np.asarray(df._c0, dtype='int') values = np.asarray(df._c1, dtype='str') self.labelMap = {} - self.le = None for i in range(len(keys)): self.labelMap[int(keys[i])] = values[i] # self.encode(classes) # Giving incorrect results - def load(self, weights=None, sep='/'): + def load(self, weights, sep='/'): """ Load a pretrained model. Parameters ---------- - weights: directory whether learned weights are stored (default: None) + weights: directory where learned weights are stored sep: separator to use (default: '/') """ self.weights = weights @@ -737,7 +782,7 @@ class Caffe2DML(BaseSystemMLClassifier): if ignore_weights is not None: self.estimator.setWeightsToIgnore(ignore_weights) - def set(self, debug=None, train_algo=None, test_algo=None, parallel_batches=None): + def set(self, debug=None, train_algo=None, test_algo=None, parallel_batches=None, output_activations=None): """ Set input to Caffe2DML @@ -746,13 +791,26 @@ debug: to add debugging DML code such as classification report, print DML script, etc (default: False) train_algo: can be minibatch, batch, allreduce_parallel_batches or allreduce (default: minibatch) test_algo: can be minibatch, batch, allreduce_parallel_batches or allreduce (default: minibatch) + parallel_batches: number of parallel batches + output_activations: (developer flag) directory to output activations of each layer as CSV during prediction. To be used only in batch mode (default: None) """ if debug is not None: self.estimator.setInput("$debug", str(debug).upper()) if train_algo is not None: self.estimator.setInput("$train_algo", str(train_algo).lower()) if test_algo is not None: self.estimator.setInput("$test_algo", str(test_algo).lower()) if parallel_batches is not None: self.estimator.setInput("$parallel_batches", str(parallel_batches)) + if output_activations is not None: self.estimator.setInput("$output_activations", str(output_activations)) return self + def summary(self): + """ + Print the summary of the network + """ + import pyspark + if type(self.sparkSession) == pyspark.sql.session.SparkSession: + self.estimator.summary(self.sparkSession._jsparkSession) + else: + raise TypeError('Please use spark session of type pyspark.sql.session.SparkSession in the constructor') + def visualize(self, layerName=None, varType='weight', aggFn='mean'): """ Use this to visualize the training procedure (requires validation_percentage to be non-zero).
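The Python API changes above add three user-visible pieces: predict_proba on all mllearn estimators, plus summary() and the output_activations flag on Caffe2DML. A minimal usage sketch, assuming a LeNet-style setup along the lines of the project's Caffe2DML examples; the solver proto name and output directory below are hypothetical placeholders, not part of this commit:

    from sklearn import datasets
    from pyspark.sql import SparkSession
    from systemml.mllearn import Caffe2DML

    spark = SparkSession.builder.getOrCreate()
    digits = datasets.load_digits()
    X, y = digits.data, digits.target

    # 'lenet_solver.proto' is a hypothetical solver definition; digits images are 1x8x8
    lenet = Caffe2DML(spark, solver='lenet_solver.proto', input_shape=(1, 8, 8))
    lenet.summary()  # new: prints the Name/Type/Output/Weight/Bias/Top/Bottom layer table
    # output_activations is a developer flag and is honored only when test_algo='batch'
    lenet.set(debug=True, test_algo='batch', output_activations='/tmp/activations')
    lenet.fit(X, y)
    probs = lenet.predict_proba(X)  # new: probabilities via transform_probability on the JVM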
http://git-wip-us.apache.org/repos/asf/systemml/blob/978d4de4/src/main/scala/org/apache/sysml/api/dl/Caffe2DML.scala ---------------------------------------------------------------------- diff --git a/src/main/scala/org/apache/sysml/api/dl/Caffe2DML.scala b/src/main/scala/org/apache/sysml/api/dl/Caffe2DML.scala index f338fd7..25d19f6 100644 --- a/src/main/scala/org/apache/sysml/api/dl/Caffe2DML.scala +++ b/src/main/scala/org/apache/sysml/api/dl/Caffe2DML.scala @@ -51,7 +51,6 @@ import org.apache.commons.logging.Log import org.apache.commons.logging.LogFactory import org.apache.sysml.runtime.controlprogram.parfor.stat.InfrastructureAnalyzer - /*************************************************************************************** DESIGN OF CAFFE2DML: @@ -164,6 +163,17 @@ class Caffe2DML(val sc: SparkContext, val solverParam:Caffe.SolverParameter, new Caffe2DMLModel(this) } // -------------------------------------------------------------- + // Returns true if the last 2 of the 4 dimensions are 1. + // The first dimension refers to the number of input datapoints. + // The second dimension refers to the number of classes. + def isClassification():Boolean = { + val outShape = getOutputShapeOfLastLayer + return outShape._2 == 1 && outShape._3 == 1 + } + def getOutputShapeOfLastLayer():(Int, Int, Int) = { + val out = net.getCaffeLayer(net.getLayers().last).outputShape + (out._1.toInt, out._2.toInt, out._3.toInt) + } // Used for simplifying transfer learning private val layersToIgnore:HashSet[String] = new HashSet[String]() @@ -184,7 +194,23 @@ class Caffe2DML(val sc: SparkContext, val solverParam:Caffe.SolverParameter, def getTrainAlgo():String = if(inputs.containsKey("$train_algo")) inputs.get("$train_algo") else "minibatch" def getTestAlgo():String = if(inputs.containsKey("$test_algo")) inputs.get("$test_algo") else "minibatch" - + + def summary(sparkSession:org.apache.spark.sql.SparkSession):Unit = { + val header = Seq("Name", "Type", "Output", "Weight", "Bias", "Top", "Bottom") + val entries = net.getLayers.map(l => (l, net.getCaffeLayer(l))).map(l => { + val layer = l._2 + (l._1, layer.param.getType, + "(, " + layer.outputShape._1 + ", " + layer.outputShape._2 + ", " + layer.outputShape._3 + ")", + if(layer.weightShape != null) "[" + layer.weightShape()(0) + " X " + layer.weightShape()(1) + "]" else "", + if(layer.biasShape != null) "[" + layer.biasShape()(0) + " X " + layer.biasShape()(1) + "]" else "", + layer.param.getTopList.mkString(","), + layer.param.getBottomList.mkString(",") + ) + }) + import sparkSession.implicits._ + sc.parallelize(entries).toDF(header : _*).show(net.getLayers.size) + } + // ================================================================================================ // The method below parses the provided network and solver file and generates the DML script.
def getTrainingScript(isSingleNode:Boolean):(Script, String, String) = { @@ -252,6 +278,7 @@ class Caffe2DML(val sc: SparkContext, val solverParam:Caffe.SolverParameter, assign(tabDMLScript, "X_group_batch", Caffe2DML.X + "[group_beg:group_end,]") assign(tabDMLScript, "y_group_batch", Caffe2DML.y + "[group_beg:group_end,]") initializeGradients("parallel_batches") + assign(tabDMLScript, "X_group_batch_size", nrow("X_group_batch")) parForBlock("j", "1", "parallel_batches") { // Get a mini-batch in this group assign(tabDMLScript, "beg", "((j-1) * " + Caffe2DML.batchSize + ") %% nrow(X_group_batch) + 1") @@ -280,6 +307,7 @@ class Caffe2DML(val sc: SparkContext, val solverParam:Caffe.SolverParameter, assign(tabDMLScript, "end", " min(beg + " + Caffe2DML.batchSize + " - 1, " + Caffe2DML.numImages + ")") assign(tabDMLScript, "X_group_batch", Caffe2DML.X + "[beg:end,]") assign(tabDMLScript, "y_group_batch", Caffe2DML.y + "[beg:end,]") + assign(tabDMLScript, "X_group_batch_size", nrow("X_group_batch")) tabDMLScript.append("local_batch_size = nrow(y_group_batch)\n") val localBatchSize = "local_batch_size" initializeGradients(localBatchSize) @@ -500,11 +528,14 @@ class Caffe2DML(val sc: SparkContext, val solverParam:Caffe.SolverParameter, } private def flattenGradients():Unit = { tabDMLScript.append("# Flatten and store gradients for this parallel execution\n") + // Note: We multiply by a weighting to allow for proper gradient averaging during the + // aggregation even with uneven batch sizes. + assign(tabDMLScript, "weighting", "nrow(Xb)/X_group_batch_size") net.getLayers.map(layer => net.getCaffeLayer(layer)).map(l => { if(l.shouldUpdateWeight) assign(tabDMLScript, l.dWeight + "_agg[j,]", - matrix(l.dWeight, "1", multiply(nrow(l.weight), ncol(l.weight)))) + matrix(l.dWeight, "1", multiply(nrow(l.weight), ncol(l.weight))) + " * weighting") if(l.shouldUpdateWeight) assign(tabDMLScript, l.dBias + "_agg[j,]", - matrix(l.dBias, "1", multiply(nrow(l.bias), ncol(l.bias)))) + matrix(l.dBias, "1", multiply(nrow(l.bias), ncol(l.bias))) + " * weighting") }) } private def aggregateAggGradients():Unit = { @@ -581,8 +612,8 @@ class Caffe2DMLModel(val numClasses:String, val sc: SparkContext, val solver:Caf updateMeanVarianceForBatchNorm(net, false) val lossLayers = getLossLayers(net) - - assign(tabDMLScript, "Prob", matrix("0", Caffe2DML.numImages, numClasses)) + val lastLayerShape = estimator.getOutputShapeOfLastLayer + assign(tabDMLScript, "Prob", matrix("0", Caffe2DML.numImages, (lastLayerShape._1*lastLayerShape._2*lastLayerShape._3).toString)) estimator.getTestAlgo.toLowerCase match { case "minibatch" => { ceilDivide(tabDMLScript(), "num_iters", Caffe2DML.numImages, Caffe2DML.batchSize) @@ -623,7 +654,7 @@ class Caffe2DMLModel(val numClasses:String, val sc: SparkContext, val solver:Caf } } case "allreduce" => { - // This setting doesnot use the batch size for scoring and allows the parfor optimizer to select plan + // This setting does not use the batch size for scoring and allows the parfor optimizer to select the best plan + // by minimizing the memory requirement (i.e. batch size = 1) parForBlock("i", "1", Caffe2DML.numImages) { assign(tabDMLScript, "Xb", "X_full[i,]") @@ -633,6 +664,18 @@ class Caffe2DMLModel(val numClasses:String, val sc: SparkContext, val solver:Caf } case _ => throw new DMLRuntimeException("Unsupported test algo:" + estimator.getTestAlgo) } + + if(estimator.inputs.containsKey("$output_activations")) { + if(estimator.getTestAlgo.toLowerCase.equals("batch")) { + net.getLayers.map(layer => + tabDMLScript.append(write(net.getCaffeLayer(layer).out, + estimator.inputs.get("$output_activations") + "/" + net.getCaffeLayer(layer).param.getName + "_activations.mtx", "csv") + "\n") + ) + } + else { + throw new DMLRuntimeException("Incorrect usage of output_activations. It should only be used in batch mode.") + } + } val predictionScript = dmlScript.toString() System.out.println("Time taken to generate prediction script from Caffe proto:" + ((System.nanoTime() - startPredictionTime)*1e-9) + "secs." ) @@ -655,9 +698,36 @@ class Caffe2DMLModel(val numClasses:String, val sc: SparkContext, val solver:Caf // Prediction def transform(X: MatrixBlock): MatrixBlock = { - baseTransform(X, sc, "Prob") + if(estimator.isClassification) { + Caffe2DML.LOG.debug("Prediction assuming classification") + baseTransform(X, sc, "Prob") + } + else { + Caffe2DML.LOG.debug("Prediction assuming segmentation") + val outShape = estimator.getOutputShapeOfLastLayer + baseTransform(X, sc, "Prob", outShape._1.toInt, outShape._2.toInt, outShape._3.toInt) + } } + def transform_probability(X: MatrixBlock): MatrixBlock = { + if(estimator.isClassification) { + Caffe2DML.LOG.debug("Prediction of probability assuming classification") + baseTransformProbability(X, sc, "Prob") + } + else { + Caffe2DML.LOG.debug("Prediction of probability assuming segmentation") + val outShape = estimator.getOutputShapeOfLastLayer + baseTransformProbability(X, sc, "Prob", outShape._1.toInt, outShape._2.toInt, outShape._3.toInt) + } + } def transform(df: ScriptsUtils.SparkDataType): DataFrame = { - baseTransform(df, sc, "Prob") + if(estimator.isClassification) { + Caffe2DML.LOG.debug("Prediction assuming classification") + baseTransform(df, sc, "Prob", true) + } + else { + Caffe2DML.LOG.debug("Prediction assuming segmentation") + val outShape = estimator.getOutputShapeOfLastLayer + baseTransform(df, sc, "Prob", true, outShape._1.toInt, outShape._2.toInt, outShape._3.toInt) + } } -} \ No newline at end of file +}

http://git-wip-us.apache.org/repos/asf/systemml/blob/978d4de4/src/main/scala/org/apache/sysml/api/dl/CaffeLayer.scala ---------------------------------------------------------------------- diff --git a/src/main/scala/org/apache/sysml/api/dl/CaffeLayer.scala b/src/main/scala/org/apache/sysml/api/dl/CaffeLayer.scala index 3fdbdb1..9518d75 100644 --- a/src/main/scala/org/apache/sysml/api/dl/CaffeLayer.scala +++ b/src/main/scala/org/apache/sysml/api/dl/CaffeLayer.scala @@ -45,6 +45,9 @@ trait CaffeLayer extends BaseDMLGenerator { var computedBottomLayerOutputShape:(String, String, String) = null def bottomLayerOutputShape:(String, String, String) = { if(computedBottomLayerOutputShape == null) { + // Note: if you get org.apache.sysml.parser.LanguageException: Map is null exception + // from org.apache.sysml.api.dl.CaffeNetwork.org$apache$sysml$api$dl$CaffeNetwork$$convertLayerParameterToCaffeLayer + // you are attempting to traverse the network (for example: bottomLayerOutputShape) before it is created.
val ret = net.getBottomLayers(param.getName).map(l => net.getCaffeLayer(l)).toList if(ret.size == 0) throw new LanguageException("Expected at least 1 bottom layer for " + param.getName) computedBottomLayerOutputShape = ret(0).outputShape @@ -487,28 +490,51 @@ class Concat(val param:LayerParameter, val id:Int, val net:CaffeNetwork) extends class SoftmaxWithLoss(val param:LayerParameter, val id:Int, val net:CaffeNetwork) extends CaffeLayer with IsLossLayer { // ------------------------------------------------- - override def sourceFileName = "softmax" + override def sourceFileName = if(!isSegmentationProblem()) "softmax" else "softmax2d" override def init(dmlScript:StringBuilder) = {} - override def forward(dmlScript:StringBuilder, isPrediction:Boolean) = - invokeForward(dmlScript, List[String](out), scores) + def isSegmentationProblem():Boolean = { + try { + return outputShape._2.toInt != 1 && outputShape._3.toInt != 1 + } catch { + case _:Throwable => throw new RuntimeException("Cannot infer the output dimensions:" + outputShape) + } + } + override def forward(dmlScript:StringBuilder, isPrediction:Boolean) = { + if(!isSegmentationProblem()) { + invokeForward(dmlScript, List[String](out), scores) + } + else { + invokeForward(dmlScript, List[String](out), scores, outputShape._1) + } + } override def backward(dmlScript:StringBuilder, outSuffix:String) = { - invoke(dmlScript, "cross_entropy_loss::", List[String]("dProbs" + outSuffix), "backward", false, out, "yb") - dmlScript.append("; ") - invoke(dmlScript, "softmax::", List[String]("dOut" + id + outSuffix), "backward", false, "dProbs", scores) - val bottomLayerIDs = net.getBottomLayers(param.getName).map(l => net.getCaffeLayer(l).id) - dmlScript.append("; ") - bottomLayerIDs.map(bottomLayerID => dmlScript.append( dX(bottomLayerID) + outSuffix + " = " + "dOut" + id + outSuffix + "; ")) - dmlScript.append("\n") + if(!isSegmentationProblem()) { + invoke(dmlScript, "cross_entropy_loss::", List[String]("dProbs" + outSuffix), "backward", false, out, "yb") + dmlScript.append("; ") + invoke(dmlScript, "softmax::", List[String]("dOut" + id + outSuffix), "backward", false, "dProbs", scores) + val bottomLayerIDs = net.getBottomLayers(param.getName).map(l => net.getCaffeLayer(l).id) + dmlScript.append("; ") + bottomLayerIDs.map(bottomLayerID => dmlScript.append( dX(bottomLayerID) + outSuffix + " = " + "dOut" + id + outSuffix + "; ")) + dmlScript.append("\n") + } + else { + throw new RuntimeException("backward for SoftmaxWithLoss is not implemented for segmentation problems") + } } override def computeLoss(dmlScript:StringBuilder, numTabs:Int) = { - val tabBuilder = new StringBuilder - for(i <- 0 until numTabs) tabBuilder.append("\t") - val tabs = tabBuilder.toString - dmlScript.append("tmp_loss = cross_entropy_loss::forward(" + commaSep(out, "yb") + ")\n") - dmlScript.append(tabs).append("loss = loss + tmp_loss\n") - dmlScript.append(tabs).append("true_yb = rowIndexMax(yb)\n") - dmlScript.append(tabs).append("predicted_yb = rowIndexMax(" + out + ")\n") - dmlScript.append(tabs).append("accuracy = mean(predicted_yb == true_yb)*100\n") + if(!isSegmentationProblem()) { + val tabBuilder = new StringBuilder + for(i <- 0 until numTabs) tabBuilder.append("\t") + val tabs = tabBuilder.toString + dmlScript.append("tmp_loss = cross_entropy_loss::forward(" + commaSep(out, "yb") + ")\n") + dmlScript.append(tabs).append("loss = loss + tmp_loss\n") + dmlScript.append(tabs).append("true_yb = rowIndexMax(yb)\n") + dmlScript.append(tabs).append("predicted_yb = rowIndexMax(" + out + ")\n") + dmlScript.append(tabs).append("accuracy = mean(predicted_yb == true_yb)*100\n") + } + else { + throw new RuntimeException("Computation of loss for SoftmaxWithLoss is not implemented for segmentation problems") + } } def scores():String = { val ret = net.getBottomLayers(param.getName).map(l => net.getCaffeLayer(l)).toList @@ -840,8 +866,15 @@ class MaxPooling(val param:LayerParameter, val id:Int, val net:CaffeNetwork) ext } class Convolution(val param:LayerParameter, val id:Int, val net:CaffeNetwork) extends CaffeLayer with HasWeight with HasBias { + def isDepthWise():Boolean = { + if(param.getConvolutionParam.hasGroup && param.getConvolutionParam.getGroup != 1 && numChannels.toInt % param.getConvolutionParam.getGroup != 0) + throw new DMLRuntimeException("The number of groups=" + param.getConvolutionParam.getGroup + " is not supported as the number of channels " + numChannels + " is not divisible by it.") + param.getConvolutionParam.hasGroup && param.getConvolutionParam.getGroup != 1 + } + def depthMultiplier():String = if(isDepthWise) (numChannels.toInt / param.getConvolutionParam.getGroup).toString else throw new DMLRuntimeException("Incorrect usage of depth") + // ------------------------------------------------- - override def sourceFileName = "conv2d_builtin"; + override def sourceFileName = if(isDepthWise) "conv2d_builtin_depthwise" else "conv2d_builtin" /* * Initialize the parameters of this layer. * @@ -854,17 +887,28 @@ class Convolution(val param:LayerParameter, val id:Int, val net:CaffeNetwork) ex * assumption of relu neurons. * - http://arxiv.org/abs/1502.01852 * - * Inputs: + * Inputs without depthwise: * - F: Number of filters. * - C: Number of input channels (dimensionality of depth). * - Hf: Filter height. * - Wf: Filter width. * + * Inputs with depthwise: + * - C: Number of input channels (dimensionality of depth). + * - M: Number of filters per input channel (i.e. depth multiplier). + * - Hf: Filter height. + * - Wf: Filter width. + * * Outputs: * - W: Weights, of shape (F, C*Hf*Wf). * - b: Biases, of shape (F, 1). */ - override def init(dmlScript:StringBuilder) = invokeInit(dmlScript, List[String](weight, bias), numKernels, numChannels, kernel_h, kernel_w) + override def init(dmlScript:StringBuilder) = { + if(isDepthWise) + invokeInit(dmlScript, List[String](weight, bias), numChannels, depthMultiplier, kernel_h, kernel_w) + else + invokeInit(dmlScript, List[String](weight, bias), numKernels, numChannels, kernel_h, kernel_w) + } /* * Computes the forward pass for a 2D spatial convolutional layer with * F filters. The input data has N examples, each represented as a 3D @@ -880,6 +924,7 @@ class Convolution(val param:LayerParameter, val id:Int, val net:CaffeNetwork) ex * - C: Number of input channels (dimensionality of depth). * - Hin: Input height. * - Win: Input width. + * (only for depthwise) - M: Number of filters per input channel (i.e. depth multiplier). * - Hf: Filter height. * - Wf: Filter width. * - strideh: Stride over height. @@ -900,9 +945,14 @@ class Convolution(val param:LayerParameter, val id:Int, val net:CaffeNetwork) ex * - Hout: Output height. * - Wout: Output width.
*/ - override def forward(dmlScript:StringBuilder, isPrediction:Boolean) = - invokeForward(dmlScript, List[String](out, "ignoreHout_"+id, "ignoreWout_"+id), + override def forward(dmlScript:StringBuilder, isPrediction:Boolean) = { + if(isDepthWise) + invokeForward(dmlScript, List[String](out, "ignoreHout_"+id, "ignoreWout_"+id), + X, weight, bias, numChannels, Hin, Win, depthMultiplier, kernel_h, kernel_w, stride_h, stride_w, pad_h, pad_w) + else + invokeForward(dmlScript, List[String](out, "ignoreHout_"+id, "ignoreWout_"+id), X, weight, bias, numChannels, Hin, Win, kernel_h, kernel_w, stride_h, stride_w, pad_h, pad_w) + } /* * Computes the backward pass for a 2D spatial convolutional layer * with F filters. @@ -918,6 +968,7 @@ class Convolution(val param:LayerParameter, val id:Int, val net:CaffeNetwork) ex * - C: Number of input channels (dimensionality of depth). * - Hin: Input height. * - Win: Input width. + * (only for depthwise) - M: Number of filters per input channel (i.e. depth multiplier). * - Hf: Filter height. * - Wf: Filter width. * - strideh: Stride over height. @@ -938,10 +989,18 @@ class Convolution(val param:LayerParameter, val id:Int, val net:CaffeNetwork) ex * - dW: Gradient wrt `W`, of shape (F, C*Hf*Wf). * - db: Gradient wrt `b`, of shape (F, 1). */ - override def backward(dmlScript:StringBuilder, outSuffix:String) = - invokeBackward(dmlScript, outSuffix, List[String]("dOut" + id, dWeight, dBias), dout, Hout, Wout, X, weight, bias, numChannels, Hin, Win, kernel_h, kernel_w, stride_h, stride_w, pad_h, pad_w) - // n * c_o * h_o * w_o, where h_o = (h_i + 2 * pad_h - kernel_h) / stride_h + 1 and w_o likewise. - override def outputShape = ( numKernels, Hout, Wout ) + override def backward(dmlScript:StringBuilder, outSuffix:String) = { + if(isDepthWise) + invokeBackward(dmlScript, outSuffix, List[String]("dOut" + id, dWeight, dBias), dout, Hout, Wout, X, weight, bias, numChannels, Hin, Win, depthMultiplier, kernel_h, kernel_w, stride_h, stride_w, pad_h, pad_w) + else + invokeBackward(dmlScript, outSuffix, List[String]("dOut" + id, dWeight, dBias), dout, Hout, Wout, X, weight, bias, numChannels, Hin, Win, kernel_h, kernel_w, stride_h, stride_w, pad_h, pad_w) + } + // if not depthwise, n * c_o * h_o * w_o, where h_o = (h_i + 2 * pad_h - kernel_h) / stride_h + 1 and w_o likewise. 
+ // else (N, C*M*Hout*Wout) + override def outputShape = { + if(isDepthWise) ( (numChannels.toInt*depthMultiplier.toInt).toString, Hout, Wout ) + else ( numKernels, Hout, Wout ) + } // ------------------------------------------------- def numChannels = bottomLayerOutputShape._1 def Hin = bottomLayerOutputShape._2 @@ -950,8 +1009,16 @@ class Convolution(val param:LayerParameter, val id:Int, val net:CaffeNetwork) ex def Wout = ConvolutionUtils.getConv2dOutputMap(bottomLayerOutputShape._3, kernel_w, stride_w, pad_w) // ------------------------------------------------- def convParam = param.getConvolutionParam - override def weightShape():Array[Int] = Array(numKernels.toInt, int_mult(numChannels, kernel_h, kernel_w).toInt) - override def biasShape():Array[Int] = Array(numKernels.toInt, 1) + // if depthwise (C, M*Hf*Wf) else (F, C*Hf*Wf) + override def weightShape():Array[Int] = { + if(isDepthWise) Array(numChannels.toInt, int_mult(depthMultiplier, kernel_h, kernel_w).toInt) + else Array(numKernels.toInt, int_mult(numChannels, kernel_h, kernel_w).toInt) + } + // if depthwise (C*M, 1) else (F, 1) + override def biasShape():Array[Int] = { + if(isDepthWise) Array(numChannels.toInt*depthMultiplier.toInt, 1) + else Array(numKernels.toInt, 1) + } // num_output (c_o): the number of filters def numKernels = convParam.getNumOutput.toString // kernel_size (or kernel_h and kernel_w): specifies height and width of each filter @@ -978,7 +1045,15 @@ class Convolution(val param:LayerParameter, val id:Int, val net:CaffeNetwork) ex } class DeConvolution(val param:LayerParameter, val id:Int, val net:CaffeNetwork) extends CaffeLayer with HasWeight with HasBias { - override def sourceFileName: String = "conv2d_transpose" + def isDepthWise():Boolean = { + if(param.getConvolutionParam.hasGroup && param.getConvolutionParam.getGroup != 1 && numChannels.toInt % param.getConvolutionParam.getGroup != 0) + throw new DMLRuntimeException("The number of groups=" + param.getConvolutionParam.getGroup + " is not supported as the number of channels " + numChannels + " is not divisible by it.") + param.getConvolutionParam.hasGroup && param.getConvolutionParam.getGroup != 1 + } + def depthMultiplier():String = if(isDepthWise) (numChannels.toInt / param.getConvolutionParam.getGroup).toString else throw new DMLRuntimeException("Incorrect usage of depth") + + override def sourceFileName: String = if(isDepthWise) "conv2d_transpose_depthwise" else "conv2d_transpose" + /* * Utility function to initialize the parameters of this layer. * @@ -988,22 +1063,48 @@ class DeConvolution(val param:LayerParameter, val id:Int, val net:CaffeNetwork) * assumption of relu neurons. * - http://arxiv.org/abs/1502.01852 * - * Inputs: + * Inputs without depthwise: * - F: Number of filters. * - C: Number of input channels (dimensionality of depth). * - Hf: Filter height. * - Wf: Filter width. * + * Inputs with depthwise: + * - C: Number of input channels (dimensionality of depth). + * - M: Depth of each filter (C must be divisible by M). + * - Hf: Filter height. + * - Wf: Filter width. + * * Outputs: - * - W: Weights, of shape (F, C*Hf*Wf). + * - W: Weights, of shape (C, F*Hf*Wf). * - b: Biases, of shape (F, 1).
*/ - override def init(dmlScript: StringBuilder): Unit = - invokeInit(dmlScript, List[String](weight, bias), numKernels, numChannels, kernel_h, kernel_w) - - override def weightShape():Array[Int] = Array(numKernels.toInt, int_mult(numChannels, kernel_h, kernel_w).toInt) - override def biasShape():Array[Int] = Array(numKernels.toInt, 1) - + override def init(dmlScript: StringBuilder): Unit = { + if(isDepthWise) + invokeInit(dmlScript, List[String](weight, bias), numChannels, depthMultiplier, kernel_h, kernel_w) + else + invokeInit(dmlScript, List[String](weight, bias), numKernels, numChannels, kernel_h, kernel_w) + } + + private def C_DivideBy_M():Int = numChannels.toInt / depthMultiplier.toInt + + // if depthwise (C/M, M*Hf*Wf), else (C, F*Hf*Wf) + override def weightShape():Array[Int] = { + if(isDepthWise) + Array(C_DivideBy_M, int_mult(depthMultiplier, kernel_h, kernel_w).toInt) + else + Array(numChannels.toInt, int_mult(numKernels, kernel_h, kernel_w).toInt) + } + // if depthwise (C/M, 1), else (F, 1) + override def biasShape():Array[Int] = { + if(isDepthWise) + Array(C_DivideBy_M, 1) + else + Array(numKernels.toInt, 1) + } + + private def numGroups:Int = if(param.getConvolutionParam.hasGroup) param.getConvolutionParam.getGroup else 1 + /* * Computes the forward pass for a 2D spatial transpose convolutional * layer with F filters. The input data has N examples, each @@ -1011,18 +1112,19 @@ class DeConvolution(val param:LayerParameter, val id:Int, val net:CaffeNetwork) * * Inputs: * - X: Inputs, of shape (N, C*Hin*Win). - * - W: Weights, of shape (F, C*Hf*Wf). + * - W: Weights, of shape (C, F*Hf*Wf). * - b: Biases, of shape (F, 1). * - C: Number of input channels (dimensionality of depth). * - Hin: Input height. * - Win: Input width. + * (only for depthwise): - M: Depth of each filter (C must be divisible by M). * - Hf: Filter height. * - Wf: Filter width. * - strideh: Stride over height. * - stridew: Stride over width. * - padh: Padding for top and bottom sides. * - padw: Padding for left and right sides. - * - out_padh: extra padding for top side. This should + * - out_padh: extra padding for top side. This should * lie in [0, strideh-1]. * - out_padw: extra padding for right side. This should * lie in [0, stridew-1]. @@ -1032,9 +1134,14 @@ class DeConvolution(val param:LayerParameter, val id:Int, val net:CaffeNetwork) * - Hout: Output height. * - Wout: Output width. */ - override def forward(dmlScript: StringBuilder,isPrediction: Boolean): Unit = - invokeForward(dmlScript, List[String](out, "ignoreHout_"+id, "ignoreWout_"+id), + override def forward(dmlScript: StringBuilder,isPrediction: Boolean): Unit = { + if(isDepthWise) + invokeForward(dmlScript, List[String](out, "ignoreHout_"+id, "ignoreWout_"+id), + X, weight, bias, numChannels, Hin, Win, depthMultiplier, kernel_h, kernel_w, stride_h, stride_w, pad_h, pad_w, "0", "0") + else + invokeForward(dmlScript, List[String](out, "ignoreHout_"+id, "ignoreWout_"+id), X, weight, bias, numChannels, Hin, Win, kernel_h, kernel_w, stride_h, stride_w, pad_h, pad_w, "0", "0") + } /* * Computes the backward pass for a 2D spatial transpose @@ -1046,11 +1153,12 @@ class DeConvolution(val param:LayerParameter, val id:Int, val net:CaffeNetwork) * - Hout: Output height. * - Wout: Output width. * - X: Inputs, of shape (N, C*Hin*Win). - * - W: Weights, of shape (F, C*Hf*Wf). + * - W: Weights, of shape (C, F*Hf*Wf). * - b: Biases, of shape (F, 1). * - C: Number of input channels (dimensionality of depth). * - Hin: Input height. * - Win: Input width. 
+ * (only for depthwise): - M: Depth of each filter (C must be divisible by M). * - Hf: Filter height. * - Wf: Filter width. * - strideh: Stride over height. @@ -1060,14 +1168,20 @@ class DeConvolution(val param:LayerParameter, val id:Int, val net:CaffeNetwork) * * Outputs: * - dX: Gradient wrt `X`, of shape (N, C*Hin*Win). - * - dW: Gradient wrt `W`, of shape (F, C*Hf*Wf). + * - dW: Gradient wrt `W`, of shape (C, F*Hf*Wf). * - db: Gradient wrt `b`, of shape (F, 1). */ - override def backward(dmlScript:StringBuilder, outSuffix:String) = - invokeBackward(dmlScript, outSuffix, List[String]("dOut" + id, dWeight, dBias), + override def backward(dmlScript:StringBuilder, outSuffix:String) = { + if(isDepthWise) + invokeBackward(dmlScript, outSuffix, List[String]("dOut" + id, dWeight, dBias), + dout, Hout, Wout, X, weight, bias, numChannels, Hin, Win, depthMultiplier, kernel_h, kernel_w, stride_h, stride_w, pad_h, pad_w) + else + invokeBackward(dmlScript, outSuffix, List[String]("dOut" + id, dWeight, dBias), dout, Hout, Wout, X, weight, bias, numChannels, Hin, Win, kernel_h, kernel_w, stride_h, stride_w, pad_h, pad_w) - // n * c_o * h_o * w_o, where h_o = (h_i + 2 * pad_h - kernel_h) / stride_h + 1 and w_o likewise. - override def outputShape = ( numChannels, Hout, Wout ) + } + // if not depthwise n * c_o * h_o * w_o, where h_o = (h_i + 2 * pad_h - kernel_h) / stride_h + 1 and w_o likewise. + // else (N, C/M*Hout*Wout) + override def outputShape = if(isDepthWise) ( C_DivideBy_M().toString, Hout, Wout ) else ( numChannels, Hout, Wout ) // ------------------------------------------------- def numChannels = bottomLayerOutputShape._1 def Hin = bottomLayerOutputShape._2 http://git-wip-us.apache.org/repos/asf/systemml/blob/978d4de4/src/main/scala/org/apache/sysml/api/dl/CaffeNetwork.scala ---------------------------------------------------------------------- diff --git a/src/main/scala/org/apache/sysml/api/dl/CaffeNetwork.scala b/src/main/scala/org/apache/sysml/api/dl/CaffeNetwork.scala index 5c2dc77..67682d5 100644 --- a/src/main/scala/org/apache/sysml/api/dl/CaffeNetwork.scala +++ b/src/main/scala/org/apache/sysml/api/dl/CaffeNetwork.scala @@ -208,8 +208,8 @@ class CaffeNetwork(netFilePath:String, val currentPhase:Phase, // Helper functions private def checkKey(m:Map[String, Any], key:String): Boolean = { - if(m == null) throw new LanguageException("Map is null") - else if(key == null) throw new LanguageException("key is null") + if(m == null) throw new LanguageException("Map is null (key=" + key + ")") + else if(key == null) throw new LanguageException("key is null (map=" + m + ")") else m.containsKey(key) } private def convertLayerParameterToCaffeLayer(param:LayerParameter):CaffeLayer = { http://git-wip-us.apache.org/repos/asf/systemml/blob/978d4de4/src/main/scala/org/apache/sysml/api/dl/Utils.scala ---------------------------------------------------------------------- diff --git a/src/main/scala/org/apache/sysml/api/dl/Utils.scala b/src/main/scala/org/apache/sysml/api/dl/Utils.scala index d6af22c..0c00d3c 100644 --- a/src/main/scala/org/apache/sysml/api/dl/Utils.scala +++ b/src/main/scala/org/apache/sysml/api/dl/Utils.scala @@ -111,6 +111,30 @@ object Utils { } } + class CopyCaffeDeconvFloatToSystemMLDeconvDoubleArray(data:java.util.List[java.lang.Float], F:Int, C:Int, H:Int, W:Int, arr:Array[Double]) + extends CopyFloatToDoubleArray(data, C, F*H*W, false, arr) { + override def run(): Unit = { + var i = 0 + for(f <- 0 until F) { + for(c <- 0 until C) { + for(hw <- 0 until H*W) { + arr(c*F*H*W + 
f*H*W + hw) = data.get(i).doubleValue() + i = i+1 + } + } + } + } + } + + def allocateDeconvolutionWeight(data:java.util.List[java.lang.Float], F:Int, C:Int, H:Int, W:Int):(MatrixBlock,CopyFloatToDoubleArray) = { + val mb = new MatrixBlock(C, F*H*W, false) + mb.allocateDenseBlock() + val arr = mb.getDenseBlock + val thread = new CopyCaffeDeconvFloatToSystemMLDeconvDoubleArray(data, F, C, H, W, arr) + thread.start + return (mb, thread) + } + def allocateMatrixBlock(data:java.util.List[java.lang.Float], rows:Int, cols:Int, transpose:Boolean):(MatrixBlock,CopyFloatToDoubleArray) = { val mb = new MatrixBlock(rows, cols, false) mb.allocateDenseBlock() @@ -141,7 +165,7 @@ object Utils { if(inputVariables.keys.size == 0) throw new DMLRuntimeException("No weights found in the file " + caffeModelFilePath) for(input <- inputVariables.keys) { - dmlScript.append("write(" + input + ", \"" + input + ".mtx\", format=\"" + format + "\");\n") + dmlScript.append("write(" + input + ", \"" + outputDirectory + "/" + input + ".mtx\", format=\"" + format + "\");\n") } if(Caffe2DML.LOG.isDebugEnabled()) Caffe2DML.LOG.debug("Executing the script:" + dmlScript.toString) @@ -161,28 +185,43 @@ object Utils { val net1 = builder.build(); val asyncThreads = new java.util.ArrayList[CopyFloatToDoubleArray]() + val v1Layers = net1.getLayersList.map(layer => layer.getName -> layer).toMap for(layer <- net1.getLayerList) { - if(layer.getBlobsCount == 0) { + val blobs = if(layer.getBlobsCount != 0) layer.getBlobsList else if(v1Layers.contains(layer.getName)) v1Layers.get(layer.getName).get.getBlobsList else null + + if(blobs == null || blobs.size == 0) { // No weight or bias Caffe2DML.LOG.debug("The layer:" + layer.getName + " has no blobs") } - else if(layer.getBlobsCount == 2) { + else if(blobs.size == 2) { // Both weight and bias val caffe2DMLLayer = net.getCaffeLayer(layer.getName) val transpose = caffe2DMLLayer.isInstanceOf[InnerProduct] // weight - val data = layer.getBlobs(0).getDataList + val data = blobs(0).getDataList val shape = caffe2DMLLayer.weightShape() if(shape == null) - throw new DMLRuntimeException("Didnot expect weights for the layer " + layer.getName) + throw new DMLRuntimeException("Did not expect weights for the layer " + layer.getName) validateShape(shape, data, layer.getName) - val ret1 = allocateMatrixBlock(data, shape(0), shape(1), transpose) + + val ret1 = if(caffe2DMLLayer.isInstanceOf[DeConvolution]) { + // Swap dimensions: Caffe's format (F, C*Hf*Wf) to NN layer's format (C, F*Hf*Wf).
+ val deconvLayer = caffe2DMLLayer.asInstanceOf[DeConvolution] + val C = shape(0) + val F = deconvLayer.numKernels.toInt + val Hf = deconvLayer.kernel_h.toInt + val Wf = deconvLayer.kernel_w.toInt + allocateDeconvolutionWeight(data, F, C, Hf, Wf) + } + else { + allocateMatrixBlock(data, shape(0), shape(1), transpose) + } asyncThreads.add(ret1._2) inputVariables.put(caffe2DMLLayer.weight, ret1._1) // bias - val biasData = layer.getBlobs(1).getDataList + val biasData = blobs(1).getDataList val biasShape = caffe2DMLLayer.biasShape() if(biasShape == null) throw new DMLRuntimeException("Didnot expect bias for the layer " + layer.getName) @@ -192,15 +231,17 @@ object Utils { inputVariables.put(caffe2DMLLayer.bias, ret2._1) Caffe2DML.LOG.debug("Read weights/bias for layer:" + layer.getName) } - else if(layer.getBlobsCount == 1) { + else if(blobs.size == 1) { // Special case: convolution/deconvolution without bias // TODO: Extend nn layers to handle this situation + Generalize this to other layers, for example: InnerProduct val caffe2DMLLayer = net.getCaffeLayer(layer.getName) val convParam = if((caffe2DMLLayer.isInstanceOf[Convolution] || caffe2DMLLayer.isInstanceOf[DeConvolution]) && caffe2DMLLayer.param.hasConvolutionParam()) caffe2DMLLayer.param.getConvolutionParam else null if(convParam == null) throw new DMLRuntimeException("Layer with blob count " + layer.getBlobsCount + " is not supported for the layer " + layer.getName) + else if(convParam.hasBiasTerm && convParam.getBiasTerm) + throw new DMLRuntimeException("Layer with blob count " + layer.getBlobsCount + " and with bias term is not supported for the layer " + layer.getName) - val data = layer.getBlobs(0).getDataList + val data = blobs(0).getDataList val shape = caffe2DMLLayer.weightShape() validateShape(shape, data, layer.getName) val ret1 = allocateMatrixBlock(data, shape(0), shape(1), false) @@ -219,6 +260,10 @@ object Utils { t.join() } + for(mb <- inputVariables.values()) { + mb.recomputeNonZeros(); + } + // Return the NetParameter without return readCaffeNet(netFilePath) } @@ -253,4 +298,4 @@ class Utils { Utils.saveCaffeModelFile(sc, deployFilePath, caffeModelFilePath, outputDirectory, format) } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/systemml/blob/978d4de4/src/main/scala/org/apache/sysml/api/ml/BaseSystemMLClassifier.scala ---------------------------------------------------------------------- diff --git a/src/main/scala/org/apache/sysml/api/ml/BaseSystemMLClassifier.scala b/src/main/scala/org/apache/sysml/api/ml/BaseSystemMLClassifier.scala index 918a48d..3559a40 100644 --- a/src/main/scala/org/apache/sysml/api/ml/BaseSystemMLClassifier.scala +++ b/src/main/scala/org/apache/sysml/api/ml/BaseSystemMLClassifier.scala @@ -153,6 +153,8 @@ trait BaseSystemMLEstimatorModel extends BaseSystemMLEstimatorOrModel { double2Double(d) } + def transform_probability(X: MatrixBlock): MatrixBlock; + def transformSchema(schema: StructType): StructType = schema // Returns the script and variable for X @@ -217,26 +219,48 @@ trait BaseSystemMLClassifier extends BaseSystemMLEstimator { trait BaseSystemMLClassifierModel extends BaseSystemMLEstimatorModel { - def baseTransform(X: MatrixBlock, sc: SparkContext, probVar:String): MatrixBlock = { - val isSingleNode = true + def baseTransform(X: MatrixBlock, sc: SparkContext, probVar:String): MatrixBlock = baseTransform(X, sc, probVar, -1, 1, 1) + + def baseTransform(X: MatrixBlock, sc: SparkContext, probVar:String, C:Int, H: Int, W:Int): MatrixBlock = { + val Prob = 
baseTransformHelper(X, sc, probVar, C, H, W) + val script1 = dml("source(\"nn/util.dml\") as util; Prediction = util::predict_class(Prob, C, H, W);") + .out("Prediction").in("Prob", Prob.toMatrixBlock, Prob.getMatrixMetadata).in("C", C).in("H", H).in("W", W) + val ret = (new MLContext(sc)).execute(script1).getMatrix("Prediction").toMatrixBlock + + if(ret.getNumColumns != 1 && H == 1 && W == 1) { + throw new RuntimeException("Expected predicted label to be a column vector") + } + return ret + } + + def baseTransformHelper(X: MatrixBlock, sc: SparkContext, probVar:String, C:Int, H: Int, W:Int): Matrix = { + val isSingleNode = true val ml = new MLContext(sc) updateML(ml) val script = getPredictionScript(isSingleNode) // Uncomment for debugging // ml.setExplainLevel(ExplainLevel.RECOMPILE_RUNTIME) val modelPredict = ml.execute(script._1.in(script._2, X, new MatrixMetadata(X.getNumRows, X.getNumColumns, X.getNonZeros))) - val ret = PredictionUtils.computePredictedClassLabelsFromProbability(modelPredict, isSingleNode, sc, probVar) - .getMatrix("Prediction").toMatrixBlock - - if(ret.getNumColumns != 1) { - throw new RuntimeException("Expected predicted label to be a column vector") - } - return ret - } - - def baseTransform(df: ScriptsUtils.SparkDataType, sc: SparkContext, + return modelPredict.getMatrix(probVar) + } + + def baseTransformProbability(X: MatrixBlock, sc: SparkContext, probVar:String): MatrixBlock = { + baseTransformProbability(X, sc, probVar, -1, 1, 1) + } + + def baseTransformProbability(X: MatrixBlock, sc: SparkContext, probVar:String, C:Int, H: Int, W:Int): MatrixBlock = { + return baseTransformHelper(X, sc, probVar, C, H, W).toMatrixBlock + } + + + def baseTransform(df: ScriptsUtils.SparkDataType, sc: SparkContext, probVar:String, outputProb:Boolean=true): DataFrame = { - val isSingleNode = false + baseTransform(df, sc, probVar, outputProb, -1, 1, 1) + } + + def baseTransformHelper(df: ScriptsUtils.SparkDataType, sc: SparkContext, + probVar:String, outputProb:Boolean, C:Int, H: Int, W:Int): Matrix = { + val isSingleNode = false val ml = new MLContext(sc) updateML(ml) val mcXin = new MatrixCharacteristics() @@ -245,11 +269,19 @@ trait BaseSystemMLClassifierModel extends BaseSystemMLEstimatorModel { val mmXin = new MatrixMetadata(mcXin) val Xin_bin = new Matrix(Xin, mmXin) val modelPredict = ml.execute(script._1.in(script._2, Xin_bin)) - val predLabelOut = PredictionUtils.computePredictedClassLabelsFromProbability(modelPredict, isSingleNode, sc, probVar) + return modelPredict.getMatrix(probVar) + } + + def baseTransform(df: ScriptsUtils.SparkDataType, sc: SparkContext, + probVar:String, outputProb:Boolean, C:Int, H: Int, W:Int): DataFrame = { + val Prob = baseTransformHelper(df, sc, probVar, outputProb, C, H, W) + val script1 = dml("source(\"nn/util.dml\") as util; Prediction = util::predict_class(Prob, C, H, W);") + .out("Prediction").in("Prob", Prob).in("C", C).in("H", H).in("W", W) + val predLabelOut = (new MLContext(sc)).execute(script1) val predictedDF = predLabelOut.getDataFrame("Prediction").select(RDDConverterUtils.DF_ID_COLUMN, "C1").withColumnRenamed("C1", "prediction") if(outputProb) { - val prob = modelPredict.getDataFrame(probVar, true).withColumnRenamed("C1", "probability").select(RDDConverterUtils.DF_ID_COLUMN, "probability") + val prob = Prob.toDFVectorWithIDColumn().withColumnRenamed("C1", "probability").select(RDDConverterUtils.DF_ID_COLUMN, "probability") val dataset = RDDConverterUtilsExt.addIDToDataFrame(df.asInstanceOf[DataFrame], df.sparkSession, 
RDDConverterUtils.DF_ID_COLUMN) return PredictionUtils.joinUsingID(dataset, PredictionUtils.joinUsingID(prob, predictedDF)) } http://git-wip-us.apache.org/repos/asf/systemml/blob/978d4de4/src/main/scala/org/apache/sysml/api/ml/LinearRegression.scala ---------------------------------------------------------------------- diff --git a/src/main/scala/org/apache/sysml/api/ml/LinearRegression.scala b/src/main/scala/org/apache/sysml/api/ml/LinearRegression.scala index ac6c22c..b7634d7 100644 --- a/src/main/scala/org/apache/sysml/api/ml/LinearRegression.scala +++ b/src/main/scala/org/apache/sysml/api/ml/LinearRegression.scala @@ -92,6 +92,8 @@ class LinearRegressionModel(override val uid: String)(estimator:LinearRegression copyValues(that, extra) } + def transform_probability(X: MatrixBlock): MatrixBlock = throw new DMLRuntimeException("Unsupported method") + def baseEstimator():BaseSystemMLEstimator = estimator def this(estimator:LinearRegression) = { http://git-wip-us.apache.org/repos/asf/systemml/blob/978d4de4/src/main/scala/org/apache/sysml/api/ml/LogisticRegression.scala ---------------------------------------------------------------------- diff --git a/src/main/scala/org/apache/sysml/api/ml/LogisticRegression.scala b/src/main/scala/org/apache/sysml/api/ml/LogisticRegression.scala index 1c368c1..b04acd1 100644 --- a/src/main/scala/org/apache/sysml/api/ml/LogisticRegression.scala +++ b/src/main/scala/org/apache/sysml/api/ml/LogisticRegression.scala @@ -109,6 +109,7 @@ class LogisticRegressionModel(override val uid: String)( def modelVariables():List[String] = List[String]("B_out") def transform(X: MatrixBlock): MatrixBlock = baseTransform(X, sc, "means") + def transform_probability(X: MatrixBlock): MatrixBlock = baseTransformProbability(X, sc, "means") def transform(df: ScriptsUtils.SparkDataType): DataFrame = baseTransform(df, sc, "means") } http://git-wip-us.apache.org/repos/asf/systemml/blob/978d4de4/src/main/scala/org/apache/sysml/api/ml/NaiveBayes.scala ---------------------------------------------------------------------- diff --git a/src/main/scala/org/apache/sysml/api/ml/NaiveBayes.scala b/src/main/scala/org/apache/sysml/api/ml/NaiveBayes.scala index bc4e77d..990ab52 100644 --- a/src/main/scala/org/apache/sysml/api/ml/NaiveBayes.scala +++ b/src/main/scala/org/apache/sysml/api/ml/NaiveBayes.scala @@ -110,6 +110,7 @@ class NaiveBayesModel(override val uid: String) def baseEstimator():BaseSystemMLEstimator = estimator def transform(X: MatrixBlock): MatrixBlock = baseTransform(X, sc, "probs") + def transform_probability(X: MatrixBlock): MatrixBlock = baseTransformProbability(X, sc, "probs") def transform(df: ScriptsUtils.SparkDataType): DataFrame = baseTransform(df, sc, "probs") } http://git-wip-us.apache.org/repos/asf/systemml/blob/978d4de4/src/main/scala/org/apache/sysml/api/ml/SVM.scala ---------------------------------------------------------------------- diff --git a/src/main/scala/org/apache/sysml/api/ml/SVM.scala b/src/main/scala/org/apache/sysml/api/ml/SVM.scala index 256bd77..9107836 100644 --- a/src/main/scala/org/apache/sysml/api/ml/SVM.scala +++ b/src/main/scala/org/apache/sysml/api/ml/SVM.scala @@ -116,5 +116,6 @@ class SVMModel (override val uid: String)(estimator:SVM, val sc: SparkContext, v } def transform(X: MatrixBlock): MatrixBlock = baseTransform(X, sc, "scores") + def transform_probability(X: MatrixBlock): MatrixBlock = baseTransformProbability(X, sc, "scores") def transform(df: ScriptsUtils.SparkDataType): DataFrame = baseTransform(df, sc, "scores") }
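For the simpler mllearn models wired up at the end of this commit (LogisticRegression, NaiveBayes, SVM), the new transform_probability methods surface in Python as predict_proba. A minimal sketch, assuming the scikit-learn digits setup used in the project documentation:

    from sklearn import datasets
    from pyspark.sql import SparkSession
    from systemml.mllearn import LogisticRegression

    spark = SparkSession.builder.getOrCreate()
    digits = datasets.load_digits()
    X, y = digits.data, digits.target

    logistic = LogisticRegression(spark)
    logistic.fit(X[:-100], y[:-100])
    labels = logistic.predict(X[-100:])       # decoded class labels, as before
    probs = logistic.predict_proba(X[-100:])  # new: per-class probabilities (the 'means' variable)
    # Note: predict_proba raises ValueError if transferUsingDF is set, per the check
    # added in estimators.py above.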
