Repository: incubator-systemml Updated Branches: refs/heads/master ad3e78a28 -> cc7993fc8
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/cc7993fc/src/main/scala/org/apache/sysml/api/dl/CaffeSolver.scala ---------------------------------------------------------------------- diff --git a/src/main/scala/org/apache/sysml/api/dl/CaffeSolver.scala b/src/main/scala/org/apache/sysml/api/dl/CaffeSolver.scala new file mode 100644 index 0000000..755949d --- /dev/null +++ b/src/main/scala/org/apache/sysml/api/dl/CaffeSolver.scala @@ -0,0 +1,158 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */
+package org.apache.sysml.api.dl
+
+import caffe.Caffe.SolverParameter
+import org.apache.sysml.runtime.DMLRuntimeException
+import caffe.Caffe
+
+/**
+ * Contract shared by the optimizers that can be emitted into a generated DML script.
+ * The helpers defined here only append text to the supplied script buffer.
+ */
+trait CaffeSolver {
+  /** Name of the DML source file (e.g. "sgd") that provides this solver's functions. */
+  def sourceFileName:String;
+  /** Appends the parameter-update statements for the given layer to dmlScript. */
+  def update(dmlScript:StringBuilder, layer:CaffeLayer):Unit;
+  /** Appends the statements that initialise this solver's state for the given layer. */
+  def init(dmlScript:StringBuilder, layer:CaffeLayer):Unit;
+
+  // ----------------------------------------------------------------
+  // Used for Fine-tuning
+  // Learning-rate expression for the paramIndex-th param { } entry of the layer:
+  // plain "lr" when the layer declares no such entry, otherwise "lr" scaled by its lr_mult.
+  private def getLayerLr(layer:CaffeLayer, paramIndex:Int):String = {
+    val param = layer.param.getParamList
+    if(param == null || param.size() <= paramIndex)
+      "lr"
+    else
+      // TODO: Ignoring param.get(index).getDecayMult for now
+      "(lr * " + param.get(paramIndex).getLrMult + ")"
+  }
+  // the first param { } is for the weights and the second is for the biases.
+  def getWeightLr(layer:CaffeLayer):String = getLayerLr(layer, 0)
+  def getBiasLr(layer:CaffeLayer):String = getLayerLr(layer, 1)
+  // ----------------------------------------------------------------
+
+  /**
+   * Joins the arguments with "," (no surrounding whitespace).
+   * An empty argument list yields the empty string instead of failing on a missing first element.
+   */
+  def commaSep(arr:String*):String = arr.mkString(",")
+
+  /**
+   * Appends the L2-regularisation statements for the layer's weight gradient.
+   * Nothing is emitted when lambda is zero or the layer does not update its weight.
+   *
+   * @param lambda    regularization parameter passed to l2_reg::backward
+   * @param dmlScript buffer that receives the generated statements
+   * @param layer     layer whose weight / dWeight variable names are used
+   */
+  def l2reg_update(lambda:Double, dmlScript:StringBuilder, layer:CaffeLayer):Unit = {
+    // val donotRegularizeLayers:Boolean = layer.isInstanceOf[BatchNorm] || layer.isInstanceOf[Scale];
+    if(lambda != 0 && layer.shouldUpdateWeight) {
+      // First compute the regularisation gradient, then add it to the data gradient.
+      dmlScript.append("\t").append(layer.dWeight + "_reg = l2_reg::backward(" + layer.weight + ", " + lambda + ")\n")
+      dmlScript.append("\t").append(layer.dWeight + " = " + layer.dWeight + " + " + layer.dWeight + "_reg\n")
+    }
+  }
+}
+
+class LearningRatePolicy(lr_policy:String="exp", base_lr:Double=0.01) {
+  def this(solver:Caffe.SolverParameter) {
+    this(solver.getLrPolicy, solver.getBaseLr)
+    if(solver.hasGamma) setGamma(solver.getGamma)
+    if(solver.hasStepsize) setStepsize(solver.getStepsize)
+    if(solver.hasPower()) setPower(solver.getPower)
+  }
+  var gamma:Double = 0.95
+  var 
step:Double = 100000
+  var power:Double = 0.75
+  def setGamma(gamma1:Double):Unit = { gamma = gamma1 }
+  def setStepsize(step1:Double):Unit = { step = step1 }
+  def setPower(power1:Double): Unit = { power = power1 }
+  /**
+   * Appends the statement "lr = <expression>" to dmlScript, where the expression is chosen by
+   * lr_policy (matched case-insensitively). The generated expressions refer to the DML
+   * variable e, and the "poly" policy additionally refers to max_epochs.
+   *
+   * @param dmlScript buffer that receives the generated statement
+   * @throws DMLRuntimeException if lr_policy is not one of fixed, step, exp, inv, poly, sigmoid
+   */
+  def updateLearningRate(dmlScript:StringBuilder):Unit = {
+    val new_lr = lr_policy.toLowerCase match {
+      case "fixed" => base_lr.toString
+      case "step" => "(" + base_lr + " * " + gamma + " ^ " + " floor(e/" + step + "))"
+      case "exp" => "(" + base_lr + " * " + gamma + "^e)"
+      case "inv" => "(" + base_lr + "* (1 + " + gamma + " * e) ^ (-" + power + "))"
+      case "poly" => "(" + base_lr + " * (1 - e/ max_epochs) ^ " + power + ")"
+      // base_lr * (1 / (1 + exp(-gamma * (e - step)))): the multiplication sign after base_lr
+      // and the fifth closing parenthesis are both required for the expression to parse.
+      case "sigmoid" => "(" + base_lr + " * (1/(1 + exp(-" + gamma + " * (e - " + step + ")))))"
+      case _ => throw new DMLRuntimeException("The lr policy is not supported:" + lr_policy)
+    }
+    dmlScript.append("lr = " + new_lr + "\n")
+  }
+}
+
+/**
+ * lambda: regularization parameter
+ * momentum: Momentum value. Typical values are in the range of [0.5, 0.99], usually started at the lower end and annealed towards the higher end. 
+ */
+class SGD(lambda:Double=5e-04, momentum:Double=0.9) extends CaffeSolver {
+  // Name of the DML variable that holds the velocity of the given parameter.
+  private def velocityOf(paramName:String):String = paramName + "_v"
+
+  // Builds one update statement for a parameter: sgd::update when momentum is zero,
+  // sgd_momentum::update (which also returns the new velocity) otherwise.
+  private def updateStatement(paramName:String, gradName:String, lrExpr:String):String = {
+    if(momentum == 0) {
+      // Use sgd
+      paramName + " = sgd::update(" + commaSep(paramName, gradName, lrExpr) + ")\n"
+    }
+    else {
+      // Use sgd_momentum
+      val velocity = velocityOf(paramName)
+      "[" + commaSep(paramName, velocity) + "] " +
+        "= sgd_momentum::update(" + commaSep(paramName, gradName, lrExpr, momentum.toString, velocity) + ")\n"
+    }
+  }
+
+  // Builds the statement that creates the velocity variable of a parameter.
+  private def initStatement(paramName:String):String =
+    velocityOf(paramName) + " = sgd_momentum::init(" + paramName + ")\n"
+
+  // Emits the optional L2 step followed by one tab-indented update per trainable parameter.
+  def update(dmlScript:StringBuilder, layer:CaffeLayer):Unit = {
+    l2reg_update(lambda, dmlScript, layer)
+    if(layer.shouldUpdateWeight)
+      dmlScript.append("\t").append(updateStatement(layer.weight, layer.dWeight, getWeightLr(layer)))
+    if(layer.shouldUpdateBias)
+      dmlScript.append("\t").append(updateStatement(layer.bias, layer.dBias, getBiasLr(layer)))
+  }
+
+  // Velocity variables are only needed (and only created) for the momentum variant.
+  def init(dmlScript:StringBuilder, layer:CaffeLayer):Unit = {
+    if(momentum != 0) {
+      if(layer.shouldUpdateWeight) dmlScript.append(initStatement(layer.weight))
+      if(layer.shouldUpdateBias) dmlScript.append(initStatement(layer.bias))
+    }
+  }
+
+  def sourceFileName:String = if(momentum != 0) "sgd_momentum" else "sgd"
+}
+
+/**
+ * lambda: regularization parameter
+ * epsilon: Smoothing term to avoid divide by zero errors. Typical values are in the range of [1e-8, 1e-4].
+ *
+ * See Adaptive Subgradient Methods for Online Learning and Stochastic Optimization, Duchi et al. 
+ */
+class AdaGrad(lambda:Double=5e-04, epsilon:Double=1e-6) extends CaffeSolver {
+  // Name of the DML variable that holds the accumulated-gradient cache of a parameter.
+  private def cacheOf(paramName:String):String = paramName + "_cache"
+
+  // adagrad::update returns both the new parameter value and the new cache.
+  private def updateStatement(paramName:String, gradName:String, lrExpr:String):String = {
+    val cache = cacheOf(paramName)
+    "[" + commaSep(paramName, cache) + "] " +
+      "= adagrad::update(" + commaSep(paramName, gradName, lrExpr, epsilon.toString, cache) + ")\n"
+  }
+
+  // Emits the optional L2 step followed by one tab-indented update per trainable parameter.
+  def update(dmlScript:StringBuilder, layer:CaffeLayer):Unit = {
+    l2reg_update(lambda, dmlScript, layer)
+    if(layer.shouldUpdateWeight)
+      dmlScript.append("\t").append(updateStatement(layer.weight, layer.dWeight, getWeightLr(layer)))
+    if(layer.shouldUpdateBias)
+      dmlScript.append("\t").append(updateStatement(layer.bias, layer.dBias, getBiasLr(layer)))
+  }
+
+  // Creates the cache variable of every trainable parameter.
+  def init(dmlScript:StringBuilder, layer:CaffeLayer):Unit = {
+    def initStatement(paramName:String):String = cacheOf(paramName) + " = adagrad::init(" + paramName + ")\n"
+    if(layer.shouldUpdateWeight) dmlScript.append(initStatement(layer.weight))
+    if(layer.shouldUpdateBias) dmlScript.append(initStatement(layer.bias))
+  }
+
+  def sourceFileName:String = "adagrad"
+}
+
+/**
+ * lambda: regularization parameter
+ * momentum: Momentum value. Typical values are in the range of [0.5, 0.99], usually started at the lower end and annealed towards the higher end. 
+ */
+class Nesterov(lambda:Double=5e-04, momentum:Double=0.9) extends CaffeSolver {
+  // Name of the DML variable that holds the velocity of the given parameter.
+  private def velocityOf(paramName:String):String = paramName + "_v"
+
+  // sgd_nesterov::update returns both the new parameter value and the new velocity.
+  private def updateStatement(paramName:String, gradName:String, lrExpr:String):String = {
+    val velocity = velocityOf(paramName)
+    "[" + commaSep(paramName, velocity) + "] " +
+      "= sgd_nesterov::update(" + commaSep(paramName, gradName, lrExpr, momentum.toString, velocity) + ")\n"
+  }
+
+  // Emits the optional L2 step followed by one tab-indented update per trainable parameter.
+  def update(dmlScript:StringBuilder, layer:CaffeLayer):Unit = {
+    l2reg_update(lambda, dmlScript, layer)
+    if(layer.shouldUpdateWeight)
+      dmlScript.append("\t").append(updateStatement(layer.weight, layer.dWeight, getWeightLr(layer)))
+    if(layer.shouldUpdateBias)
+      dmlScript.append("\t").append(updateStatement(layer.bias, layer.dBias, getBiasLr(layer)))
+  }
+
+  // Creates the velocity variable of every trainable parameter.
+  def init(dmlScript:StringBuilder, layer:CaffeLayer):Unit = {
+    def initStatement(paramName:String):String = velocityOf(paramName) + " = sgd_nesterov::init(" + paramName + ")\n"
+    if(layer.shouldUpdateWeight) dmlScript.append(initStatement(layer.weight))
+    if(layer.shouldUpdateBias) dmlScript.append(initStatement(layer.bias))
+  }
+
+  def sourceFileName:String = "sgd_nesterov"
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/cc7993fc/src/main/scala/org/apache/sysml/api/dl/DMLGenerator.scala
----------------------------------------------------------------------
diff --git a/src/main/scala/org/apache/sysml/api/dl/DMLGenerator.scala b/src/main/scala/org/apache/sysml/api/dl/DMLGenerator.scala
new file mode 100644
index 0000000..ec4269a
--- /dev/null
+++ b/src/main/scala/org/apache/sysml/api/dl/DMLGenerator.scala
@@ -0,0 +1,311 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. 
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sysml.api.dl
+
+import java.util.HashSet
+import caffe.Caffe.LayerParameter;
+import caffe.Caffe.NetParameter;
+import caffe.Caffe.SolverParameter;
+import org.apache.sysml.runtime.DMLRuntimeException;
+import scala.collection.JavaConversions._
+import caffe.Caffe
+
+// String-building helpers for generating DML text. Every method either returns a DML
+// fragment or appends one to the StringBuilder it is given.
+trait BaseDMLGenerator {
+  // Joins the elements with "," (no whitespace); an empty list yields "".
+  def commaSep(arr:List[String]):String = arr.mkString(",")
+  // Varargs form of the list-based commaSep; an empty argument list yields "".
+  def commaSep(arr:String*):String = arr.mkString(",")
+  // Adds the two operands while generating the script when both parse as numbers (the sum is
+  // truncated to an Int); otherwise returns the symbolic DML expression "(v1+v2)".
+  // NonFatal keeps VM-level errors such as OutOfMemoryError from being swallowed.
+  def int_add(v1:String, v2:String):String = {
+    try { (v1.toDouble + v2.toDouble).toInt.toString } catch { case scala.util.control.NonFatal(_) => "("+v1+"+"+v2+")"}
+  }
+  // Same constant folding as int_add, for the product of three operands.
+  def int_mult(v1:String, v2:String, v3:String):String = {
+    try { (v1.toDouble * v2.toDouble * v3.toDouble).toInt.toString } catch { case scala.util.control.NonFatal(_) => "("+v1+"*"+v2+"*"+v3+")"}
+  }
+  // True only for a non-empty string made up entirely of digits (the empty string is not a number).
+  def isNumber(x: String):Boolean = x.nonEmpty && x.forall(c => Character.isDigit(c))
+  def transpose(x:String):String = "t(" + x + ")"
+  // DML write(...) statement; fileName and format are emitted as quoted string literals.
+  def write(varName:String, fileName:String, format:String):String = "write(" + varName + ", \"" + fileName + "\", format=\"" + format + "\")\n"
+  // DML read(...) statement whose path is the DML variable weights concatenated with sep + fileName.
+  def read(varName:String, fileName:String, sep:String="/"):String = varName + " = read(weights + \"" + sep + fileName + "\")\n"
+  // Wraps the text in double quotes; quotes inside str are not escaped.
+  def asDMLString(str:String):String = "\"" + str + "\""
+  def assign(dmlScript:StringBuilder, lhsVar:String, rhsVar:String):Unit = {
+    dmlScript.append(lhsVar).append(" = 
").append(rhsVar).append("\n")
+  }
+  // Appends the variables joined by " + "; the sum is parenthesised only when it has more than one term.
+  def sum(dmlScript:StringBuilder, variables:List[String]):StringBuilder = {
+    val parenthesize = variables.length > 1
+    if(parenthesize) dmlScript.append("(")
+    dmlScript.append(variables(0))
+    variables.drop(1).foreach(term => dmlScript.append(" + ").append(term))
+    if(parenthesize) dmlScript.append(")")
+    dmlScript
+  }
+  // Appends "lhsVar = <sum of rhsVars>" followed by a newline.
+  def addAndAssign(dmlScript:StringBuilder, lhsVar:String, rhsVars:List[String]):Unit = {
+    sum(dmlScript.append(lhsVar).append(" = "), rhsVars).append("\n")
+  }
+  // Appends a function-call statement: a single return variable is written bare, several are
+  // written as [a,b,...]; namespace1 and functionName are concatenated as given.
+  def invoke(dmlScript:StringBuilder, namespace1:String, returnVariables:List[String], functionName:String, arguments:List[String]):Unit = {
+    if(returnVariables.length == 0) throw new DMLRuntimeException("User-defined functions should have atleast one return value")
+    val lhs =
+      if(returnVariables.length > 1) returnVariables.mkString("[", ",", "]")
+      else returnVariables(0)
+    dmlScript.append(lhs).append(" = ").append(namespace1).append(functionName).append("(")
+    // A null argument list is treated like an empty one.
+    if(arguments != null) dmlScript.append(arguments.mkString(","))
+    dmlScript.append(")\n")
+  }
+  // Varargs convenience form that delegates to the list-based invoke.
+  def invoke(dmlScript:StringBuilder, namespace1:String, returnVariables:List[String], functionName:String, arguments:String*):Unit = {
+    invoke(dmlScript, namespace1, returnVariables, functionName, arguments.toList)
+  }
+  def rightIndexing(dmlScript:StringBuilder, varName:String, rl:String, ru:String, cl:String, cu:String):StringBuilder = {
+    dmlScript.append(varName).append("[")
+    if(rl != null && ru != null) dmlScript.append(rl).append(":").append(ru)
+    dmlScript.append(",")
+    if(cl != null && cu != null) dmlScript.append(cl).append(":").append(cu)
+    
dmlScript.append("]")
+  }
+  // Performs assignVar = ceil(lhsVar/rhsVar)
+  def ceilDivide(dmlScript:StringBuilder, assignVar:String, lhsVar:String, rhsVar:String):Unit =
+    dmlScript.append(assignVar).append(" = ").append("ceil(").append(lhsVar).append(" / ").append(rhsVar).append(")\n")
+  def print(arg:String):String = "print(" + arg + ")\n"
+  // Joins the arguments with " + "; at least one argument is required.
+  def dmlConcat(arg:String*):String = {
+    val joined = new StringBuilder
+    joined.append(arg(0))
+    arg.drop(1).foreach(part => joined.append(" + ").append(part))
+    joined.toString
+  }
+  def matrix(init:String, rows:String, cols:String):String = "matrix(" + init + ", rows=" + rows + ", cols=" + cols + ")"
+  def nrow(m:String):String = "nrow(" + m + ")"
+  def ncol(m:String):String = "ncol(" + m + ")"
+  def customAssert(cond:Boolean, msg:String) = if(!cond) throw new DMLRuntimeException(msg)
+}
+
+// Adds indentation support on top of the basic string helpers.
+trait TabbedDMLGenerator extends BaseDMLGenerator {
+  // Indents by numTabs tabs without emitting a leading newline.
+  def tabDMLScript(dmlScript:StringBuilder, numTabs:Int):StringBuilder = tabDMLScript(dmlScript, numTabs, false)
+  // Appends an optional newline followed by numTabs tab characters and returns the builder for chaining.
+  def tabDMLScript(dmlScript:StringBuilder, numTabs:Int, prependNewLine:Boolean):StringBuilder = {
+    val leadingNewLine = if(prependNewLine) "\n" else ""
+    dmlScript.append(leadingNewLine + ("\t" * numTabs))
+  }
+}
+
+trait SourceDMLGenerator extends TabbedDMLGenerator {
+  val alreadyImported:HashSet[String] = new HashSet[String]
+  def source(dmlScript:StringBuilder, numTabs:Int, sourceFileName:String, dir:String):Unit = {
+    if(sourceFileName != null && !alreadyImported.contains(sourceFileName)) {
+      tabDMLScript(dmlScript, numTabs).append("source(\"" + dir + sourceFileName + ".dml\") as " + sourceFileName + "\n")
+      alreadyImported.add(sourceFileName)
+    }
+  }
+  def source(dmlScript:StringBuilder, numTabs:Int, net:CaffeNetwork, solver:CaffeSolver, otherFiles:Array[String]):Unit = {
+    // Add layers with multiple source files
+    if(net.getLayers.filter(layer => net.getCaffeLayer(layer).isInstanceOf[SoftmaxWithLoss]).length > 0) {
+      source(dmlScript, numTabs, "softmax", 
Caffe2DML.layerDir)
+      source(dmlScript, numTabs, "cross_entropy_loss", Caffe2DML.layerDir)
+    }
+    // foreach rather than map: source(...) is called for its side effect only.
+    net.getLayers.foreach(layer => source(dmlScript, numTabs, net.getCaffeLayer(layer).sourceFileName, Caffe2DML.layerDir))
+    if(solver != null)
+      source(dmlScript, numTabs, solver.sourceFileName, Caffe2DML.optimDir)
+    if(otherFiles != null)
+      otherFiles.foreach(sourceFileName => source(dmlScript, numTabs, sourceFileName, Caffe2DML.layerDir))
+  }
+}
+
+// Generates the DML that slices the next mini-batch out of the full data matrices.
+trait NextBatchGenerator extends TabbedDMLGenerator {
+  def min(lhs:String, rhs:String): String = "min(" + lhs + ", " + rhs + ")"
+
+  /**
+   * Appends, on a single line, the statements that compute the batch bounds and slice the batch:
+   * <prefix>beg, <prefix>end, Xb = X[beg:end,] and, when both yb and y are given, yb = y[beg:end,].
+   *
+   * @param dmlScript   buffer that receives the statements
+   * @param Xb          name of the variable receiving the feature batch
+   * @param X           name of the full feature matrix
+   * @param yb          name of the variable receiving the label batch, or null to skip labels
+   * @param y           name of the full label matrix, or null to skip labels
+   * @param indexPrefix prefix for the generated beg / end index variables
+   * @param N           DML expression for the number of rows in X
+   * @param i           DML expression for the 1-based batch counter
+   * @return dmlScript, for chaining
+   */
+  def assignBatch(dmlScript:StringBuilder, Xb:String, X:String, yb:String, y:String, indexPrefix:String, N:String, i:String):StringBuilder = {
+    val beg = indexPrefix + "beg"
+    val end = indexPrefix + "end"
+    dmlScript.append(beg).append(" = ((" + i + "-1) * " + Caffe2DML.batchSize + ") %% " + N + " + 1; ")
+    // The upper bound must be derived from the prefixed begin index; using a bare "beg" here
+    // made prefixed callers (e.g. "group_") read a different or undefined variable.
+    dmlScript.append(end).append(" = min(" + beg + " + " + Caffe2DML.batchSize + " - 1, " + N + "); ")
+    dmlScript.append(Xb).append(" = ").append(X).append("[").append(beg).append(":").append(end).append(",]; ")
+    if(yb != null && y != null)
+      dmlScript.append(yb).append(" = ").append(y).append("[").append(beg).append(":").append(end).append(",]; ")
+    dmlScript.append("\n")
+  }
+  def getTestBatch(tabDMLScript:StringBuilder):Unit = {
+    assignBatch(tabDMLScript, "Xb", Caffe2DML.X, null, null, "", Caffe2DML.numImages, "i")
+  }
+  def getTrainingBatch(tabDMLScript:StringBuilder):Unit = {
+    assignBatch(tabDMLScript, "Xb", Caffe2DML.X, "yb", Caffe2DML.y, "", Caffe2DML.numImages, "i")
+  }
+  def getTrainingBatch(tabDMLScript:StringBuilder, X:String, y:String, numImages:String):Unit = {
+    assignBatch(tabDMLScript, "Xb", X, "yb", y, "", numImages, "i")
+  }
+  def getTrainingMaxiBatch(tabDMLScript:StringBuilder):Unit = {
+    assignBatch(tabDMLScript, "X_group_batch", Caffe2DML.X, "y_group_batch", Caffe2DML.y, "group_", Caffe2DML.numImages, "g")
+  }
+  def getValidationBatch(tabDMLScript:StringBuilder):Unit = {
+    
assignBatch(tabDMLScript, "Xb", Caffe2DML.XVal, "yb", Caffe2DML.yVal, "", Caffe2DML.numValidationImages, "iVal")
+  }
+}
+
+// Collects the DML statements that log values for TensorBoard. Statements are accumulated in
+// two buffers (training / validation) and spliced into the generated script by the append* methods.
+trait VisualizeDMLGenerator extends TabbedDMLGenerator {
+  // Set to true by visualizeLoss / visualizeLayer; gates all append* methods.
+  var doVisualize = false
+  var _tensorboardLogDir:String = null
+  def setTensorBoardLogDir(log:String): Unit = { _tensorboardLogDir = log }
+  // Returns the configured log location, lazily defaulting to a freshly created temp path.
+  // NOTE(review): createTempFile creates a regular file, while this value is printed as the
+  // argument of "tensorboard --logdir=" - confirm the consumer copes with a file here.
+  def tensorboardLogDir:String = {
+    if(_tensorboardLogDir == null) {
+      _tensorboardLogDir = java.io.File.createTempFile("temp", System.nanoTime().toString()).getAbsolutePath
+    }
+    _tensorboardLogDir
+  }
+  // Registers the four loss / accuracy series: the training ones go to the training buffer,
+  // the validation ones to the validation buffer.
+  def visualizeLoss(): Unit = {
+    checkTensorBoardDependency()
+    doVisualize = true
+    // Visualize for both training and validation
+    visualize(" ", " ", "training_loss", "iter", "training_loss", true)
+    visualize(" ", " ", "training_accuracy", "iter", "training_accuracy", true)
+    visualize(" ", " ", "validation_loss", "iter", "validation_loss", false)
+    visualize(" ", " ", "validation_accuracy", "iter", "validation_accuracy", false)
+  }
+  val visTrainingDMLScript: StringBuilder = new StringBuilder
+  val visValidationDMLScript: StringBuilder = new StringBuilder
+  // Fails with a DMLRuntimeException when the protobuf 3 class cannot be loaded.
+  // The lookup is skipped once doVisualize is already true.
+  def checkTensorBoardDependency():Unit = {
+    try {
+      if(!doVisualize)
+        Class.forName( "com.google.protobuf.GeneratedMessageV3")
+    } catch {
+      case _:ClassNotFoundException => throw new DMLRuntimeException("To use visualize() feature, you will have to include protobuf-java-3.2.0.jar in your classpath. Hint: you can download the jar from http://central.maven.org/maven2/com/google/protobuf/protobuf-java/3.2.0/protobuf-java-3.2.0.jar")
+    }
+  }
+  // Appends one visualize(...) call plus the viz_counter accumulation to the training or
+  // validation buffer. x and y are emitted as DML expressions; the other arguments are quoted.
+  private def visualize(layerName:String, varType:String, aggFn:String, x:String, y:String, isTraining:Boolean) = {
+    val dmlScript = if(isTraining) visTrainingDMLScript else visValidationDMLScript
+    dmlScript.append("viz_counter1 = visualize(" +
+        commaSep(asDMLString(layerName), asDMLString(varType), asDMLString(aggFn), x, y, asDMLString(tensorboardLogDir)) +
+        ");\n")
+    dmlScript.append("viz_counter = viz_counter + viz_counter1\n")
+  }
+  // When visualization is enabled: declares the visualize external function, initialises
+  // viz_counter and prints the tensorboard command line to stdout.
+  def appendVisualizationHeaders(dmlScript:StringBuilder, numTabs:Int): Unit = {
+    if(doVisualize) {
+      tabDMLScript(dmlScript, numTabs).append("visualize = externalFunction(String layerName, String varType, String aggFn, Double x, Double y, String logDir) return (Double B) " +
+          "implemented in (classname=\"org.apache.sysml.udf.lib.Caffe2DMLVisualizeWrapper\",exectype=\"mem\"); \n")
+      tabDMLScript(dmlScript, numTabs).append("viz_counter = 0\n")
+      System.out.println("Please use the following command for visualizing: tensorboard --logdir=" + tensorboardLogDir)
+    }
+  }
+  // Registers aggFn(<layer variable>) for logging in both buffers. Throws DMLRuntimeException
+  // for an unknown layer name, an unknown varType, or a layer whose selected variable is null.
+  def visualizeLayer(net:CaffeNetwork, layerName:String, varType:String, aggFn:String): Unit = {
+    // 'weight', 'bias', 'dweight', 'dbias', 'output' or 'doutput'
+    // 'sum', 'mean', 'var' or 'sd'
+    checkTensorBoardDependency()
+    doVisualize = true
+    if(net.getLayers.filter(_.equals(layerName)).size == 0)
+      throw new DMLRuntimeException("Cannot visualize the layer:" + layerName)
+    // Map the requested variable type onto the layer's DML variable name.
+    val dmlVar = {
+      val l = net.getCaffeLayer(layerName)
+      varType match {
+        case "weight" => l.weight
+        case "bias" => l.bias
+        case "dweight" => l.dWeight
+        case "dbias" => l.dBias
+        case "output" => l.out
+        case "doutput" => l.dX
+        case _ => throw new DMLRuntimeException("Cannot visualize the variable of type:" + varType)
+      }
+    }
+    if(dmlVar == null)
+      throw new DMLRuntimeException("Cannot visualize the variable of type:" + varType)
+    // Visualize for 
both training and validation
+    visualize(layerName, varType, aggFn, "iter", aggFn + "(" + dmlVar + ")", true)
+    visualize(layerName, varType, aggFn, "iter", aggFn + "(" + dmlVar + ")", false)
+  }
+
+  def appendTrainingVisualizationBody(dmlScript:StringBuilder, numTabs:Int): Unit = {
+    if(doVisualize)
+      tabDMLScript(dmlScript, numTabs).append(visTrainingDMLScript.toString)
+  }
+  def appendValidationVisualizationBody(dmlScript:StringBuilder, numTabs:Int): Unit = {
+    if(doVisualize)
+      tabDMLScript(dmlScript, numTabs).append(visValidationDMLScript.toString)
+  }
+}
+
+// Stateful front-end over the generator traits: owns the script buffer and the current
+// indentation depth so that callers do not have to pass them around.
+trait DMLGenerator extends SourceDMLGenerator with NextBatchGenerator with VisualizeDMLGenerator {
+  // Also makes "code reading" possible for Caffe2DML :)
+  var dmlScript = new StringBuilder
+  var numTabs = 0
+  // Returns the generator to its initial state: empty script, no recorded imports,
+  // zero indentation, empty visualization buffers, visualization off.
+  def reset():Unit = {
+    dmlScript.clear()
+    alreadyImported.clear()
+    numTabs = 0
+    visTrainingDMLScript.clear()
+    visValidationDMLScript.clear()
+    doVisualize = false
+  }
+  // -------------------------------------------------------------------------------------------------
+  // Helper functions that calls super class methods and simplifies the code of this trait
+  def tabDMLScript():StringBuilder = tabDMLScript(dmlScript, numTabs, false)
+  def tabDMLScript(prependNewLine:Boolean):StringBuilder = tabDMLScript(dmlScript, numTabs, prependNewLine)
+  def source(net:CaffeNetwork, solver:CaffeSolver, otherFiles:Array[String]):Unit = {
+    source(dmlScript, numTabs, net, solver, otherFiles)
+  }
+  // -------------------------------------------------------------------------------------------------
+
+  // Emits "<header> {", runs op one indentation level deeper, then emits the closing "}".
+  private def bracedBlock(header:String)(op: => Unit):Unit = {
+    tabDMLScript().append(header + " {\n")
+    numTabs += 1
+    op
+    numTabs -= 1
+    tabDMLScript().append("}\n")
+  }
+  def ifBlock(cond:String)(op: => Unit):Unit = {
+    bracedBlock("if(" + cond + ")")(op)
+  }
+  def forBlock(iterVarName:String, startVal:String, endVal:String)(op: => Unit):Unit = {
+    bracedBlock("for(" + iterVarName + " in " + startVal + ":" + endVal + ")")(op)
+  }
+  def parForBlock(iterVarName:String, startVal:String, endVal:String)(op: => Unit):Unit = {
+    bracedBlock("parfor(" + iterVarName + " in " + startVal + ":" + endVal + ")")(op)
+  }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/cc7993fc/src/main/scala/org/apache/sysml/api/dl/Utils.scala
----------------------------------------------------------------------
diff --git a/src/main/scala/org/apache/sysml/api/dl/Utils.scala b/src/main/scala/org/apache/sysml/api/dl/Utils.scala
new file mode 100644
index 0000000..b9d6d33
--- /dev/null
+++ b/src/main/scala/org/apache/sysml/api/dl/Utils.scala
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License. 
+ */
+package org.apache.sysml.api.dl
+import scala.collection.JavaConversions._
+import caffe.Caffe.LayerParameter;
+import caffe.Caffe.NetParameter;
+import org.apache.sysml.parser.LanguageException;
+import com.google.protobuf.TextFormat;
+import org.apache.sysml.conf.ConfigurationManager;
+import org.apache.sysml.runtime.util.LocalFileUtils;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import caffe.Caffe.SolverParameter;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import org.apache.sysml.runtime.DMLRuntimeException
+import java.io.StringReader
+import java.io.BufferedReader
+
+object Utils {
+  // ---------------------------------------------------------------------------------------------
+  // Helper methods for DML generation
+
+  // Returns number of classes if inferred from the network.
+  // The value is read from the first component of the last layer's output shape; when that
+  // fails for any non-fatal reason a warning is logged and the DML placeholder "$num_classes"
+  // is returned instead.
+  def numClasses(net:CaffeNetwork):String = {
+    try {
+      "" + net.getCaffeLayer(net.getLayers().last).outputShape._1.toLong
+    } catch {
+      // NonFatal (instead of Throwable) lets VM-level errors such as OutOfMemoryError and
+      // thread interruption propagate rather than being turned into the placeholder.
+      case scala.util.control.NonFatal(_) => {
+        Caffe2DML.LOG.warn("Cannot infer the number of classes from network definition. User needs to pass it via set(num_classes=...) method.")
+        "$num_classes" // Expect users to provide it
+      }
+    }
+  }
+  // Prints the script to stdout, one line per row, prefixed with a zero-padded
+  // three-digit line number (starting at 1) and "|".
+  def prettyPrintDMLScript(script:String):Unit = {
+    val reader = new BufferedReader(new StringReader(script))
+    Iterator.continually(reader.readLine())
+      .takeWhile(_ != null)
+      .zipWithIndex
+      .foreach { case (text, idx) => System.out.println( "%03d".format(idx + 1) + "|" + text) }
+  }
+
+  // ---------------------------------------------------------------------------------------------
+  def parseSolver(solverFilePath:String): CaffeSolver = parseSolver(readCaffeSolver(solverFilePath))
+  // Maps the solver definition onto one of the supported solver classes (type matched case-insensitively).
+  def parseSolver(solver:SolverParameter): CaffeSolver = {
+    val momentum = if(solver.hasMomentum) solver.getMomentum else 0.0
+    val lambda = if(solver.hasWeightDecay) solver.getWeightDecay else 0.0
+    // NOTE(review): an unset delta becomes 0.0 and is passed to AdaGrad as its epsilon - confirm this is intended.
+    val delta = if(solver.hasDelta) solver.getDelta else 0.0
+
+    solver.getType.toLowerCase match {
+      case "sgd" => new SGD(lambda, momentum)
+      case "adagrad" => new AdaGrad(lambda, delta)
+      case "nesterov" => new Nesterov(lambda, momentum)
+      case _ => throw new DMLRuntimeException("The solver type is not supported: " + solver.getType + ". 
Try: SGD, AdaGrad or Nesterov.")
+    }
+
+  }
+
+  // Returns "nn" when a directory of that name exists in the current working directory;
+  // otherwise throws a RuntimeException explaining where to obtain it.
+  def getPrefix():String = {
+    val f = new File("nn")
+    if(f.exists() && f.isDirectory()) {
+      Caffe2DML.LOG.info("Since nn directory exists in current folder, using it.")
+      "nn"
+    }
+    else {
+      // TODO: Extract from the jar
+      throw new RuntimeException("In current version, we require that you download the nn folder into current directory from https://github.com/apache/incubator-systemml/tree/master/scripts/staging/SystemML-NN")
+    }
+  }
+
+  // --------------------------------------------------------------
+  // Caffe utility functions
+  // Parses a text-format network definition. The underlying stream is closed once parsing
+  // has finished, whether or not it succeeded.
+  def readCaffeNet(netFilePath:String):NetParameter = {
+    val reader:InputStreamReader = getInputStreamReader(netFilePath)
+    try {
+      val builder:NetParameter.Builder = NetParameter.newBuilder()
+      TextFormat.merge(reader, builder)
+      builder.build()
+    } finally {
+      reader.close()
+    }
+  }
+
+  // Parses a text-format solver definition. The underlying stream is closed once parsing
+  // has finished, whether or not it succeeded.
+  def readCaffeSolver(solverFilePath:String):SolverParameter = {
+    val reader = getInputStreamReader(solverFilePath)
+    try {
+      val builder = SolverParameter.newBuilder()
+      TextFormat.merge(reader, builder)
+      builder.build()
+    } finally {
+      reader.close()
+    }
+  }
+
+  // --------------------------------------------------------------
+  // File IO utility functions
+  // Opens a reader on the given path. Paths starting with "hdfs:" or "gpfs:" are opened through
+  // the Hadoop FileSystem API, everything else as a local file. The caller owns the returned
+  // reader and is responsible for closing it.
+  // Throws LanguageException for a null path or a path rejected by validateExternalFilename.
+  // NOTE(review): the local branch decodes as "ASCII" while the hdfs/gpfs branch uses the
+  // platform default charset - confirm whether the difference is intended.
+  def getInputStreamReader(filePath:String ):InputStreamReader = {
+    //read solver script from file
+    if(filePath == null)
+      throw new LanguageException("file path was not specified!");
+    if(filePath.startsWith("hdfs:") || filePath.startsWith("gpfs:")) {
+      if( !LocalFileUtils.validateExternalFilename(filePath, true) )
+        throw new LanguageException("Invalid (non-trustworthy) hdfs filename.");
+      val fs = FileSystem.get(ConfigurationManager.getCachedJobConf());
+      new InputStreamReader(fs.open(new Path(filePath)))
+    }
+    else {
+      if( !LocalFileUtils.validateExternalFilename(filePath, false) )
+        throw new LanguageException("Invalid (non-trustworthy) local filename.");
+      new InputStreamReader(new FileInputStream(new File(filePath)), "ASCII")
+    }
+  }
+  // 
-------------------------------------------------------------- +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/cc7993fc/src/main/scala/org/apache/sysml/api/ml/BaseSystemMLClassifier.scala ---------------------------------------------------------------------- diff --git a/src/main/scala/org/apache/sysml/api/ml/BaseSystemMLClassifier.scala b/src/main/scala/org/apache/sysml/api/ml/BaseSystemMLClassifier.scala index fb9697d..a104c5c 100644 --- a/src/main/scala/org/apache/sysml/api/ml/BaseSystemMLClassifier.scala +++ b/src/main/scala/org/apache/sysml/api/ml/BaseSystemMLClassifier.scala @@ -33,6 +33,8 @@ import org.apache.sysml.api.mlcontext._ import org.apache.sysml.api.mlcontext.ScriptFactory._ import org.apache.spark.sql._ import org.apache.sysml.api.mlcontext.MLContext.ExplainLevel +import java.util.HashMap +import scala.collection.JavaConversions._ trait HasLaplace extends Params { final val laplace: Param[Double] = new Param[Double](this, "laplace", "Laplace smoothing specified by the user to avoid creation of 0 probabilities.") @@ -65,8 +67,25 @@ trait HasRegParam extends Params { final def getRegParam: Double = $(regParam) } -trait BaseSystemMLEstimator { - +trait BaseSystemMLEstimatorOrModel { + var enableGPU:Boolean = false + var explain:Boolean = false + var statistics:Boolean = false + val config:HashMap[String, String] = new HashMap[String, String]() + def setGPU(enableGPU1:Boolean):BaseSystemMLEstimatorOrModel = { enableGPU = enableGPU1; this} + def setExplain(explain1:Boolean):BaseSystemMLEstimatorOrModel = { explain = explain1; this} + def setStatistics(statistics1:Boolean):BaseSystemMLEstimatorOrModel = { statistics = statistics1; this} + def setConfigProperty(key:String, value:String):BaseSystemMLEstimatorOrModel = { config.put(key, value); this} + def updateML(ml:MLContext):Unit = { + ml.setGPU(enableGPU); ml.setExplain(explain); ml.setStatistics(statistics); config.map(x => ml.setConfigProperty(x._1, 
x._2)) + } + def copyProperties(other:BaseSystemMLEstimatorOrModel):BaseSystemMLEstimatorOrModel = { + other.setGPU(enableGPU); other.setExplain(explain); other.setStatistics(statistics); config.map(x => other.setConfigProperty(x._1, x._2)) + return other + } +} + +trait BaseSystemMLEstimator extends BaseSystemMLEstimatorOrModel { def transformSchema(schema: StructType): StructType = schema // Returns the script and variables for X and y @@ -79,9 +98,10 @@ trait BaseSystemMLEstimator { def toDouble(d:Double): java.lang.Double = { double2Double(d) } + } -trait BaseSystemMLEstimatorModel { +trait BaseSystemMLEstimatorModel extends BaseSystemMLEstimatorOrModel { def toDouble(i:Int): java.lang.Double = { double2Double(i.toDouble) } @@ -96,19 +116,19 @@ trait BaseSystemMLEstimatorModel { } trait BaseSystemMLClassifier extends BaseSystemMLEstimator { - def baseFit(X_mb: MatrixBlock, y_mb: MatrixBlock, sc: SparkContext): MLResults = { val isSingleNode = true val ml = new MLContext(sc) + updateML(ml) y_mb.recomputeNonZeros(); val ret = getTrainingScript(isSingleNode) val script = ret._1.in(ret._2, X_mb).in(ret._3, y_mb) ml.execute(script) } - def baseFit(df: ScriptsUtils.SparkDataType, sc: SparkContext): MLResults = { val isSingleNode = false val ml = new MLContext(df.rdd.sparkContext) + updateML(ml) val mcXin = new MatrixCharacteristics() val Xin = RDDConverterUtils.dataFrameToBinaryBlock(sc, df.asInstanceOf[DataFrame].select("features"), mcXin, false, true) val revLabelMapping = new java.util.HashMap[Int, String] @@ -121,10 +141,11 @@ trait BaseSystemMLClassifier extends BaseSystemMLEstimator { } trait BaseSystemMLClassifierModel extends BaseSystemMLEstimatorModel { - + def baseTransform(X: MatrixBlock, mloutput: MLResults, sc: SparkContext, probVar:String): MatrixBlock = { val isSingleNode = true val ml = new MLContext(sc) + updateML(ml) val script = getPredictionScript(mloutput, isSingleNode) // Uncomment for debugging // 
ml.setExplainLevel(ExplainLevel.RECOMPILE_RUNTIME) @@ -137,11 +158,12 @@ trait BaseSystemMLClassifierModel extends BaseSystemMLEstimatorModel { } return ret } - + def baseTransform(df: ScriptsUtils.SparkDataType, mloutput: MLResults, sc: SparkContext, probVar:String, outputProb:Boolean=true): DataFrame = { val isSingleNode = false val ml = new MLContext(sc) + updateML(ml) val mcXin = new MatrixCharacteristics() val Xin = RDDConverterUtils.dataFrameToBinaryBlock(df.rdd.sparkContext, df.asInstanceOf[DataFrame].select("features"), mcXin, false, true) val script = getPredictionScript(mloutput, isSingleNode) @@ -161,4 +183,4 @@ trait BaseSystemMLClassifierModel extends BaseSystemMLEstimatorModel { } } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/cc7993fc/src/main/scala/org/apache/sysml/api/ml/BaseSystemMLRegressor.scala ---------------------------------------------------------------------- diff --git a/src/main/scala/org/apache/sysml/api/ml/BaseSystemMLRegressor.scala b/src/main/scala/org/apache/sysml/api/ml/BaseSystemMLRegressor.scala index 08154bb..5dd23e0 100644 --- a/src/main/scala/org/apache/sysml/api/ml/BaseSystemMLRegressor.scala +++ b/src/main/scala/org/apache/sysml/api/ml/BaseSystemMLRegressor.scala @@ -38,6 +38,7 @@ trait BaseSystemMLRegressor extends BaseSystemMLEstimator { def baseFit(X_mb: MatrixBlock, y_mb: MatrixBlock, sc: SparkContext): MLResults = { val isSingleNode = true val ml = new MLContext(sc) + updateML(ml) val ret = getTrainingScript(isSingleNode) val script = ret._1.in(ret._2, X_mb).in(ret._3, y_mb) ml.execute(script) @@ -46,6 +47,7 @@ trait BaseSystemMLRegressor extends BaseSystemMLEstimator { def baseFit(df: ScriptsUtils.SparkDataType, sc: SparkContext): MLResults = { val isSingleNode = false val ml = new MLContext(df.rdd.sparkContext) + updateML(ml) val mcXin = new MatrixCharacteristics() val Xin = RDDConverterUtils.dataFrameToBinaryBlock(sc, df.asInstanceOf[DataFrame], mcXin, false, true) 
val yin = df.select("label") @@ -61,6 +63,7 @@ trait BaseSystemMLRegressorModel extends BaseSystemMLEstimatorModel { def baseTransform(X: MatrixBlock, mloutput: MLResults, sc: SparkContext, predictionVar:String): MatrixBlock = { val isSingleNode = true val ml = new MLContext(sc) + updateML(ml) val script = getPredictionScript(mloutput, isSingleNode) val modelPredict = ml.execute(script._1.in(script._2, X)) val ret = modelPredict.getBinaryBlockMatrix(predictionVar).getMatrixBlock @@ -74,6 +77,7 @@ trait BaseSystemMLRegressorModel extends BaseSystemMLEstimatorModel { def baseTransform(df: ScriptsUtils.SparkDataType, mloutput: MLResults, sc: SparkContext, predictionVar:String): DataFrame = { val isSingleNode = false val ml = new MLContext(sc) + updateML(ml) val mcXin = new MatrixCharacteristics() val Xin = RDDConverterUtils.dataFrameToBinaryBlock(df.rdd.sparkContext, df.asInstanceOf[DataFrame], mcXin, false, true) val script = getPredictionScript(mloutput, isSingleNode) http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/cc7993fc/src/main/scala/org/apache/sysml/api/ml/LogisticRegression.scala ---------------------------------------------------------------------- diff --git a/src/main/scala/org/apache/sysml/api/ml/LogisticRegression.scala b/src/main/scala/org/apache/sysml/api/ml/LogisticRegression.scala index ce89502..9f3d844 100644 --- a/src/main/scala/org/apache/sysml/api/ml/LogisticRegression.scala +++ b/src/main/scala/org/apache/sysml/api/ml/LogisticRegression.scala @@ -142,4 +142,4 @@ object LogisticRegressionExample { lrmodel.transform(testing.toDF).show } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/cc7993fc/src/main/scala/org/apache/sysml/api/ml/NaiveBayes.scala ---------------------------------------------------------------------- diff --git a/src/main/scala/org/apache/sysml/api/ml/NaiveBayes.scala b/src/main/scala/org/apache/sysml/api/ml/NaiveBayes.scala index a7b3a74..9161a8f 100644 --- 
a/src/main/scala/org/apache/sysml/api/ml/NaiveBayes.scala +++ b/src/main/scala/org/apache/sysml/api/ml/NaiveBayes.scala @@ -106,4 +106,4 @@ class NaiveBayesModel(override val uid: String) def transform(X: MatrixBlock): MatrixBlock = baseTransform(X, mloutput, sc, "probs") def transform(df: ScriptsUtils.SparkDataType): DataFrame = baseTransform(df, mloutput, sc, "probs") -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/cc7993fc/src/main/scala/org/apache/sysml/api/ml/SVM.scala ---------------------------------------------------------------------- diff --git a/src/main/scala/org/apache/sysml/api/ml/SVM.scala b/src/main/scala/org/apache/sysml/api/ml/SVM.scala index ea24de6..db8ce3a 100644 --- a/src/main/scala/org/apache/sysml/api/ml/SVM.scala +++ b/src/main/scala/org/apache/sysml/api/ml/SVM.scala @@ -110,4 +110,4 @@ class SVMModel (override val uid: String)(val mloutput: MLResults, val sc: Spark def transform(X: MatrixBlock): MatrixBlock = baseTransform(X, mloutput, sc, "scores") def transform(df: ScriptsUtils.SparkDataType): DataFrame = baseTransform(df, mloutput, sc, "scores") -} \ No newline at end of file +}
