Repository: systemml
Updated Branches:
  refs/heads/master 68b93c75f -> cf31ed2ab
[SYSTEMML-540] Improved the script generated by Caffe2DML in allreduce setting

- Allowed users to pass parfor parameters for generated allreduce scripts.
- Incorporated initial += support.
- Cleaned up functions that are not used.

Project: http://git-wip-us.apache.org/repos/asf/systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/systemml/commit/cf31ed2a
Tree: http://git-wip-us.apache.org/repos/asf/systemml/tree/cf31ed2a
Diff: http://git-wip-us.apache.org/repos/asf/systemml/diff/cf31ed2a

Branch: refs/heads/master
Commit: cf31ed2ab0624d3f35eb72a5abc50d621c6029f9
Parents: 68b93c7
Author: Niketan Pansare <[email protected]>
Authored: Mon Jan 29 12:03:06 2018 -0800
Committer: Niketan Pansare <[email protected]>
Committed: Mon Jan 29 12:03:06 2018 -0800

----------------------------------------------------------------------
 src/main/python/systemml/mllearn/estimators.py |  10 +-
 .../org/apache/sysml/api/dl/Caffe2DML.scala    | 658 +++++++++----------
 .../org/apache/sysml/api/dl/CaffeLayer.scala   |  13 +-
 .../org/apache/sysml/api/dl/DMLGenerator.scala |  50 +-
 .../scala/org/apache/sysml/api/dl/Utils.scala  |  16 +
 5 files changed, 366 insertions(+), 381 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/systemml/blob/cf31ed2a/src/main/python/systemml/mllearn/estimators.py ---------------------------------------------------------------------- diff --git a/src/main/python/systemml/mllearn/estimators.py b/src/main/python/systemml/mllearn/estimators.py index 72a6f55..bbf96c6 100644 --- a/src/main/python/systemml/mllearn/estimators.py +++ b/src/main/python/systemml/mllearn/estimators.py @@ -844,7 +844,7 @@ class Caffe2DML(BaseSystemMLClassifier): if ignore_weights is not None: self.estimator.setWeightsToIgnore(ignore_weights) - def set(self, debug=None, train_algo=None, test_algo=None, parallel_batches=None, output_activations=None, perform_one_hot_encoding=None): + def set(self, debug=None, train_algo=None, test_algo=None, parallel_batches=None, output_activations=None, perform_one_hot_encoding=None, parfor_parameters=None): """ Set input to Caffe2DML @@ -856,6 +856,7 @@ class Caffe2DML(BaseSystemMLClassifier): parallel_batches: number of parallel batches output_activations: (developer flag) directory to output activations of each layer as csv while prediction.
To be used only in batch mode (default: None) perform_one_hot_encoding: should perform one-hot encoding in DML using table function (default: False) + parfor_parameters: dictionary for parfor parameters when using allreduce-style algorithms (default: "") """ if debug is not None: self.estimator.setInput("$debug", str(debug).upper()) if train_algo is not None: self.estimator.setInput("$train_algo", str(train_algo).lower()) @@ -863,6 +864,13 @@ class Caffe2DML(BaseSystemMLClassifier): if parallel_batches is not None: self.estimator.setInput("$parallel_batches", str(parallel_batches)) if output_activations is not None: self.estimator.setInput("$output_activations", str(output_activations)) if perform_one_hot_encoding is not None: self.estimator.setInput("$perform_one_hot_encoding", str(perform_one_hot_encoding).lower()) + if parfor_parameters is not None: + if isinstance(parfor_parameters, dict): + # Convert dictionary to comma-separated list + parfor_parameters = ''.join([ ', ' + str(k) + '=' + str(v) for k, v in parfor_parameters.items()]) if len(parfor_parameters) > 0 else '' + self.estimator.setInput("$parfor_parameters", parfor_parameters) + else: + raise TypeError("parfor_parameters should be a dictionary") return self def summary(self): http://git-wip-us.apache.org/repos/asf/systemml/blob/cf31ed2a/src/main/scala/org/apache/sysml/api/dl/Caffe2DML.scala ---------------------------------------------------------------------- diff --git a/src/main/scala/org/apache/sysml/api/dl/Caffe2DML.scala b/src/main/scala/org/apache/sysml/api/dl/Caffe2DML.scala index 9f75008..da72403 100644 --- a/src/main/scala/org/apache/sysml/api/dl/Caffe2DML.scala +++ b/src/main/scala/org/apache/sysml/api/dl/Caffe2DML.scala @@ -51,7 +51,6 @@ import org.apache.commons.logging.Log import org.apache.commons.logging.LogFactory import org.apache.sysml.runtime.controlprogram.parfor.stat.InfrastructureAnalyzer import org.apache.sysml.hops.OptimizerUtils -import java.lang.Double /*************************************************************************************** DESIGN OF CAFFE2DML: @@ -119,6 +118,7 @@ To shield from network files that violates this restriction, Caffe2DML performs object Caffe2DML { val LOG = LogFactory.getLog(classOf[Caffe2DML].getName()) // ------------------------------------------------------------------------ + val USE_PLUS_EQ = true def layerDir = "nn/layers/" def optimDir = "nn/optim/" @@ -152,6 +152,13 @@ object Caffe2DML { } def throwUsageError(): Unit = throw new RuntimeException("Incorrect usage: train_script OUTPUT_DML_FILE SOLVER_FILE INPUT_CHANNELS INPUT_HEIGHT INPUT_WIDTH"); + + val rand = new Random + // Supported Algorithms: + val MINIBATCH_ALGORITHM = "minibatch" + val BATCH_ALGORITHM = "batch" + val ALLREDUCE_ALGORITHM = "allreduce" + val ALLREDUCE_PARALLEL_BATCHES_ALGORITHM = "allreduce_parallel_batches" } class Caffe2DML(val sc: SparkContext, @@ -208,76 +215,6 @@ class Caffe2DML(val sc: SparkContext, mloutput = baseFit(df, sc) new Caffe2DMLModel(this) } - /** - * Returns maximum dimensions of convolution and max pooling layer for either DIRECT_CONV2D or IM2COL - */ - def getMaxDimensionOfConvLayers(approach:String, batchSize:Int):Int = { - val convOrPoolLayers = net.getLayers.map(l => net.getCaffeLayer(l)).filter(l => l.isInstanceOf[Convolution] || l.isInstanceOf[MaxPooling]) - if(convOrPoolLayers.length == 0) { - return -1 - } - else if(approach.equalsIgnoreCase("DIRECT_CONV2D") || approach.equalsIgnoreCase("IM2COL")) { - convOrPoolLayers - .map(l => { - 
if(l.isInstanceOf[Convolution]) { - val convLayer = l.asInstanceOf[Convolution] - val CHW = convLayer.numChannels.toInt*convLayer.Hin.toInt*convLayer.Win.toInt - val KPQ = convLayer.numKernels.toInt*convLayer.Hout.toInt*convLayer.Wout.toInt - val inputOutputMaxCol = Math.max(CHW, KPQ) - if(approach.equalsIgnoreCase("DIRECT_CONV2D")) - inputOutputMaxCol - else { - val CRS = convLayer.numChannels.toInt*convLayer.kernel_h.toInt*convLayer.kernel_w.toInt - val NPQ = batchSize*convLayer.Hout.toInt*convLayer.Wout.toInt - return Math.max(Math.max(inputOutputMaxCol, CRS), NPQ) - } - } - else if(l.isInstanceOf[MaxPooling]) { - val maxpoolLayer = l.asInstanceOf[MaxPooling] - val CHW = maxpoolLayer.numChannels.toInt*maxpoolLayer.Hin.toInt*maxpoolLayer.Win.toInt - val CPQ = maxpoolLayer.numChannels.toInt*maxpoolLayer.Hout.toInt*maxpoolLayer.Wout.toInt - Math.max(CHW, CPQ) - } - else { - throw new RuntimeException("Unexpected error: Incorrect layer type for " + l.param.getName) - } - }).max - } - else { - throw new RuntimeException("Unsupported approach:" + approach) - } - } - /** - * Returns maximum size of matrix blocks for either DIRECT_CONV2D or IM2COL - */ - def getMaxMatrixBlockSize(approach:String, batchSize:Int):Long = { - if(approach.equalsIgnoreCase("DIRECT_CONV2D") || approach.equalsIgnoreCase("IM2COL")) { - net.getLayers - .map(l => net.getCaffeLayer(l)) - .map(l => { - if(l.isInstanceOf[Convolution]) { - val convLayer = l.asInstanceOf[Convolution] - val CHW = convLayer.numChannels.toLong*convLayer.Hin.toLong*convLayer.Win.toLong - val KPQ = convLayer.numKernels.toLong*convLayer.Hout.toLong*convLayer.Wout.toLong - val inputOutputMaxCol = Math.max(CHW, KPQ) - if(approach.equalsIgnoreCase("DIRECT_CONV2D")) - batchSize*inputOutputMaxCol - else { - val CRS = convLayer.numChannels.toLong*convLayer.kernel_h.toLong*convLayer.kernel_w.toLong - val NPQ = batchSize*convLayer.Hout.toLong*convLayer.Wout.toLong - return Math.max(Math.max(batchSize*inputOutputMaxCol, batchSize*CRS), batchSize*NPQ) - } - } - else { - val outputShape = l.outputShape - batchSize*outputShape._1.toLong*outputShape._2.toLong*outputShape._3.toLong - } - }).max - } - else { - throw new RuntimeException("Unsupported approach:" + approach) - } - } // -------------------------------------------------------------- // Returns true if last 2 of 4 dimensions are 1. // The first dimension refers to number of input datapoints. 
@@ -303,26 +240,13 @@ class Caffe2DML(val sc: SparkContext, customAssert(solverParam.getMaxIter > 0, "Please set max_iter to a positive value") customAssert(net.getLayers.filter(net.getCaffeLayer(_).isInstanceOf[IsLossLayer]).length == 1, "Expected exactly one loss layer") - // TODO: throw error or warning if user tries to set solver_mode == GPU instead of using setGPU method - - def containsParfor():Boolean = getTrainAlgo.toLowerCase.startsWith("allreduce") || getTestAlgo.toLowerCase.startsWith("allreduce") - def getTrainAlgo(): String = if (inputs.containsKey("$train_algo")) inputs.get("$train_algo") else "minibatch" - def getTestAlgo(): String = if (inputs.containsKey("$test_algo")) inputs.get("$test_algo") else "minibatch" + def trainAlgoContainsParfor():Boolean = getTrainAlgo.toLowerCase.startsWith("allreduce") + def testAlgoContainsParfor():Boolean = getTestAlgo.toLowerCase.startsWith("allreduce") + def containsParfor():Boolean = trainAlgoContainsParfor || testAlgoContainsParfor + def getTrainAlgo(): String = if (inputs.containsKey("$train_algo")) inputs.get("$train_algo") else Caffe2DML.MINIBATCH_ALGORITHM + def getTestAlgo(): String = if (inputs.containsKey("$test_algo")) inputs.get("$test_algo") else Caffe2DML.MINIBATCH_ALGORITHM - private def getMemInBytes(l:CaffeLayer, batchSize:Int, isTraining:Boolean):Long = { - val numLayerInput = if(!l.isInstanceOf[Data]) l.bottomLayerOutputShape._1.toLong * l.bottomLayerOutputShape._2.toLong * l.bottomLayerOutputShape._3.toLong * batchSize else 0 - val numLayerOutput = l.outputShape._1.toLong * l.outputShape._2.toLong * l.outputShape._3.toLong * batchSize - val numLayerError = numLayerOutput - val numLayerWeights = if(l.weightShape != null) { - val nWt = l.weightShape()(0).toLong * l.weightShape()(1).toLong - if(l.extraWeightShape != null) l.extraWeightShape()(0).toLong * l.extraWeightShape()(1).toLong + nWt - else nWt - } else 0 - val numLayerBias = if(l.biasShape != null)l.biasShape()(0).toLong * l.biasShape()(1).toLong else 0 - val numLayerGradients = (numLayerWeights + numLayerBias) * batchSize - if(isTraining) (numLayerInput + numLayerOutput + numLayerError + numLayerWeights + numLayerBias + numLayerGradients)*Double.BYTES - else (numLayerInput + numLayerOutput + numLayerWeights + numLayerBias)*Double.BYTES - } + // Prints the summary of network def summary(sparkSession: org.apache.spark.sql.SparkSession): Unit = { val layers = net.getLayers .map(l => (l, net.getCaffeLayer(l))) val numDataLayers = layers.filter(l => l._2.isInstanceOf[Data]).length @@ -349,7 +273,7 @@ class Caffe2DML(val sc: SparkContext, if (layer.biasShape != null) "[" + layer.biasShape()(0) + " X " + layer.biasShape()(1) + "]" else "", layer.param.getTopList.mkString(","), layer.param.getBottomList.mkString(","), - OptimizerUtils.toMB(getMemInBytes(l._2, batchSize, true)) + "/" + OptimizerUtils.toMB(getMemInBytes(l._2, batchSize, false)) + OptimizerUtils.toMB(Utils.getMemInBytes(l._2, batchSize, true)) + "/" + OptimizerUtils.toMB(Utils.getMemInBytes(l._2, batchSize, false)) ) }) import sparkSession.implicits._ @@ -371,6 +295,9 @@ class Caffe2DML(val sc: SparkContext, net.getLayers.map(layer => {net.getCaffeLayer(layer).debugLayer = isDebug}) net.getLayers.map(layer => {net.getCaffeLayer(layer).caffe2dmlObj = this}) } + + // Comma is included + def getParforParameters():String = if (inputs.containsKey("$parfor_parameters")) inputs.get("$parfor_parameters") else "" // ================================================================================================ // 
The below method parses the provided network and solver file and generates DML script. @@ -394,30 +321,139 @@ class Caffe2DML(val sc: SparkContext, // Initializes Caffe2DML.X, Caffe2DML.y, Caffe2DML.XVal, Caffe2DML.yVal and Caffe2DML.numImages val shouldValidate = solverParam.getTestInterval > 0 && solverParam.getTestIterCount > 0 && solverParam.getTestIter(0) > 0 trainTestSplit(if (shouldValidate) solverParam.getTestIter(0) else 0) - - // Set iteration-related variables such as num_iters_per_epoch, lr, etc. - ceilDivide(tabDMLScript, "num_iters_per_epoch", Caffe2DML.numImages, Caffe2DML.batchSize) - assign(tabDMLScript, "lr", solverParam.getBaseLr.toString) - assign(tabDMLScript, "max_iter", ifdef("$max_iter", solverParam.getMaxIter.toString)) - assign(tabDMLScript, "e", "0") - + assign(tabDMLScript, "lr", solverParam.getBaseLr.toString) val lossLayers = getLossLayers(net) + ceilDivide(tabDMLScript, "num_batches_per_epoch", Caffe2DML.numImages, Caffe2DML.batchSize) + if(solverParam.getDisplay > 0 && !getTrainAlgo.toLowerCase.equals(Caffe2DML.MINIBATCH_ALGORITHM)) { + tabDMLScript.append(print(dmlConcat(asDMLString("Iterations (for training loss/accuracy) refers to the number of batches processed where batch size="), Caffe2DML.batchSize))) + } + if(getTrainAlgo.toLowerCase.equals(Caffe2DML.ALLREDUCE_PARALLEL_BATCHES_ALGORITHM) || + getTestAlgo.toLowerCase.equals(Caffe2DML.ALLREDUCE_PARALLEL_BATCHES_ALGORITHM)) { + assign(tabDMLScript, "parallel_batches", "$parallel_batches") + } // ---------------------------------------------------------------------------- // Main logic - forBlock("iter", "1", "max_iter") { - performTrainingIter(lossLayers, shouldValidate, performOneHotEncoding) - if (getTrainAlgo.toLowerCase.equals("batch")) { - assign(tabDMLScript, "e", "iter") - tabDMLScript.append("# Learning rate\n") - lrPolicy.updateLearningRate(tabDMLScript) - } else { - ifBlock("iter %% num_iters_per_epoch == 0") { + getTrainAlgo.toLowerCase match { + case Caffe2DML.MINIBATCH_ALGORITHM => { + assign(tabDMLScript, "e", "0") + assign(tabDMLScript, "max_iter", ifdef("$max_iter", solverParam.getMaxIter.toString)) + forBlock("iter", "1", "max_iter") { + getTrainingBatch(tabDMLScript) + // ------------------------------------------------------- + // Perform forward, backward and update on minibatch + forward; backward; update + // ------------------------------------------------------- + if(solverParam.getDisplay > 0) { + ifBlock("iter %% " + solverParam.getDisplay + " == 0") { + displayTrainingLoss(lossLayers(0), performOneHotEncoding) + } + if(shouldValidate) { + ifBlock("iter %% " + solverParam.getTestInterval + " == 0") { + displayValidationLoss(lossLayers(0), performOneHotEncoding) + } + } + } + performSnapshot + ifBlock("iter %% num_batches_per_epoch == 0") { + // After every epoch, update the learning rate + assign(tabDMLScript, "e", "e + 1") + tabDMLScript.append("# Learning rate\n") + lrPolicy.updateLearningRate(tabDMLScript) + } + } + } + case Caffe2DML.BATCH_ALGORITHM => { + assign(tabDMLScript, "max_iter", ifdef("$max_iter", solverParam.getMaxIter.toString)) + assign(tabDMLScript, "max_epochs", ceil("(max_iter*" + Caffe2DML.batchSize + ")/" + Caffe2DML.numImages)) + forBlock("e", "1", "max_epochs") { + assign(tabDMLScript, "iter", "num_batches_per_epoch*e") + assign(tabDMLScript, "Xb", Caffe2DML.X) + assign(tabDMLScript, "yb", Caffe2DML.y) + // ------------------------------------------------------- + // Perform forward, backward and update on entire dataset + forward; backward; update + // 
------------------------------------------------------- + if(solverParam.getDisplay > 0) { + // Show training/validation loss every epoch + displayTrainingLoss(lossLayers(0), performOneHotEncoding) + if(shouldValidate) + displayValidationLoss(lossLayers(0), performOneHotEncoding) + } + performSnapshot // After every epoch, update the learning rate - assign(tabDMLScript, "e", "e + 1") tabDMLScript.append("# Learning rate\n") lrPolicy.updateLearningRate(tabDMLScript) } } + case Caffe2DML.ALLREDUCE_PARALLEL_BATCHES_ALGORITHM => { + assign(tabDMLScript, "e", "0") + assign(tabDMLScript, "max_iter", ifdef("$max_iter", solverParam.getMaxIter.toString)) + forBlock("iter", "1", "max_iter", "parallel_batches") { + assign(tabDMLScript, "allreduce_start_index", "((iter-1) * " + Caffe2DML.batchSize + ") %% " + Caffe2DML.numImages + " + 1; ") + ifBlock("(allreduce_start_index + parallel_batches*" + Caffe2DML.batchSize + " - 1) > " + Caffe2DML.numImages) { + // TODO: Skip the last batch + assign(tabDMLScript, "allreduce_start_index", "1") + } + initializeGradients("parallel_batches") + parForBlock("j", "1", "parallel_batches", "1", getParforParameters()) { + // Get a mini-batch in this group + assign(tabDMLScript, "beg", "allreduce_start_index + (j-1)*" + Caffe2DML.batchSize) + assign(tabDMLScript, "end", "allreduce_start_index + j*" + Caffe2DML.batchSize + " - 1") + rightIndexing(tabDMLScript, "Xb", Caffe2DML.X, "beg", "end") + rightIndexing(tabDMLScript, "yb", Caffe2DML.y, "beg", "end") + forward; backward + flattenGradients + if(solverParam.getDisplay > 0) { + ifBlock("(iter + j - 1) %% " + solverParam.getDisplay + " == 0") { + displayTrainingLoss(lossLayers(0), performOneHotEncoding) + } + } + } + aggregateAggGradients + update + if(solverParam.getDisplay > 0 && shouldValidate) { + val iterMatrix = matrix("seq(iter, iter+parallel_batches-1)", "parallel_batches", "1") + ifBlock(sum(iterMatrix + " %% " + solverParam.getTestInterval + " == 0") + " > 0") { + displayValidationLoss(lossLayers(0), performOneHotEncoding) + } + } + performSnapshot + } + } + case Caffe2DML.ALLREDUCE_ALGORITHM => { + forBlock("iter", "1", "max_iter") { + // This is distributed synchronous gradient descent with batch size of 1 + // ------------------------------------------------------- + // Perform forward, backward and update on minibatch in parallel + assign(tabDMLScript, "beg", "((iter-1) * " + Caffe2DML.batchSize + ") %% " + Caffe2DML.numImages + " + 1;") + assign(tabDMLScript, "end", "min(beg + " + Caffe2DML.batchSize + " - 1, " + Caffe2DML.numImages + ")") + assign(tabDMLScript, "X_group_batch_size", "end - beg + 1") + initializeGradients("X_group_batch_size") + parForBlock("j", "beg", "end", "1", getParforParameters()) { + assign(tabDMLScript, "Xb", Caffe2DML.X + "[j,]") + assign(tabDMLScript, "yb", Caffe2DML.y + "[j,]") + forward; backward + flattenGradients + } + aggregateAggGradients + update + // ------------------------------------------------------- + if(solverParam.getDisplay > 0) { + ifBlock("iter %% " + solverParam.getDisplay + " == 0") { + assign(tabDMLScript, "Xb", Caffe2DML.X + "[beg:end,]") + assign(tabDMLScript, "yb", Caffe2DML.y + "[beg:end,]") + displayTrainingLoss(lossLayers(0), performOneHotEncoding) + } + if(shouldValidate) { + ifBlock("iter %% " + solverParam.getTestInterval + " == 0") { + displayValidationLoss(lossLayers(0), performOneHotEncoding) + } + } + } + performSnapshot + } + } + case _ => throw new DMLRuntimeException("Unsupported train algo:" + getTrainAlgo) } // 
---------------------------------------------------------------------------- @@ -437,236 +473,112 @@ class Caffe2DML(val sc: SparkContext, (script, "X_full", "y_full") } // ================================================================================================ - - private def performTrainingIter(lossLayers: List[IsLossLayer], shouldValidate: Boolean, performOneHotEncoding:Boolean): Unit = - getTrainAlgo.toLowerCase match { - case "minibatch" => - getTrainingBatch(tabDMLScript) - // ------------------------------------------------------- - // Perform forward, backward and update on minibatch - forward; backward; update - // ------------------------------------------------------- - displayLoss(lossLayers(0), shouldValidate, performOneHotEncoding) - performSnapshot - case "batch" => { - // ------------------------------------------------------- - // Perform forward, backward and update on entire dataset - forward; backward; update - // ------------------------------------------------------- - displayLoss(lossLayers(0), shouldValidate, performOneHotEncoding) - performSnapshot - } - case "allreduce_parallel_batches" => { - // This setting uses the batch size provided by the user - if (!inputs.containsKey("$parallel_batches")) { - throw new RuntimeException("The parameter parallel_batches is required for allreduce_parallel_batches") - } - // The user specifies the number of parallel_batches - // This ensures that the user of generated script remembers to provide the commandline parameter $parallel_batches - assign(tabDMLScript, "parallel_batches", "$parallel_batches") - assign(tabDMLScript, "group_batch_size", "parallel_batches*" + Caffe2DML.batchSize) - assign(tabDMLScript, "groups", "as.integer(ceil(" + Caffe2DML.numImages + "/group_batch_size))") - // Grab groups of mini-batches - forBlock("g", "1", "groups") { - // Get next group of mini-batches - assign(tabDMLScript, "group_beg", "((g-1) * group_batch_size) %% " + Caffe2DML.numImages + " + 1") - assign(tabDMLScript, "group_end", "min(" + Caffe2DML.numImages + ", group_beg + group_batch_size - 1)") - assign(tabDMLScript, "X_group_batch", Caffe2DML.X + "[group_beg:group_end,]") - assign(tabDMLScript, "y_group_batch", Caffe2DML.y + "[group_beg:group_end,]") - initializeGradients("parallel_batches") - assign(tabDMLScript, "X_group_batch_size", nrow("X_group_batch")) - parForBlock("j", "1", "parallel_batches") { - // Get a mini-batch in this group - assign(tabDMLScript, "beg", "((j-1) * " + Caffe2DML.batchSize + ") %% nrow(X_group_batch) + 1") - assign(tabDMLScript, "end", "min(nrow(X_group_batch), beg + " + Caffe2DML.batchSize + " - 1)") - assign(tabDMLScript, "Xb", "X_group_batch[beg:end,]") - assign(tabDMLScript, "yb", "y_group_batch[beg:end,]") - forward; backward - flattenGradients - } - aggregateAggGradients - update - // ------------------------------------------------------- - assign(tabDMLScript, "Xb", "X_group_batch") - assign(tabDMLScript, "yb", "y_group_batch") - displayLoss(lossLayers(0), shouldValidate, performOneHotEncoding) - performSnapshot - } - } - case "allreduce" => { - // This is distributed synchronous gradient descent - // ------------------------------------------------------- - // Perform forward, backward and update on minibatch in parallel - assign(tabDMLScript, "beg", "((iter-1) * " + Caffe2DML.batchSize + ") %% " + Caffe2DML.numImages + " + 1") - assign(tabDMLScript, "end", " min(beg + " + Caffe2DML.batchSize + " - 1, " + Caffe2DML.numImages + ")") - assign(tabDMLScript, "X_group_batch", Caffe2DML.X + 
"[beg:end,]") - assign(tabDMLScript, "y_group_batch", Caffe2DML.y + "[beg:end,]") - assign(tabDMLScript, "X_group_batch_size", nrow("X_group_batch")) - tabDMLScript.append("local_batch_size = nrow(y_group_batch)\n") - val localBatchSize = "local_batch_size" - initializeGradients(localBatchSize) - parForBlock("j", "1", localBatchSize) { - assign(tabDMLScript, "Xb", "X_group_batch[j,]") - assign(tabDMLScript, "yb", "y_group_batch[j,]") - forward; backward - flattenGradients - } - aggregateAggGradients - update - // ------------------------------------------------------- - assign(tabDMLScript, "Xb", "X_group_batch") - assign(tabDMLScript, "yb", "y_group_batch") - displayLoss(lossLayers(0), shouldValidate, performOneHotEncoding) - performSnapshot - } - case _ => throw new DMLRuntimeException("Unsupported train algo:" + getTrainAlgo) - } // ------------------------------------------------------------------------------------------- // Helper functions to generate DML // Initializes Caffe2DML.X, Caffe2DML.y, Caffe2DML.XVal, Caffe2DML.yVal and Caffe2DML.numImages - private def trainTestSplit(numValidationBatches: Int): Unit = + private def trainTestSplit(numValidationBatches: Int): Unit = { if (numValidationBatches > 0) { if (solverParam.getDisplay <= 0) throw new DMLRuntimeException("Since test_iter and test_interval is greater than zero, you should set display to be greater than zero") - tabDMLScript.append(Caffe2DML.numValidationImages).append(" = " + numValidationBatches + " * " + Caffe2DML.batchSize + "\n") + assign(tabDMLScript, Caffe2DML.numValidationImages, numValidationBatches + " * " + Caffe2DML.batchSize) tabDMLScript.append("# Sanity check to ensure that validation set is not too large\n") - val maxValidationSize = "ceil(0.3 * " + Caffe2DML.numImages + ")" + val maxValidationSize = ceil("0.3 * " + Caffe2DML.numImages) ifBlock(Caffe2DML.numValidationImages + " > " + maxValidationSize) { - assign(tabDMLScript, "max_test_iter", "floor(" + maxValidationSize + " / " + Caffe2DML.batchSize + ")") - tabDMLScript.append( - "stop(" + - dmlConcat(asDMLString("Too large validation size. Please reduce test_iter to "), "max_test_iter") - + ")\n" - ) + stop(tabDMLScript, dmlConcat( + asDMLString("Too large validation size. 
Please reduce test_iter to "), floor(maxValidationSize + " / " + Caffe2DML.batchSize))) } - val one = "1" - val rl = int_add(Caffe2DML.numValidationImages, one) - rightIndexing(tabDMLScript.append(Caffe2DML.X).append(" = "), "X_full", rl, Caffe2DML.numImages, null, null) - tabDMLScript.append("; ") - rightIndexing(tabDMLScript.append(Caffe2DML.y).append(" = "), "y_full", rl, Caffe2DML.numImages, null, null) - tabDMLScript.append("; ") - rightIndexing(tabDMLScript.append(Caffe2DML.XVal).append(" = "), "X_full", one, Caffe2DML.numValidationImages, null, null) - tabDMLScript.append("; ") - rightIndexing(tabDMLScript.append(Caffe2DML.yVal).append(" = "), "y_full", one, Caffe2DML.numValidationImages, null, null) - tabDMLScript.append("; ") - tabDMLScript.append(Caffe2DML.numImages).append(" = nrow(y)\n") + rightIndexing(tabDMLScript, Caffe2DML.X, "X_full", int_add(Caffe2DML.numValidationImages, "1"), Caffe2DML.numImages) + rightIndexing(tabDMLScript, Caffe2DML.y, "y_full", int_add(Caffe2DML.numValidationImages, "1"), Caffe2DML.numImages) + rightIndexing(tabDMLScript, Caffe2DML.XVal, "X_full", "1", Caffe2DML.numValidationImages) + rightIndexing(tabDMLScript, Caffe2DML.yVal, "y_full", "1", Caffe2DML.numValidationImages) } else { assign(tabDMLScript, Caffe2DML.X, "X_full") assign(tabDMLScript, Caffe2DML.y, "y_full") - tabDMLScript.append(Caffe2DML.numImages).append(" = nrow(" + Caffe2DML.y + ")\n") } + assign(tabDMLScript, Caffe2DML.numImages, nrow(Caffe2DML.y)) + } + + private def displayTrainingLoss(lossLayer: IsLossLayer, performOneHotEncoding:Boolean): Unit = { + val DEBUG_TRAINING = if (inputs.containsKey("$debug")) inputs.get("$debug").toLowerCase.toBoolean else false + tabDMLScript.append("# Compute training loss & accuracy\n") + assign(tabDMLScript, "loss", "0"); assign(tabDMLScript, "accuracy", "0") + lossLayer.computeLoss(dmlScript, numTabs) + assign(tabDMLScript, "training_loss", "loss"); assign(tabDMLScript, "training_accuracy", "accuracy") + tabDMLScript.append( + print(dmlConcat(asDMLString("Iter:"), "iter", asDMLString(", training loss:"), "training_loss", asDMLString(", training accuracy:"), "training_accuracy")) + ) + if(performOneHotEncoding && DEBUG_TRAINING && !trainAlgoContainsParfor) { + printClassificationReport + } + } + + private def displayValidationLoss(lossLayer: IsLossLayer, performOneHotEncoding:Boolean): Unit = { + if (trainAlgoContainsParfor && testAlgoContainsParfor) { + Caffe2DML.LOG.warn("The setting: train_algo=" + getTrainAlgo + " and test_algo=" + getTestAlgo + " is not recommended. 
Consider changing test_algo=minibatch") + } + // Append the DML to compute validation loss + val numValidationBatches = if (solverParam.getTestIterCount > 0) solverParam.getTestIter(0) else 0 + tabDMLScript.append("# Compute validation loss & accuracy\n") + assign(tabDMLScript, "loss", "0"); assign(tabDMLScript, "accuracy", "0") + getTestAlgo.toLowerCase match { + case Caffe2DML.MINIBATCH_ALGORITHM => { + assign(tabDMLScript, "validation_loss", "0") + assign(tabDMLScript, "validation_accuracy", "0") + forBlock("iVal", "1", "num_batches_per_epoch") { + getValidationBatch(tabDMLScript) + forward; lossLayer.computeLoss(dmlScript, numTabs) + tabDMLScript.append("validation_loss = validation_loss + loss\n") + tabDMLScript.append("validation_accuracy = validation_accuracy + accuracy\n") + } + tabDMLScript.append("validation_accuracy = validation_accuracy / num_batches_per_epoch\n") + } + case Caffe2DML.BATCH_ALGORITHM => { + assign(tabDMLScript, "Xb", Caffe2DML.XVal); assign(tabDMLScript, "yb", Caffe2DML.yVal) + net.getLayers.map(layer => net.getCaffeLayer(layer).forward(tabDMLScript, false)) + lossLayer.computeLoss(dmlScript, numTabs) + assign(tabDMLScript, "validation_loss", "loss"); assign(tabDMLScript, "validation_accuracy", "accuracy") - // Append the DML to display training and validation loss - private def displayLoss(lossLayer: IsLossLayer, shouldValidate: Boolean, performOneHotEncoding:Boolean): Unit = { - if (solverParam.getDisplay > 0) { - // Append the DML to compute training loss - if (!getTrainAlgo.toLowerCase.startsWith("allreduce")) { - // Compute training loss for allreduce - tabDMLScript.append("# Compute training loss & accuracy\n") - ifBlock("iter %% " + solverParam.getDisplay + " == 0") { - assign(tabDMLScript, "loss", "0"); assign(tabDMLScript, "accuracy", "0") + } + case Caffe2DML.ALLREDUCE_PARALLEL_BATCHES_ALGORITHM => { + // This setting uses the batch size provided by the user + assign(tabDMLScript, "max_validation_iter", "as.integer(ceil(" + Caffe2DML.numValidationImages + "/" + Caffe2DML.batchSize + "))") + assign(tabDMLScript, "group_validation_loss", matrix("0", "max_validation_iter", "1")) + assign(tabDMLScript, "group_validation_accuracy", matrix("0", "max_validation_iter", "1")) + parForBlock("iVal", "1", "max_validation_iter", "1", getParforParameters()) { + assign(tabDMLScript, "validation_beg", "(iVal-1) * " + Caffe2DML.batchSize + " + 1") + assign(tabDMLScript, "validation_end", min(Caffe2DML.numValidationImages, "validation_beg + " + Caffe2DML.batchSize + " - 1")) + assign(tabDMLScript, "Xb", Caffe2DML.XVal + "[validation_beg:validation_end,]") + assign(tabDMLScript, "yb", Caffe2DML.yVal + "[validation_beg:validation_end,]") + net.getLayers.map(layer => net.getCaffeLayer(layer).forward(tabDMLScript, false)) lossLayer.computeLoss(dmlScript, numTabs) - assign(tabDMLScript, "training_loss", "loss"); assign(tabDMLScript, "training_accuracy", "accuracy") - tabDMLScript.append( - print(dmlConcat(asDMLString("Iter:"), "iter", asDMLString(", training loss:"), "training_loss", asDMLString(", training accuracy:"), "training_accuracy")) - ) - if(performOneHotEncoding) { - printClassificationReport - } + assign(tabDMLScript, "group_validation_loss[iVal,1]", "loss") + assign(tabDMLScript, "group_validation_accuracy[iVal,1]", "accuracy") } - } else { - Caffe2DML.LOG.info("Training loss is not printed for train_algo=" + getTrainAlgo) + assign(tabDMLScript, "validation_loss", "sum(group_validation_loss)") + assign(tabDMLScript, "validation_accuracy", 
"mean(group_validation_accuracy)") } - if (shouldValidate) { - if (getTrainAlgo.toLowerCase.startsWith("allreduce") && - getTestAlgo.toLowerCase.startsWith("allreduce")) { - Caffe2DML.LOG.warn("The setting: train_algo=" + getTrainAlgo + " and test_algo=" + getTestAlgo + " is not recommended. Consider changing test_algo=minibatch") - } - // Append the DML to compute validation loss - val numValidationBatches = if (solverParam.getTestIterCount > 0) solverParam.getTestIter(0) else 0 - tabDMLScript.append("# Compute validation loss & accuracy\n") - ifBlock("iter %% " + solverParam.getTestInterval + " == 0") { - assign(tabDMLScript, "loss", "0"); assign(tabDMLScript, "accuracy", "0") - getTestAlgo.toLowerCase match { - case "minibatch" => { - assign(tabDMLScript, "validation_loss", "0") - assign(tabDMLScript, "validation_accuracy", "0") - forBlock("iVal", "1", "num_iters_per_epoch") { - getValidationBatch(tabDMLScript) - forward; lossLayer.computeLoss(dmlScript, numTabs) - tabDMLScript.append("validation_loss = validation_loss + loss\n") - tabDMLScript.append("validation_accuracy = validation_accuracy + accuracy\n") - } - tabDMLScript.append("validation_accuracy = validation_accuracy / num_iters_per_epoch\n") - } - case "batch" => { - assign(tabDMLScript, "Xb", Caffe2DML.XVal); assign(tabDMLScript, "yb", Caffe2DML.yVal) - net.getLayers.map(layer => net.getCaffeLayer(layer).forward(tabDMLScript, false)) - lossLayer.computeLoss(dmlScript, numTabs) - assign(tabDMLScript, "validation_loss", "loss"); assign(tabDMLScript, "validation_accuracy", "accuracy") - - } - case "allreduce_parallel_batches" => { - // This setting uses the batch size provided by the user - if (!inputs.containsKey("$parallel_batches")) { - throw new RuntimeException("The parameter parallel_batches is required for allreduce_parallel_batches") - } - // The user specifies the number of parallel_batches - // This ensures that the user of generated script remembers to provide the commandline parameter $parallel_batches - assign(tabDMLScript, "parallel_batches_val", "$parallel_batches") - assign(tabDMLScript, "group_batch_size_val", "parallel_batches_val*" + Caffe2DML.batchSize) - assign(tabDMLScript, "groups_val", "as.integer(ceil(" + Caffe2DML.numValidationImages + "/group_batch_size_val))") - assign(tabDMLScript, "validation_accuracy", "0") - assign(tabDMLScript, "validation_loss", "0") - // Grab groups of mini-batches - forBlock("g_val", "1", "groups_val") { - assign(tabDMLScript, "group_beg_val", "((g_val-1) * group_batch_size_val) %% " + Caffe2DML.numValidationImages + " + 1") - assign(tabDMLScript, "group_end_val", "min(" + Caffe2DML.numValidationImages + ", group_beg_val + group_batch_size_val - 1)") - assign(tabDMLScript, "X_group_batch_val", Caffe2DML.XVal + "[group_beg_val:group_end_val,]") - assign(tabDMLScript, "y_group_batch_val", Caffe2DML.yVal + "[group_beg_val:group_end_val,]") - assign(tabDMLScript, "group_validation_loss", matrix("0", "parallel_batches_val", "1")) - assign(tabDMLScript, "group_validation_accuracy", matrix("0", "parallel_batches_val", "1")) - // Run graph on each mini-batch in this group in parallel (ideally on multiple GPUs) - parForBlock("iVal", "1", "parallel_batches_val") { - assign(tabDMLScript, "beg_val", "((iVal-1) * " + Caffe2DML.batchSize + ") %% nrow(y_group_batch_val) + 1") - assign(tabDMLScript, "end_val", "min(nrow(y_group_batch_val), beg_val + " + Caffe2DML.batchSize + " - 1)") - assign(tabDMLScript, "Xb", "X_group_batch_val[beg_val:end_val,]") - assign(tabDMLScript, "yb", 
"y_group_batch_val[beg_val:end_val,]") - net.getLayers.map(layer => net.getCaffeLayer(layer).forward(tabDMLScript, false)) - lossLayer.computeLoss(dmlScript, numTabs) - assign(tabDMLScript, "group_validation_loss[iVal,1]", "loss") - assign(tabDMLScript, "group_validation_accuracy[iVal,1]", "accuracy") - } - assign(tabDMLScript, "validation_loss", "validation_loss + sum(group_validation_loss)") - assign(tabDMLScript, "validation_accuracy", "validation_accuracy + sum(group_validation_accuracy)") - } - assign(tabDMLScript, "validation_accuracy", "validation_accuracy/groups_val") - } - case "allreduce" => { - // This setting doesnot use the batch size for validation and allows the parfor optimizer to select plan - // by minimizing the memory requirement (i.e. batch size = 1) - assign(tabDMLScript, "group_validation_loss", matrix("0", Caffe2DML.numValidationImages, "1")) - assign(tabDMLScript, "group_validation_accuracy", matrix("0", Caffe2DML.numValidationImages, "1")) - parForBlock("iVal", "1", Caffe2DML.numValidationImages) { - assign(tabDMLScript, "Xb", Caffe2DML.XVal + "[iVal,]") - assign(tabDMLScript, "yb", Caffe2DML.yVal + "[iVal,]") - net.getLayers.map(layer => net.getCaffeLayer(layer).forward(tabDMLScript, false)) - lossLayer.computeLoss(dmlScript, numTabs) - assign(tabDMLScript, "group_validation_loss[iVal,1]", "loss") - assign(tabDMLScript, "group_validation_accuracy[iVal,1]", "accuracy") - } - assign(tabDMLScript, "validation_loss", "sum(group_validation_loss)") - assign(tabDMLScript, "validation_accuracy", "mean(group_validation_accuracy)") - } - - case _ => throw new DMLRuntimeException("Unsupported test algo:" + getTestAlgo) - } - tabDMLScript.append( - print(dmlConcat(asDMLString("Iter:"), "iter", asDMLString(", validation loss:"), "validation_loss", asDMLString(", validation accuracy:"), "validation_accuracy")) - ) + case Caffe2DML.ALLREDUCE_ALGORITHM => { + // This setting doesnot use the batch size for validation and allows the parfor optimizer to select plan + // by minimizing the memory requirement (i.e. 
batch size = 1) + assign(tabDMLScript, "group_validation_loss", matrix("0", Caffe2DML.numValidationImages, "1")) + assign(tabDMLScript, "group_validation_accuracy", matrix("0", Caffe2DML.numValidationImages, "1")) + parForBlock("iVal", "1", Caffe2DML.numValidationImages, "1", getParforParameters()) { + assign(tabDMLScript, "Xb", Caffe2DML.XVal + "[iVal,]") + assign(tabDMLScript, "yb", Caffe2DML.yVal + "[iVal,]") + net.getLayers.map(layer => net.getCaffeLayer(layer).forward(tabDMLScript, false)) + lossLayer.computeLoss(dmlScript, numTabs) + assign(tabDMLScript, "group_validation_loss[iVal,1]", "loss") + assign(tabDMLScript, "group_validation_accuracy[iVal,1]", "accuracy") } + assign(tabDMLScript, "validation_loss", "sum(group_validation_loss)") + assign(tabDMLScript, "validation_accuracy", "mean(group_validation_accuracy)") } + + case _ => throw new DMLRuntimeException("Unsupported test algo:" + getTestAlgo) } + tabDMLScript.append( + print(dmlConcat(asDMLString("Iter:"), "iter", asDMLString(", validation loss:"), "validation_loss", asDMLString(", validation accuracy:"), "validation_accuracy")) + ) } private def appendSnapshotWrite(varName: String, fileName: String): Unit = tabDMLScript.append(write(varName, "snapshot_dir + \"" + fileName + "\"", "binary")) @@ -695,36 +607,73 @@ class Caffe2DML(val sc: SparkContext, } private def initializeGradients(parallel_batches: String): Unit = { tabDMLScript.append("# Data structure to store gradients computed in parallel\n") - net.getLayers - .map(layer => net.getCaffeLayer(layer)) - .map(l => { - if (l.shouldUpdateWeight) assign(tabDMLScript, l.dWeight + "_agg", matrix("0", parallel_batches, multiply(nrow(l.weight), ncol(l.weight)))) - if (l.shouldUpdateExtraWeight) assign(tabDMLScript, l.dExtraWeight + "_agg", matrix("0", parallel_batches, multiply(nrow(l.extraWeight), ncol(l.extraWeight)))) - if (l.shouldUpdateBias) assign(tabDMLScript, l.dBias + "_agg", matrix("0", parallel_batches, multiply(nrow(l.bias), ncol(l.bias)))) - }) + if(Caffe2DML.USE_PLUS_EQ) { + net.getLayers + .map(layer => net.getCaffeLayer(layer)) + .map(l => { + if (l.shouldUpdateWeight) assign(tabDMLScript, l.dWeight + "_agg", matrix("0", nrow(l.weight), ncol(l.weight))) + if (l.shouldUpdateExtraWeight) assign(tabDMLScript, l.dExtraWeight + "_agg", matrix("0", nrow(l.extraWeight), ncol(l.extraWeight))) + if (l.shouldUpdateBias) assign(tabDMLScript, l.dBias + "_agg", matrix("0", nrow(l.bias), ncol(l.bias))) + }) + } + else { + net.getLayers + .map(layer => net.getCaffeLayer(layer)) + .map(l => { + if (l.shouldUpdateWeight) assign(tabDMLScript, l.dWeight + "_agg", matrix("0", parallel_batches, multiply(nrow(l.weight), ncol(l.weight)))) + if (l.shouldUpdateExtraWeight) assign(tabDMLScript, l.dExtraWeight + "_agg", matrix("0", parallel_batches, multiply(nrow(l.extraWeight), ncol(l.extraWeight)))) + if (l.shouldUpdateBias) assign(tabDMLScript, l.dBias + "_agg", matrix("0", parallel_batches, multiply(nrow(l.bias), ncol(l.bias)))) + }) + } } private def flattenGradients(): Unit = { - tabDMLScript.append("# Flatten and store gradients for this parallel execution\n") - // Note: We multiply by a weighting to allow for proper gradient averaging during the - // aggregation even with uneven batch sizes. 
- assign(tabDMLScript, "weighting", "nrow(Xb)/X_group_batch_size") - net.getLayers - .map(layer => net.getCaffeLayer(layer)) - .map(l => { - if (l.shouldUpdateWeight) assign(tabDMLScript, l.dWeight + "_agg[j,]", matrix(l.dWeight, "1", multiply(nrow(l.weight), ncol(l.weight))) + " * weighting") - if (l.shouldUpdateExtraWeight) assign(tabDMLScript, l.dExtraWeight + "_agg[j,]", matrix(l.dExtraWeight, "1", multiply(nrow(l.extraWeight), ncol(l.extraWeight))) + " * weighting") - if (l.shouldUpdateWeight) assign(tabDMLScript, l.dBias + "_agg[j,]", matrix(l.dBias, "1", multiply(nrow(l.bias), ncol(l.bias))) + " * weighting") - }) + if(Caffe2DML.USE_PLUS_EQ) { + // Note: We multiply by a weighting to allow for proper gradient averaging during the + // aggregation even with uneven batch sizes. + assign(tabDMLScript, "weighting", "1/parallel_batches") // "nrow(Xb)/X_group_batch_size") + net.getLayers + .map(layer => net.getCaffeLayer(layer)) + .map(l => { + if (l.shouldUpdateWeight) assignPlusEq(tabDMLScript, l.dWeight + "_agg", l.dWeight + "*weighting") + if (l.shouldUpdateExtraWeight) assignPlusEq(tabDMLScript, l.dExtraWeight + "_agg", l.dExtraWeight + "*weighting") + if (l.shouldUpdateWeight) assignPlusEq(tabDMLScript, l.dBias + "_agg", l.dBias + "*weighting") + }) + } + else { + tabDMLScript.append("# Flatten and store gradients for this parallel execution\n") + // Note: We multiply by a weighting to allow for proper gradient averaging during the + // aggregation even with uneven batch sizes. + assign(tabDMLScript, "weighting", "1/parallel_batches") // "nrow(Xb)/X_group_batch_size") + net.getLayers + .map(layer => net.getCaffeLayer(layer)) + .map(l => { + if (l.shouldUpdateWeight) assign(tabDMLScript, l.dWeight + "_agg[j,]", matrix(l.dWeight, "1", multiply(nrow(l.weight), ncol(l.weight))) + " * weighting") + if (l.shouldUpdateExtraWeight) assign(tabDMLScript, l.dExtraWeight + "_agg[j,]", matrix(l.dExtraWeight, "1", multiply(nrow(l.extraWeight), ncol(l.extraWeight))) + " * weighting") + if (l.shouldUpdateWeight) assign(tabDMLScript, l.dBias + "_agg[j,]", matrix(l.dBias, "1", multiply(nrow(l.bias), ncol(l.bias))) + " * weighting") + }) + } } + private def aggregateAggGradients(): Unit = { - tabDMLScript.append("# Aggregate the gradients\n") - net.getLayers - .map(layer => net.getCaffeLayer(layer)) - .map(l => { - if (l.shouldUpdateWeight) assign(tabDMLScript, l.dWeight, matrix(colSums(l.dWeight + "_agg"), nrow(l.weight), ncol(l.weight))) - if (l.shouldUpdateExtraWeight) assign(tabDMLScript, l.dExtraWeight, matrix(colSums(l.dExtraWeight + "_agg"), nrow(l.extraWeight), ncol(l.extraWeight))) - if (l.shouldUpdateWeight) assign(tabDMLScript, l.dBias, matrix(colSums(l.dBias + "_agg"), nrow(l.bias), ncol(l.bias))) - }) + if(Caffe2DML.USE_PLUS_EQ) { + net.getLayers + .map(layer => net.getCaffeLayer(layer)) + .map(l => { + if (l.shouldUpdateWeight) assign(tabDMLScript, l.dWeight, l.dWeight + "_agg") + if (l.shouldUpdateExtraWeight) assign(tabDMLScript, l.dExtraWeight, l.dExtraWeight + "_agg") + if (l.shouldUpdateWeight) assign(tabDMLScript, l.dBias, l.dBias + "_agg") + }) + } + else { + tabDMLScript.append("# Aggregate the gradients\n") + net.getLayers + .map(layer => net.getCaffeLayer(layer)) + .map(l => { + if (l.shouldUpdateWeight) assign(tabDMLScript, l.dWeight, matrix(colSums(l.dWeight + "_agg"), nrow(l.weight), ncol(l.weight))) + if (l.shouldUpdateExtraWeight) assign(tabDMLScript, l.dExtraWeight, matrix(colSums(l.dExtraWeight + "_agg"), nrow(l.extraWeight), ncol(l.extraWeight))) + if 
(l.shouldUpdateWeight) assign(tabDMLScript, l.dBias, matrix(colSums(l.dBias + "_agg"), nrow(l.bias), ncol(l.bias))) + }) + } } // ------------------------------------------------------------------------------------------- } @@ -789,7 +738,7 @@ class Caffe2DMLModel(val numClasses: String, val sc: SparkContext, val solver: C val lastLayerShape = estimator.getOutputShapeOfLastLayer assign(tabDMLScript, "Prob", matrix("1", Caffe2DML.numImages, (lastLayerShape._1 * lastLayerShape._2 * lastLayerShape._3).toString)) estimator.getTestAlgo.toLowerCase match { - case "minibatch" => { + case Caffe2DML.MINIBATCH_ALGORITHM => { ceilDivide(tabDMLScript(), "num_iters", Caffe2DML.numImages, Caffe2DML.batchSize) forBlock("iter", "1", "num_iters") { getTestBatch(tabDMLScript) @@ -797,12 +746,12 @@ class Caffe2DMLModel(val numClasses: String, val sc: SparkContext, val solver: C assign(tabDMLScript, "Prob[beg:end,]", lossLayers(0).out) } } - case "batch" => { + case Caffe2DML.BATCH_ALGORITHM => { assign(tabDMLScript, "Xb", "X_full") net.getLayers.map(layer => net.getCaffeLayer(layer).forward(tabDMLScript, true)) assign(tabDMLScript, "Prob", lossLayers(0).out) } - case "allreduce_parallel_batches" => { + case Caffe2DML.ALLREDUCE_PARALLEL_BATCHES_ALGORITHM => { // This setting uses the batch size provided by the user if (!estimator.inputs.containsKey("$parallel_batches")) { throw new RuntimeException("The parameter parallel_batches is required for allreduce_parallel_batches") @@ -814,23 +763,20 @@ class Caffe2DMLModel(val numClasses: String, val sc: SparkContext, val solver: C assign(tabDMLScript, "groups", "as.integer(ceil(" + Caffe2DML.numImages + "/group_batch_size))") // Grab groups of mini-batches forBlock("g", "1", "groups") { - assign(tabDMLScript, "group_beg", "((g-1) * group_batch_size) %% " + Caffe2DML.numImages + " + 1") - assign(tabDMLScript, "group_end", "min(" + Caffe2DML.numImages + ", group_beg + group_batch_size - 1)") - assign(tabDMLScript, "X_group_batch", "X_full[group_beg:group_end,]") + assignBatch(tabDMLScript, "X_group_batch", "X_full", null, null, "group_", Caffe2DML.numImages, "g", "group_batch_size") + assign(tabDMLScript, "X_group_batch_size", nrow("X_group_batch")) // Run graph on each mini-batch in this group in parallel (ideally on multiple GPUs) - parForBlock("j", "1", "parallel_batches") { - assign(tabDMLScript, "beg", "((j-1) * " + Caffe2DML.batchSize + ") %% nrow(X_group_batch) + 1") - assign(tabDMLScript, "end", "min(nrow(X_group_batch), beg + " + Caffe2DML.batchSize + " - 1)") - assign(tabDMLScript, "Xb", "X_group_batch[beg:end,]") + parForBlock("j", "1", "parallel_batches", "1", estimator.getParforParameters()) { + assignBatch(tabDMLScript, "Xb", "X_group_batch", null, null, "", "X_group_batch_size", "j", Caffe2DML.batchSize) net.getLayers.map(layer => net.getCaffeLayer(layer).forward(tabDMLScript, true)) assign(tabDMLScript, "Prob[beg:end,]", lossLayers(0).out) } } } - case "allreduce" => { + case Caffe2DML.ALLREDUCE_ALGORITHM => { // This setting doesnot use the batch size for scoring and allows the parfor optimizer to select the best plan // by minimizing the memory requirement (i.e. 
batch size = 1) - parForBlock("iter", "1", Caffe2DML.numImages) { + parForBlock("iter", "1", Caffe2DML.numImages, "1", estimator.getParforParameters()) { assign(tabDMLScript, "Xb", "X_full[iter,]") net.getLayers.map(layer => net.getCaffeLayer(layer).forward(tabDMLScript, true)) assign(tabDMLScript, "Prob[iter,]", lossLayers(0).out) @@ -840,7 +786,7 @@ class Caffe2DMLModel(val numClasses: String, val sc: SparkContext, val solver: C } if (estimator.inputs.containsKey("$output_activations")) { - if (estimator.getTestAlgo.toLowerCase.equals("batch")) { + if (estimator.getTestAlgo.toLowerCase.equals(Caffe2DML.BATCH_ALGORITHM)) { net.getLayers.map( layer => tabDMLScript.append( http://git-wip-us.apache.org/repos/asf/systemml/blob/cf31ed2a/src/main/scala/org/apache/sysml/api/dl/CaffeLayer.scala ---------------------------------------------------------------------- diff --git a/src/main/scala/org/apache/sysml/api/dl/CaffeLayer.scala b/src/main/scala/org/apache/sysml/api/dl/CaffeLayer.scala index dd8d137..2c81eda 100644 --- a/src/main/scala/org/apache/sysml/api/dl/CaffeLayer.scala +++ b/src/main/scala/org/apache/sysml/api/dl/CaffeLayer.scala @@ -87,6 +87,7 @@ trait CaffeLayer extends BaseDMLGenerator { computedDout } def dX(bottomLayerID: Int) = "dOut" + id + "_" + bottomLayerID + def getIgnoreVarName(varNameHint:String):String = "ignore" + varNameHint + "_" + id + "_" + Math.abs(Caffe2DML.rand.nextLong) // -------------------------------------------------------------------------------------- // No need to override these methods in subclasses, instead classes that have weights and biases // should implement HasWeight and HasBias traits. @@ -371,7 +372,7 @@ class BatchNorm(val param: LayerParameter, val id: Int, val net: CaffeNetwork) e eps ) - private def withSuffix(str: String): String = if (update_mean_var) str else str + "_ignore" + private def withSuffix(str: String): String = if (update_mean_var) str else getIgnoreVarName(str) override def weightShape(): Array[Int] = Array(numChannels.toInt, 1) override def biasShape(): Array[Int] = Array(numChannels.toInt, 1) def cache_mean(): String = "cache_mean" + id @@ -1016,7 +1017,7 @@ class MaxPooling(val param: LayerParameter, val id: Int, val net: CaffeNetwork) * - Wout: Output width. */ override def forward(dmlScript: StringBuilder, isPrediction: Boolean) = - invokeForward(dmlScript, List[String](out, "ignoreHout_" + id, "ignoreWout_" + id), X, numChannels, Hin, Win, kernel_h, kernel_w, stride_h, stride_w, pad_h, pad_w) + invokeForward(dmlScript, List[String](out, getIgnoreVarName("Hout"), getIgnoreVarName("Wout")), X, numChannels, Hin, Win, kernel_h, kernel_w, stride_h, stride_w, pad_h, pad_w) /* * Computes the backward pass for a 2D spatial max pooling layer. 
* The input data has N examples, each represented as a 3D volume @@ -1168,7 +1169,7 @@ class Convolution(val param: LayerParameter, val id: Int, val net: CaffeNetwork) if (isDepthWise) invokeForward( dmlScript, - List[String](out, "ignoreHout_" + id, "ignoreWout_" + id), + List[String](out, getIgnoreVarName("Hout"), getIgnoreVarName("Wout")), X, weight, bias, @@ -1185,7 +1186,7 @@ class Convolution(val param: LayerParameter, val id: Int, val net: CaffeNetwork) ) else invokeForward(dmlScript, - List[String](out, "ignoreHout_" + id, "ignoreWout_" + id), + List[String](out, getIgnoreVarName("Hout"), getIgnoreVarName("Wout")), X, weight, bias, @@ -1423,7 +1424,7 @@ class DeConvolution(val param: LayerParameter, val id: Int, val net: CaffeNetwor if (isDepthWise) invokeForward( dmlScript, - List[String](out, "ignoreHout_" + id, "ignoreWout_" + id), + List[String](out, getIgnoreVarName("Hout"), getIgnoreVarName("Wout")), X, weight, bias, @@ -1443,7 +1444,7 @@ class DeConvolution(val param: LayerParameter, val id: Int, val net: CaffeNetwor else invokeForward( dmlScript, - List[String](out, "ignoreHout_" + id, "ignoreWout_" + id), + List[String](out, getIgnoreVarName("Hout"), getIgnoreVarName("Wout")), X, weight, bias, http://git-wip-us.apache.org/repos/asf/systemml/blob/cf31ed2a/src/main/scala/org/apache/sysml/api/dl/DMLGenerator.scala ---------------------------------------------------------------------- diff --git a/src/main/scala/org/apache/sysml/api/dl/DMLGenerator.scala b/src/main/scala/org/apache/sysml/api/dl/DMLGenerator.scala index 5d25116..efe5c00 100644 --- a/src/main/scala/org/apache/sysml/api/dl/DMLGenerator.scala +++ b/src/main/scala/org/apache/sysml/api/dl/DMLGenerator.scala @@ -57,6 +57,8 @@ trait BaseDMLGenerator { def asDMLString(str: String): String = "\"" + str + "\"" def assign(dmlScript: StringBuilder, lhsVar: String, rhsVar: String): Unit = dmlScript.append(lhsVar).append(" = ").append(rhsVar).append("\n") + def assignPlusEq(dmlScript: StringBuilder, lhsVar: String, rhsVar: String): Unit = + dmlScript.append(lhsVar).append(" += ").append(rhsVar).append("\n") def sum(dmlScript: StringBuilder, variables: List[String]): StringBuilder = { if (variables.length > 1) dmlScript.append("(") dmlScript.append(variables(0)) @@ -104,12 +106,12 @@ trait BaseDMLGenerator { invoke(dmlScript, namespace1, returnVariables, functionName, arguments.toList, appendNewLine) def invoke(dmlScript: StringBuilder, namespace1: String, returnVariables: List[String], functionName: String, arguments: String*): Unit = invoke(dmlScript, namespace1, returnVariables, functionName, arguments.toList, true) - def rightIndexing(dmlScript: StringBuilder, varName: String, rl: String, ru: String, cl: String, cu: String): StringBuilder = { - dmlScript.append(varName).append("[") + def rightIndexing(dmlScript: StringBuilder, lhsVar:String, rhsVar: String, rl: String, ru: String, cl: String=null, cu: String=null): StringBuilder = { + dmlScript.append(lhsVar).append(" = ").append(rhsVar).append("[") if (rl != null && ru != null) dmlScript.append(rl).append(":").append(ru) dmlScript.append(",") if (cl != null && cu != null) dmlScript.append(cl).append(":").append(cu) - dmlScript.append("]") + dmlScript.append("]\n") } // Performs assignVar = ceil(lhsVar/rhsVar) def ceilDivide(dmlScript: StringBuilder, assignVar: String, lhsVar: String, rhsVar: String): Unit = @@ -124,7 +126,12 @@ trait BaseDMLGenerator { ret.toString } def matrix(init: String, rows: String, cols: String): String = "matrix(" + init + ", rows=" + rows + ", 
cols=" + cols + ")" + def sum(m: String): String = "sum(" + m + ")" def nrow(m: String): String = "nrow(" + m + ")" + def ceil(m: String): String = "ceil(" + m + ")" + def floor(m: String): String = "floor(" + m + ")" + def stop(dmlScript: StringBuilder, m: String): StringBuilder = dmlScript.append("stop(" + m + ")\n") + def asInteger(m: String): String = "as.integer(" + m + ")" def ncol(m: String): String = "ncol(" + m + ")" def customAssert(cond: Boolean, msg: String) = if (!cond) throw new DMLRuntimeException(msg) def multiply(v1: String, v2: String): String = v1 + "*" + v2 @@ -166,29 +173,26 @@ trait SourceDMLGenerator extends TabbedDMLGenerator { trait NextBatchGenerator extends TabbedDMLGenerator { def min(lhs: String, rhs: String): String = "min(" + lhs + ", " + rhs + ")" - - def assignBatch(dmlScript: StringBuilder, Xb: String, X: String, yb: String, y: String, indexPrefix: String, N: String, i: String): StringBuilder = { - dmlScript.append(indexPrefix).append("beg = ((" + i + "-1) * " + Caffe2DML.batchSize + ") %% " + N + " + 1; ") - dmlScript.append(indexPrefix).append("end = min(beg + " + Caffe2DML.batchSize + " - 1, " + N + "); ") + // Creates a DML script for: + // index_prefix_beg = ((i-1) * batchSize) %% N + 1; + // index_prefix_end = min(index_prefix_beg + batchSize - 1, N); + // Xb = X[ index_prefix_beg: index_prefix_end, ]; yb = y[ index_prefix_beg: index_prefix_end, ]; + def assignBatch(dmlScript: StringBuilder, Xb: String, X: String, yb: String, y: String, indexPrefix: String, N: String, i: String, batchSize:String): StringBuilder = { + dmlScript.append(indexPrefix).append("beg = ((" + i + "-1) * " + batchSize + ") %% " + N + " + 1; ") + dmlScript.append(indexPrefix).append("end = min(" + indexPrefix + "beg + " + batchSize + " - 1, " + N + "); ") dmlScript.append(Xb).append(" = ").append(X).append("[").append(indexPrefix).append("beg:").append(indexPrefix).append("end,]; ") if (yb != null && y != null) dmlScript.append(yb).append(" = ").append(y).append("[").append(indexPrefix).append("beg:").append(indexPrefix).append("end,]; ") dmlScript.append("\n") } def getTestBatch(tabDMLScript: StringBuilder): Unit = - assignBatch(tabDMLScript, "Xb", Caffe2DML.X, null, null, "", Caffe2DML.numImages, "iter") - + assignBatch(tabDMLScript, "Xb", Caffe2DML.X, null, null, "", Caffe2DML.numImages, "iter", Caffe2DML.batchSize) def getTrainingBatch(tabDMLScript: StringBuilder): Unit = - assignBatch(tabDMLScript, "Xb", Caffe2DML.X, "yb", Caffe2DML.y, "", Caffe2DML.numImages, "iter") - def getTrainingBatch(tabDMLScript: StringBuilder, X: String, y: String, numImages: String): Unit = - assignBatch(tabDMLScript, "Xb", X, "yb", y, "", numImages, "i") - def getTrainingMaxiBatch(tabDMLScript: StringBuilder): Unit = - assignBatch(tabDMLScript, "X_group_batch", Caffe2DML.X, "y_group_batch", Caffe2DML.y, "group_", Caffe2DML.numImages, "g") + assignBatch(tabDMLScript, "Xb", Caffe2DML.X, "yb", Caffe2DML.y, "", Caffe2DML.numImages, "iter", Caffe2DML.batchSize) def getValidationBatch(tabDMLScript: StringBuilder): Unit = - assignBatch(tabDMLScript, "Xb", Caffe2DML.XVal, "yb", Caffe2DML.yVal, "", Caffe2DML.numValidationImages, "iVal") + assignBatch(tabDMLScript, "Xb", Caffe2DML.XVal, "yb", Caffe2DML.yVal, "", Caffe2DML.numValidationImages, "iVal", Caffe2DML.batchSize) } - trait DMLGenerator extends SourceDMLGenerator with NextBatchGenerator { // Also makes "code reading" possible for Caffe2DML :) var dmlScript = new StringBuilder @@ -220,6 +224,13 @@ trait DMLGenerator extends SourceDMLGenerator with 
NextBatchGenerator { numTabs -= 1 tabDMLScript.append("}\n") } + def forBlock(iterVarName: String, startVal: String, endVal: String, step:String)(op: => Unit) { + tabDMLScript.append("for(" + iterVarName + " in seq(" + startVal + "," + endVal + "," + step + ")) {\n") + numTabs += 1 + op + numTabs -= 1 + tabDMLScript.append("}\n") + } def forBlock(iterVarName: String, startVal: String, endVal: String)(op: => Unit) { tabDMLScript.append("for(" + iterVarName + " in " + startVal + ":" + endVal + ") {\n") numTabs += 1 @@ -227,8 +238,11 @@ trait DMLGenerator extends SourceDMLGenerator with NextBatchGenerator { numTabs -= 1 tabDMLScript.append("}\n") } - def parForBlock(iterVarName: String, startVal: String, endVal: String)(op: => Unit) { - tabDMLScript.append("parfor(" + iterVarName + " in " + startVal + ":" + endVal + ") {\n") + def parForBlock(iterVarName: String, startVal: String, endVal: String, step:String, parforParameters:String)(op: => Unit) { + if(step.equals("1")) + tabDMLScript.append("parfor(" + iterVarName + " in " + startVal + ":" + endVal + parforParameters + ") {\n") + else + tabDMLScript.append("parfor(" + iterVarName + " in seq(" + startVal + "," + endVal + "," + step + ")" + parforParameters + ") {\n") numTabs += 1 op numTabs -= 1 http://git-wip-us.apache.org/repos/asf/systemml/blob/cf31ed2a/src/main/scala/org/apache/sysml/api/dl/Utils.scala ---------------------------------------------------------------------- diff --git a/src/main/scala/org/apache/sysml/api/dl/Utils.scala b/src/main/scala/org/apache/sysml/api/dl/Utils.scala index e771ed1..63aaf91 100644 --- a/src/main/scala/org/apache/sysml/api/dl/Utils.scala +++ b/src/main/scala/org/apache/sysml/api/dl/Utils.scala @@ -302,6 +302,22 @@ object Utils { } } // -------------------------------------------------------------- + + // Returns the memory requirement for the layer in number of bytes + def getMemInBytes(l:CaffeLayer, batchSize:Int, isTraining:Boolean):Long = { + val numLayerInput = if(!l.isInstanceOf[Data]) l.bottomLayerOutputShape._1.toLong * l.bottomLayerOutputShape._2.toLong * l.bottomLayerOutputShape._3.toLong * batchSize else 0 + val numLayerOutput = l.outputShape._1.toLong * l.outputShape._2.toLong * l.outputShape._3.toLong * batchSize + val numLayerError = numLayerOutput + val numLayerWeights = if(l.weightShape != null) { + val nWt = l.weightShape()(0).toLong * l.weightShape()(1).toLong + if(l.extraWeightShape != null) l.extraWeightShape()(0).toLong * l.extraWeightShape()(1).toLong + nWt + else nWt + } else 0 + val numLayerBias = if(l.biasShape != null)l.biasShape()(0).toLong * l.biasShape()(1).toLong else 0 + val numLayerGradients = (numLayerWeights + numLayerBias) * batchSize + if(isTraining) (numLayerInput + numLayerOutput + numLayerError + numLayerWeights + numLayerBias + numLayerGradients)*java.lang.Double.BYTES + else (numLayerInput + numLayerOutput + numLayerWeights + numLayerBias)*java.lang.Double.BYTES + } } class Utils {
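
For reference, below is a minimal usage sketch of the new `parfor_parameters` option added to `Caffe2DML.set()` in estimators.py, together with the string it produces for the `$parfor_parameters` commandline input consumed by `getParforParameters()`. The SparkSession setup, solver file name, input shape, and the specific parfor options (`check`, `par`) are illustrative assumptions, not values mandated by this commit.

```python
# Hypothetical usage sketch; solver file, input shape, and parfor options are illustrative only.
from pyspark.sql import SparkSession
from systemml.mllearn import Caffe2DML

spark = SparkSession.builder.getOrCreate()
caffe2dml = Caffe2DML(spark, solver='lenet_solver.proto', input_shape=(1, 28, 28))

# train_algo='allreduce_parallel_batches' requires parallel_batches;
# parfor_parameters must be a dictionary (a TypeError is raised otherwise).
caffe2dml.set(train_algo='allreduce_parallel_batches',
              parallel_batches=4,
              parfor_parameters={'check': 0, 'par': 4})

# Internally the dictionary is serialized as ", check=0, par=4" and passed as
# $parfor_parameters, so the generated parfor header becomes, e.g.:
#   parfor(j in 1:parallel_batches, check=0, par=4) {
```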

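As a side note on the `USE_PLUS_EQ` flag introduced above: the old and new gradient-aggregation paths in `flattenGradients`/`aggregateAggGradients` compute the same averaged gradient. The sketch below demonstrates this with NumPy rather than DML; the array shapes and values are made up for illustration, assuming the same `weighting = 1/parallel_batches` as in the generated script.

```python
import numpy as np

rng = np.random.default_rng(0)
parallel_batches, rows, cols = 4, 3, 2
grads = [rng.standard_normal((rows, cols)) for _ in range(parallel_batches)]
weighting = 1.0 / parallel_batches  # as assigned in the generated script

# Old strategy: flatten each batch gradient into one row of a
# (parallel_batches x rows*cols) buffer, then aggregate with colSums.
agg_rows = np.zeros((parallel_batches, rows * cols))
for j, dW in enumerate(grads):
    agg_rows[j, :] = dW.reshape(1, -1) * weighting
dW_old = agg_rows.sum(axis=0).reshape(rows, cols)

# New strategy (USE_PLUS_EQ): accumulate directly into a rows x cols matrix with +=.
dW_agg = np.zeros((rows, cols))
for dW in grads:
    dW_agg += dW * weighting
dW_new = dW_agg

assert np.allclose(dW_old, dW_new)
print("Both aggregation strategies yield the same averaged gradient.")
```

The `+=` form avoids materializing a separate `parallel_batches x (rows*cols)` buffer per layer, presumably leaving the accumulation to the parfor result-merge instead.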