http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/cc7993fc/src/main/proto/tensorflow/event.proto
----------------------------------------------------------------------
diff --git a/src/main/proto/tensorflow/event.proto b/src/main/proto/tensorflow/event.proto
new file mode 100644
index 0000000..06d1992
--- /dev/null
+++ b/src/main/proto/tensorflow/event.proto
@@ -0,0 +1,102 @@
+//-------------------------------------------------------------
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+//-------------------------------------------------------------
+syntax = "proto3";
+
+package tensorflow;
+option cc_enable_arenas = true;
+option java_outer_classname = "EventProtos";
+option java_multiple_files = true;
+option java_package = "org.tensorflow.util";
+
+import "summary.proto";
+
+// Protocol buffer representing an event that happened during
+// the execution of a Brain model.
+message Event {
+  // Timestamp of the event.
+  double wall_time = 1;
+
+  // Global step of the event.
+  int64 step = 2;
+
+  oneof what {
+    // An event file was started, with the specified version.
+    // This is used to identify the contents of the record IO files
+    // easily. Current version is "brain.Event:2". All versions
+    // start with "brain.Event:".
+    string file_version = 3;
+    // An encoded version of a GraphDef.
+    bytes graph_def = 4;
+    // A summary was generated.
+    Summary summary = 5;
+    // The user output a log message. Not all messages are logged, only ones
+    // generated via the Python tensorboard_logging module.
+    LogMessage log_message = 6;
+    // The state of the session which can be used for restarting after crashes.
+    SessionLog session_log = 7;
+    // The metadata returned by running a session.run() call.
+    TaggedRunMetadata tagged_run_metadata = 8;
+    // An encoded version of a MetaGraphDef.
+    bytes meta_graph_def = 9;
+  }
+}
+
+// Protocol buffer used for logging messages to the events file.
+message LogMessage {
+  enum Level {
+    UNKNOWN = 0;
+    // Note: The logging level 10 cannot be named DEBUG. Some software
+    // projects compile their C/C++ code with -DDEBUG in debug builds. So the
+    // C++ code generated from this file should not have an identifier named
+    // DEBUG.
+    DEBUGGING = 10;
+    INFO = 20;
+    WARN = 30;
+    ERROR = 40;
+    FATAL = 50;
+  }
+  Level level = 1;
+  string message = 2;
+}
+
+// Protocol buffer used for logging session state.
+message SessionLog {
+  enum SessionStatus {
+    STATUS_UNSPECIFIED = 0;
+    START = 1;
+    STOP = 2;
+    CHECKPOINT = 3;
+  }
+
+  SessionStatus status = 1;
+  // This checkpoint_path contains both the path and filename.
+  string checkpoint_path = 2;
+  string msg = 3;
+}
+
+// For logging the metadata output for a single session.run() call.
+message TaggedRunMetadata {
+  // Tag name associated with this metadata.
+  string tag = 1;
+  // Byte-encoded version of the `RunMetadata` proto in order to allow lazy
+  // deserialization.
+  bytes run_metadata = 2;
+}
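For a sense of how these messages are consumed, the following minimal Python sketch serializes one scalar summary as an Event, assuming event_pb2/summary_pb2 modules generated from the two .proto files with protoc (module names and the file path are illustrative, not part of this commit; TensorBoard's event files additionally wrap each serialized message in a length/CRC record frame, which is omitted here):

    from event_pb2 import Event

    ev = Event()
    ev.wall_time = 1493305440.0          # seconds since the epoch
    ev.step = 100                        # global training step
    v = ev.summary.value.add()           # Summary message from summary.proto below
    v.tag = "training_loss"
    v.simple_value = 0.42
    with open("events.out.example", "ab") as f:
        f.write(ev.SerializeToString())  # raw payload; real event files add record framing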
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/cc7993fc/src/main/proto/tensorflow/summary.proto
----------------------------------------------------------------------
diff --git a/src/main/proto/tensorflow/summary.proto b/src/main/proto/tensorflow/summary.proto
new file mode 100644
index 0000000..fc8053c
--- /dev/null
+++ b/src/main/proto/tensorflow/summary.proto
@@ -0,0 +1,123 @@
+//-------------------------------------------------------------
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+//-------------------------------------------------------------
+syntax = "proto3";
+
+package tensorflow;
+option cc_enable_arenas = true;
+option java_outer_classname = "SummaryProtos";
+option java_multiple_files = true;
+option java_package = "org.tensorflow.framework";
+
+// import "tensorflow/core/framework/tensor.proto";
+
+// Metadata associated with a series of Summary data
+message SummaryDescription {
+  // Hint on how plugins should process the data in this series.
+  // Supported values include "scalar", "histogram", "image", "audio"
+  string type_hint = 1;
+}
+
+// Serialization format for histogram module in
+// core/lib/histogram/histogram.h
+message HistogramProto {
+  double min = 1;
+  double max = 2;
+  double num = 3;
+  double sum = 4;
+  double sum_squares = 5;
+
+  // Parallel arrays encoding the bucket boundaries and the bucket values.
+  // bucket(i) is the count for the bucket i. The range for
+  // a bucket is:
+  //   i == 0:  -DBL_MAX .. bucket_limit(0)
+  //   i != 0:  bucket_limit(i-1) .. bucket_limit(i)
+  repeated double bucket_limit = 6 [packed = true];
+  repeated double bucket = 7 [packed = true];
+};
+
+// A Summary is a set of named values to be displayed by the
+// visualizer.
+//
+// Summaries are produced regularly during training, as controlled by
+// the "summary_interval_secs" attribute of the training operation.
+// Summaries are also produced at the end of an evaluation.
+message Summary {
+  message Image {
+    // Dimensions of the image.
+    int32 height = 1;
+    int32 width = 2;
+    // Valid colorspace values are
+    //   1 - grayscale
+    //   2 - grayscale + alpha
+    //   3 - RGB
+    //   4 - RGBA
+    //   5 - DIGITAL_YUV
+    //   6 - BGRA
+    int32 colorspace = 3;
+    // Image data in encoded format. All image formats supported by
+    // image_codec::CoderUtil can be stored here.
+    bytes encoded_image_string = 4;
+  }
+
+  message Audio {
+    // Sample rate of the audio in Hz.
+    float sample_rate = 1;
+    // Number of channels of audio.
+    int64 num_channels = 2;
+    // Length of the audio in frames (samples per channel).
+    int64 length_frames = 3;
+    // Encoded audio data and its associated RFC 2045 content type (e.g.
+    // "audio/wav").
+    bytes encoded_audio_string = 4;
+    string content_type = 5;
+  }
+
+  message Value {
+    // Name of the node that output this summary; in general, the name of a
+    // TensorSummary node. If the node in question has multiple outputs, then
+    // a ":\d+" suffix will be appended, like "some_op:13".
+    // Might not be set for legacy summaries (i.e. those not using the tensor
+    // value field)
+    string node_name = 7;
+
+    // Tag name for the data. Will only be used by legacy summaries
+    // (i.e. those not using the tensor value field)
+    // For legacy summaries, will be used as the title of the graph
+    // in the visualizer.
+    //
+    // Tag is usually "op_name:value_name", where "op_name" itself can have
+    // structure to indicate grouping.
+    string tag = 1;
+
+    // Value associated with the tag.
+    oneof value {
+      float simple_value = 2;
+      bytes obsolete_old_style_histogram = 3;
+      Image image = 4;
+      HistogramProto histo = 5;
+      Audio audio = 6;
+      // TensorProto tensor = 8;
+    }
+  }
+
+  // Set of values for the summary.
+  repeated Value value = 1;
+}
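To make the parallel-array bucket encoding in HistogramProto concrete, here is a hedged Python sketch that fills one from a NumPy array; it assumes a summary_pb2 module generated from this file (TensorBoard itself uses an implicit -DBL_MAX left edge for the first bucket, as the comment above notes):

    import numpy as np
    from summary_pb2 import HistogramProto

    def make_histogram(values, bins=30):
        values = np.asarray(values, dtype=np.float64).ravel()
        counts, edges = np.histogram(values, bins=bins)
        h = HistogramProto()
        h.min = float(values.min())
        h.max = float(values.max())
        h.num = float(values.size)
        h.sum = float(values.sum())
        h.sum_squares = float((values ** 2).sum())
        # bucket_limit(i) is the right edge of bucket i; bucket(i) is its count
        h.bucket_limit.extend(float(e) for e in edges[1:])
        h.bucket.extend(float(c) for c in counts)
        return h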
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/cc7993fc/src/main/python/setup.py
----------------------------------------------------------------------
diff --git a/src/main/python/setup.py b/src/main/python/setup.py
index 635dad7..fcda255 100644
--- a/src/main/python/setup.py
+++ b/src/main/python/setup.py
@@ -38,10 +38,12 @@ ARTIFACT_VERSION_SHORT = ARTIFACT_VERSION.split("-")[0]
 numpy_version = '1.8.2'
 scipy_version = '0.15.1'
+pillow_version = '2.0.0'
 REQUIRED_PACKAGES = [
     'numpy >= %s' % numpy_version,
     'scipy >= %s' % scipy_version,
-    'pandas'
+    'pandas',
+    'Pillow >= %s' % pillow_version
 ]

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/cc7993fc/src/main/python/systemml/converters.py
----------------------------------------------------------------------
diff --git a/src/main/python/systemml/converters.py b/src/main/python/systemml/converters.py
index 9651f14..8bf05d7 100644
--- a/src/main/python/systemml/converters.py
+++ b/src/main/python/systemml/converters.py
@@ -19,7 +19,7 @@
 #
 #-------------------------------------------------------------
 
-__all__ = [ 'getNumCols', 'convertToMatrixBlock', 'convertToNumPyArr', 'convertToPandasDF', 'SUPPORTED_TYPES' , 'convertToLabeledDF']
+__all__ = [ 'getNumCols', 'convertToMatrixBlock', 'convertToNumPyArr', 'convertToPandasDF', 'SUPPORTED_TYPES' , 'convertToLabeledDF', 'convertImageToNumPyArr']
 
 import numpy as np
 import pandas as pd
@@ -118,6 +118,35 @@ def convertToNumPyArr(sc, mb):
     else:
         raise TypeError('sc needs to be of type SparkContext') # TODO: We can generalize this by creating py4j gateway ourselves
 
+# Example usage: convertImageToNumPyArr(im, img_shape=(3, 224, 224), add_rotated_images=True, add_mirrored_images=True)
+# The above call returns a numpy array of shape (6, 150528) in NCHW format
+def convertImageToNumPyArr(im, img_shape=None, add_rotated_images=False, add_mirrored_images=False):
+    from PIL import Image
+    if img_shape is not None:
+        num_channels = img_shape[0]
+        size = (img_shape[1], img_shape[2])
+    else:
+        num_channels = 1 if im.mode == 'L' else 3
+        size = None
+    if num_channels != 1 and num_channels != 3:
+        raise ValueError('Expected the number of channels to be either 1 or 3')
+    if size is not None:
+        im = im.resize(size, Image.LANCZOS)
+    expected_mode = 'L' if num_channels == 1 else 'RGB'
+    if expected_mode != im.mode:
+        im = im.convert(expected_mode)
+    def _im2NumPy(im):
+        if expected_mode == 'L':
+            return np.asarray(im.getdata()).reshape((1, -1))
+        else:
+            # (H,W,C) --> (C,H,W) --> (1, C*H*W)
+            return np.asarray(im).transpose(2, 0, 1).reshape((1, -1))
+    ret = _im2NumPy(im)
+    if add_rotated_images:
+        ret = np.vstack((ret, _im2NumPy(im.rotate(90)), _im2NumPy(im.rotate(180)), _im2NumPy(im.rotate(270)) ))
+    if add_mirrored_images:
+        ret = np.vstack((ret, _im2NumPy(im.transpose(Image.FLIP_LEFT_RIGHT)), _im2NumPy(im.transpose(Image.FLIP_TOP_BOTTOM))))
+    return ret
 
 def convertToPandasDF(X):
     if not isinstance(X, pd.DataFrame):
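A usage sketch for the converter added above (the input file name is illustrative): with Pillow installed, one RGB image yields one row per variant, i.e. the original plus three rotations and two mirrorings, 1+3+2 = 6 rows of length 3*224*224 = 150528:

    from PIL import Image
    from systemml.converters import convertImageToNumPyArr

    im = Image.open('example.jpg')
    X = convertImageToNumPyArr(im, img_shape=(3, 224, 224),
                               add_rotated_images=True,
                               add_mirrored_images=True)
    print(X.shape)  # (6, 150528), one flattened CHW image per row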
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/cc7993fc/src/main/python/systemml/mllearn/estimators.py
----------------------------------------------------------------------
diff --git a/src/main/python/systemml/mllearn/estimators.py b/src/main/python/systemml/mllearn/estimators.py
index d6ad069..94aa1f2 100644
--- a/src/main/python/systemml/mllearn/estimators.py
+++ b/src/main/python/systemml/mllearn/estimators.py
@@ -19,7 +19,7 @@
 #
 #-------------------------------------------------------------
 
-__all__ = ['LinearRegression', 'LogisticRegression', 'SVM', 'NaiveBayes']
+__all__ = ['LinearRegression', 'LogisticRegression', 'SVM', 'NaiveBayes', 'Caffe2DML']
 
 import numpy as np
 from pyspark.ml import Estimator
@@ -45,6 +45,7 @@ def assemble(sparkSession, pdf, inputCols, outputCol):
 class BaseSystemMLEstimator(Estimator):
     features_col = 'features'
     label_col = 'label'
+    do_visualize = False
 
     def set_features_col(self, colName):
         """
@@ -66,6 +67,21 @@ class BaseSystemMLEstimator(Estimator):
         """
         self.label_col = colName
 
+    def setGPU(self, enableGPU):
+        self.estimator.setGPU(enableGPU)
+        return self
+
+    def setExplain(self, explain):
+        self.estimator.setExplain(explain)
+        return self
+
+    def setStatistics(self, stat):
+        self.estimator.setStatistics(stat)
+        return self
+
+    def setConfigProperty(self, propertyName, propertyValue):
+        self.estimator.setConfigProperty(propertyName, propertyValue)
+        return self
 
     def _fit_df(self):
         try:
@@ -158,6 +174,11 @@ class BaseSystemMLEstimator(Estimator):
         ----------
         X: NumPy ndarray, Pandas DataFrame, scipy sparse matrix or PySpark DataFrame
         """
+        try:
+            if self.estimator is not None and self.model is not None:
+                self.estimator.copyProperties(self.model)
+        except AttributeError:
+            pass
         if isinstance(X, SUPPORTED_TYPES):
             if self.transferUsingDF:
                 pdfX = convertToPandasDF(X)
@@ -206,6 +227,13 @@ class BaseSystemMLClassifier(BaseSystemMLEstimator):
         else:
             return [ self.labelMap[int(i)] for i in y ]
 
+    def predict(self, X):
+        predictions = np.asarray(super(BaseSystemMLClassifier, self).predict(X))
+        try:
+            return np.asarray(predictions, dtype='double')
+        except ValueError:
+            return np.asarray(predictions, dtype='str')
+
     def score(self, X, y):
         """
         Scores the predicted value with ground truth 'y'
@@ -215,8 +243,11 @@ class BaseSystemMLClassifier(BaseSystemMLEstimator):
         X: NumPy ndarray, Pandas DataFrame, scipy sparse matrix
         y: NumPy ndarray, Pandas DataFrame, scipy sparse matrix
         """
-        return accuracy_score(y, self.predict(X))
-
+        predictions = np.asarray(self.predict(X))
+        if np.issubdtype(predictions.dtype.type, np.number):
+            return accuracy_score(y, predictions)
+        else:
+            return accuracy_score(np.asarray(y, dtype='str'), np.asarray(predictions, dtype='str'))
 
 class BaseSystemMLRegressor(BaseSystemMLEstimator):
 
@@ -499,4 +530,133 @@ class NaiveBayes(BaseSystemMLClassifier):
         self.estimator = self.sc._jvm.org.apache.sysml.api.ml.NaiveBayes(self.uid, self.sc._jsc.sc())
         self.estimator.setLaplace(laplace)
         self.transferUsingDF = transferUsingDF
-        self.setOutputRawPredictionsToFalse = False
\ No newline at end of file
+        self.setOutputRawPredictionsToFalse = False
+
+class Caffe2DML(BaseSystemMLClassifier):
+    """
+    Performs training/prediction for a given caffe network.
+
+    Examples
+    --------
+
+    >>> from systemml.mllearn import Caffe2DML
+    >>> from pyspark.sql import SQLContext
+    >>> sqlCtx = SQLContext(sc)
+    >>> from mlxtend.data import mnist_data
+    >>> import numpy as np
+    >>> from sklearn.utils import shuffle
+    >>> X, y = mnist_data()
+    >>> X, y = shuffle(X, y)
+    >>> imgShape = (1, 28, 28)
+    >>> import urllib
+    >>> urllib.urlretrieve('https://raw.githubusercontent.com/niketanpansare/model_zoo/master/caffe/vision/lenet/mnist/lenet.proto', 'lenet.proto')
+    >>> urllib.urlretrieve('https://raw.githubusercontent.com/niketanpansare/model_zoo/master/caffe/vision/lenet/mnist/lenet_solver.proto', 'lenet_solver.proto')
+    >>> caffe2DML = Caffe2DML(sqlCtx, 'lenet_solver.proto', imgShape)
+    >>> caffe2DML.fit(X, y)
+    """
+    def __init__(self, sqlCtx, solver, input_shape, weights=None, ignore_weights=None, transferUsingDF=False, tensorboard_log_dir=None):
+        """
+        Performs training/prediction for a given caffe network.
+
+        Parameters
+        ----------
+        sqlCtx: PySpark SQLContext
+        solver: caffe solver file path
+        input_shape: 3-element list (number of channels, input height, input width)
+        weights: directory where the learned weights are stored (default: None)
+        ignore_weights: names of layers not to read from the weights directory (list of strings, default: None)
+        transferUsingDF: whether to pass the input dataset via PySpark DataFrame (default: False)
+        tensorboard_log_dir: directory to store the event logs (default: None, we use a temporary directory)
+        """
+        self.sqlCtx = sqlCtx
+        self.sc = sqlCtx._sc
+        self.uid = "Caffe2DML"
+        self.model = None
+        if len(input_shape) != 3:
+            raise ValueError('Expected input_shape to be a list of 3 elements')
+        solver = self.sc._jvm.org.apache.sysml.api.dl.Utils.readCaffeSolver(solver)
+        self.estimator = self.sc._jvm.org.apache.sysml.api.dl.Caffe2DML(self.sc._jsc.sc(), solver, str(input_shape[0]), str(input_shape[1]), str(input_shape[2]))
+        self.weights = weights
+        if weights is not None:
+            self.estimator.setInput("$weights", str(weights))
+            self._loadLabelTxt()
+        if ignore_weights is not None:
+            self.estimator.setWeightsToIgnore(ignore_weights)
+        self.transferUsingDF = transferUsingDF
+        self.setOutputRawPredictionsToFalse = False
+        self.visualize_called = False
+        if tensorboard_log_dir is not None:
+            self.estimator.setTensorBoardLogDir(tensorboard_log_dir)
+
+    def _loadLabelTxt(self, format="binary", sep="/"):
+        if(self.weights is not None):
+            self.model = self.sc._jvm.org.apache.sysml.api.dl.Caffe2DMLModel(self.estimator)
+            df = self.sqlCtx.read.csv(self.weights + sep + 'labels.txt', header=False).toPandas()
+            keys = np.asarray(df._c0, dtype='int')
+            values = np.asarray(df._c1, dtype='str')
+            self.labelMap = {}
+            self.le = None
+            for i in range(len(keys)):
+                self.labelMap[int(keys[i])] = values[i]
+            # self.encode(classes) # Giving incorrect results
+
+    def set(self, num_classes=None, debug=None):
+        """
+        Set input to Caffe2DML
+
+        Parameters
+        ----------
+        debug: to add debugging DML code such as classification report, print DML script, etc (default: False)
+        """
+        if debug is not None: self.estimator.setInput("$debug", str(debug).upper())
+        return self
+    def visualize(self, layerName=None, varType='weight', aggFn='mean'):
+        """
+        Use this to visualize the training procedure (requires validation_percentage to be non-zero).
+        When no argument is provided to this method, the training and validation loss are visualized.
+
+        Parameters
+        ----------
+        layerName: Name of the layer in the Caffe prototype
+        varType: should be either 'weight', 'bias', 'dweight', 'dbias', 'output' or 'doutput'
+        aggFn: should be either 'sum', 'mean', 'var' or 'sd'
+        """
+        valid_vis_var_types = ['weight', 'bias', 'dweight', 'dbias', 'output', 'doutput']
+        valid_vis_aggFn = [ 'sum', 'mean', 'var', 'sd' ]
+        if layerName is not None and varType is not None and aggFn is not None:
+            # Visualize other values
+            if varType not in valid_vis_var_types:
+                raise ValueError('The second argument should be either weight, bias, dweight, dbias, output or doutput')
+            if aggFn not in valid_vis_aggFn:
+                raise ValueError('The third argument should be either sum, mean, var or sd')
+            if self.visualize_called:
+                self.estimator.visualizeLoss()
+            self.estimator.visualizeLayer(layerName, varType, aggFn)
+        else:
+            self.estimator.visualizeLoss()
+        self.visualize_called = True
+        return self
+
+    def save(self, outputDir, format='binary', sep='/'):
+        """
+        Save a trained model.
+
+        Parameters
+        ----------
+        outputDir: Directory to save the model to
+        format: optional format (default: 'binary')
+        sep: separator to use (default: '/')
+        """
+        if self.model is not None:
+            self.model.save(outputDir, format, sep)
+            if self.le is not None:
+                labelMapping = dict(enumerate(list(self.le.classes_), 1))
+            else:
+                labelMapping = self.labelMap
+            lStr = [ [ int(k), str(labelMapping[k]) ] for k in labelMapping ]
+            df = self.sqlCtx.createDataFrame(lStr)
+            df.write.csv(outputDir + sep + 'labels.txt', mode='overwrite', header=False)
+        else:
+            raise Exception('Cannot save as you need to train the model first using fit')
+        return self
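To tie the new Python API together, here is a hedged end-to-end sketch; it assumes a SQLContext sqlCtx, MNIST-like data X, y and the LeNet solver file as in the doctest above, and the layer name 'conv1' is specific to that network:

    from systemml.mllearn import Caffe2DML

    clf = Caffe2DML(sqlCtx, 'lenet_solver.proto', input_shape=(1, 28, 28),
                    tensorboard_log_dir='/tmp/tb_logs')      # illustrative log dir
    clf.setGPU(False).setStatistics(True)
    clf.visualize()                            # training/validation loss curves
    clf.visualize('conv1', 'weight', 'mean')   # mean of conv1 weights per display interval
    clf.fit(X, y)
    clf.save('lenet_model')                    # weights plus a labels.txt mapping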
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/cc7993fc/src/main/scala/org/apache/sysml/api/dl/Caffe2DML.scala
----------------------------------------------------------------------
diff --git a/src/main/scala/org/apache/sysml/api/dl/Caffe2DML.scala b/src/main/scala/org/apache/sysml/api/dl/Caffe2DML.scala
new file mode 100644
index 0000000..7ab9160
--- /dev/null
+++ b/src/main/scala/org/apache/sysml/api/dl/Caffe2DML.scala
@@ -0,0 +1,510 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sysml.api.dl
+
+import caffe.Caffe.LayerParameter;
+import caffe.Caffe.NetParameter;
+import caffe.Caffe.SolverParameter;
+
+import org.apache.sysml.parser.LanguageException;
+import org.apache.sysml.runtime.DMLRuntimeException;
+import org.apache.sysml.api.ml.ScriptsUtils
+import org.apache.sysml.runtime.matrix.MatrixCharacteristics
+import org.apache.sysml.runtime.matrix.data.MatrixBlock
+import scala.collection.JavaConversions._
+import java.util.ArrayList
+import caffe.Caffe.Phase
+import caffe.Caffe
+import java.util.HashSet
+import org.apache.sysml.api.DMLScript
+import java.io.File
+import org.apache.spark.SparkContext
+import org.apache.spark.ml.{ Model, Estimator }
+import org.apache.spark.sql.DataFrame
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.ml.param.{ Params, Param, ParamMap, DoubleParam }
+import org.apache.sysml.runtime.instructions.spark.utils.{ RDDConverterUtilsExt => RDDConverterUtils }
+import org.apache.sysml.api.mlcontext._
+import org.apache.sysml.api.mlcontext.ScriptFactory._
+import org.apache.sysml.api.ml._
+import java.util.Random
+import org.apache.commons.logging.Log
+import org.apache.commons.logging.LogFactory
+import org.apache.sysml.runtime.controlprogram.parfor.stat.InfrastructureAnalyzer
+
+
+object Caffe2DML {
+  val LOG = LogFactory.getLog(classOf[Caffe2DML].getName())
+  def fileSep():String = { if(File.separator.equals("\\")) "\\\\" else File.separator }
+  def setNNLibraryPath(path:String):Unit = { prefix = path + fileSep + "nn"}
+  // ------------------------------------------------------------------------
+  var prefix = Utils.getPrefix()
+  def layerDir = prefix + fileSep + "layers" + fileSep
+  def optimDir = prefix + fileSep + "optim" + fileSep
+
+  // Naming conventions:
+  val X = "X"; val y = "y"; val batchSize = "BATCH_SIZE"; val numImages = "num_images"; val numValidationImages = "num_validation"
+  val XVal = "X_val"; val yVal = "y_val"
+}
+
+class Caffe2DML(val sc: SparkContext, val solverParam:Caffe.SolverParameter,
+    val solver:CaffeSolver, val net:CaffeNetwork,
+    val lrPolicy:LearningRatePolicy, val numChannels:String, val height:String, val width:String) extends Estimator[Caffe2DMLModel]
+  with BaseSystemMLClassifier with DMLGenerator {
+  // --------------------------------------------------------------
+  // Invoked by Python, MLPipeline
+  def this(sc: SparkContext, solver1:Caffe.SolverParameter, networkPath:String, numChannels:String, height:String, width:String) {
+    this(sc, solver1, Utils.parseSolver(solver1),
+        new CaffeNetwork(networkPath, caffe.Caffe.Phase.TRAIN, numChannels, height, width),
+        new LearningRatePolicy(solver1), numChannels, height, width)
+  }
+  def this(sc: SparkContext, solver1:Caffe.SolverParameter, numChannels:String, height:String, width:String) {
+    this(sc, solver1, Utils.parseSolver(solver1), new CaffeNetwork(solver1.getNet, caffe.Caffe.Phase.TRAIN, numChannels, height, width),
+        new LearningRatePolicy(solver1), numChannels, height, width)
+  }
+  val uid:String = "caffe_classifier_" + (new Random).nextLong
+  override def copy(extra: org.apache.spark.ml.param.ParamMap): Estimator[Caffe2DMLModel] = {
+    val that = new Caffe2DML(sc, solverParam, solver, net, lrPolicy, numChannels, height, width)
+    copyValues(that, extra)
+  }
+  // Note: will update the y_mb as this will be called by Python mllearn
+  def fit(X_mb: MatrixBlock, y_mb: MatrixBlock): Caffe2DMLModel = {
+    val ret = baseFit(X_mb, y_mb, sc)
+    new Caffe2DMLModel(ret, Utils.numClasses(net), sc, solver, net, lrPolicy, this)
+  }
+  def fit(df: ScriptsUtils.SparkDataType): Caffe2DMLModel = {
+    val ret = baseFit(df, sc)
+    new Caffe2DMLModel(ret, Utils.numClasses(net), sc, solver, net, lrPolicy, this)
+  }
+  // --------------------------------------------------------------
+
+  // Used for simplifying transfer learning
+  private val layersToIgnore:HashSet[String] = new HashSet[String]()
+  def setWeightsToIgnore(layerName:String):Unit = layersToIgnore.add(layerName)
+  def setWeightsToIgnore(layerNames:ArrayList[String]):Unit = layersToIgnore.addAll(layerNames)
+
+  // Input parameters to prediction and scoring script
+  val inputs:java.util.HashMap[String, String] = new java.util.HashMap[String, String]()
+  def setInput(key: String, value:String):Unit = inputs.put(key, value)
+  customAssert(solverParam.getTestIterCount <= 1, "Multiple test_iter variables are not supported")
+  customAssert(solverParam.getMaxIter > 0, "Please set max_iter to a positive value")
+  customAssert(net.getLayers.filter(net.getCaffeLayer(_).isInstanceOf[IsLossLayer]).length == 1, "Expected exactly one loss layer")
+
+  // TODO: throw error or warning if user tries to set solver_mode == GPU instead of using setGPU method
+
+  // Method called by Python mllearn to visualize variable of certain layer
+  def visualizeLayer(layerName:String, varType:String, aggFn:String): Unit = visualizeLayer(net, layerName, varType, aggFn)
+
+  // -------------------------------------------------------------------------------------------
+  // Helper functions to generate DML
+  // Initializes Caffe2DML.X, Caffe2DML.y, Caffe2DML.XVal, Caffe2DML.yVal and Caffe2DML.numImages
+  private def trainTestSplit(numValidationBatches:Int):Unit = {
+    if(numValidationBatches > 0) {
+      if(solverParam.getDisplay <= 0)
+        throw new DMLRuntimeException("Since test_iter and test_interval are greater than zero, you should set display to be greater than zero")
+      tabDMLScript.append(Caffe2DML.numValidationImages).append(" = " + numValidationBatches + " * " + Caffe2DML.batchSize + "\n")
+      tabDMLScript.append("# Sanity check to ensure that validation set is not too large\n")
+      val maxValidationSize = "ceil(0.3 * " + Caffe2DML.numImages + ")"
+      ifBlock(Caffe2DML.numValidationImages + " > " + maxValidationSize) {
+        assign(tabDMLScript, "max_test_iter", "floor(" + maxValidationSize + " / " + Caffe2DML.batchSize + ")")
+        tabDMLScript.append("stop(" +
+            dmlConcat(asDMLString("Too large validation size. Please reduce test_iter to "), "max_test_iter")
+            + ")\n")
+      }
+      val one = "1"
+      val rl = int_add(Caffe2DML.numValidationImages, one)
+      rightIndexing(tabDMLScript.append(Caffe2DML.X).append(" = "), "X_full", rl, Caffe2DML.numImages, null, null)
+      tabDMLScript.append("; ")
+      rightIndexing(tabDMLScript.append(Caffe2DML.y).append(" = "), "y_full", rl, Caffe2DML.numImages, null, null)
+      tabDMLScript.append("; ")
+      rightIndexing(tabDMLScript.append(Caffe2DML.XVal).append(" = "), "X_full", one, Caffe2DML.numValidationImages, null, null)
+      tabDMLScript.append("; ")
+      rightIndexing(tabDMLScript.append(Caffe2DML.yVal).append(" = "), "y_full", one, Caffe2DML.numValidationImages, null, null)
+      tabDMLScript.append("; ")
+      tabDMLScript.append(Caffe2DML.numImages).append(" = nrow(y)\n")
+    }
+    else {
+      assign(tabDMLScript, Caffe2DML.X, "X_full")
+      assign(tabDMLScript, Caffe2DML.y, "y_full")
+      tabDMLScript.append(Caffe2DML.numImages).append(" = nrow(" + Caffe2DML.y + ")\n")
+    }
+  }
+
+  private def printClassificationReport():Unit = {
+    ifBlock("debug"){
+      assign(tabDMLScript, "num_rows_error_measures", min("10", ncol("yb")))
+      assign(tabDMLScript, "error_measures", matrix("0", "num_rows_error_measures", "5"))
+      forBlock("class_i", "1", "num_rows_error_measures") {
+        assign(tabDMLScript, "tp", "sum( (true_yb == predicted_yb) * (true_yb == class_i) )")
+        assign(tabDMLScript, "tp_plus_fp", "sum( (predicted_yb == class_i) )")
+        assign(tabDMLScript, "tp_plus_fn", "sum( (true_yb == class_i) )")
+        assign(tabDMLScript, "precision", "tp / tp_plus_fp")
+        assign(tabDMLScript, "recall", "tp / tp_plus_fn")
+        assign(tabDMLScript, "f1Score", "2*precision*recall / (precision+recall)")
+        assign(tabDMLScript, "error_measures[class_i,1]", "class_i")
+        assign(tabDMLScript, "error_measures[class_i,2]", "precision")
+        assign(tabDMLScript, "error_measures[class_i,3]", "recall")
+        assign(tabDMLScript, "error_measures[class_i,4]", "f1Score")
+        assign(tabDMLScript, "error_measures[class_i,5]", "tp_plus_fn")
+      }
+      val dmlTab = "\\t"
+      val header = "class " + dmlTab + "precision" + dmlTab + "recall " + dmlTab + "f1-score" + dmlTab + "num_true_labels\\n"
+      val errorMeasures = "toString(error_measures, decimal=7, sep=" + asDMLString(dmlTab) + ")"
+      tabDMLScript.append(print(dmlConcat(asDMLString(header), errorMeasures)))
+    }
+  }
+
+  // Append the DML to display training and validation loss
+  private def displayLoss(lossLayer:IsLossLayer, shouldValidate:Boolean):Unit = {
+    if(solverParam.getDisplay > 0) {
+      // Append the DML to compute training loss
+      tabDMLScript.append("# Compute training loss & accuracy\n")
+      ifBlock("iter %% " + solverParam.getDisplay + " == 0") {
+        assign(tabDMLScript, "loss", "0"); assign(tabDMLScript, "accuracy", "0")
+        lossLayer.computeLoss(dmlScript, numTabs)
+        assign(tabDMLScript, "training_loss", "loss"); assign(tabDMLScript, "training_accuracy", "accuracy")
+        tabDMLScript.append(print( dmlConcat( asDMLString("Iter:"), "iter",
+            asDMLString(", training loss:"), "training_loss", asDMLString(", training accuracy:"), "training_accuracy" )))
+        appendTrainingVisualizationBody(dmlScript, numTabs)
+        printClassificationReport
+      }
+      if(shouldValidate) {
+        // Append the DML to compute validation loss
+        val numValidationBatches = if(solverParam.getTestIterCount > 0) solverParam.getTestIter(0) else 0
+        tabDMLScript.append("# Compute validation loss & accuracy\n")
+        ifBlock("iter %% " + solverParam.getTestInterval + " == 0") {
+          assign(tabDMLScript, "loss", "0"); assign(tabDMLScript, "accuracy", "0")
+          solverParam.getTestAlgo.toLowerCase match {
+            case "minibatch" => {
+              assign(tabDMLScript, "validation_loss", "0")
+              assign(tabDMLScript, "validation_accuracy", "0")
+              forBlock("iVal", "1", "num_iters_per_epoch") {
+                getValidationBatch(tabDMLScript)
+                tabDMLScript.append("iter = start_iter + i\n")
+                forward; lossLayer.computeLoss(dmlScript, numTabs)
+                tabDMLScript.append("validation_loss = validation_loss + loss\n")
+                tabDMLScript.append("validation_accuracy = validation_accuracy + accuracy\n")
+              }
+              tabDMLScript.append("validation_accuracy = validation_accuracy / num_iters_per_epoch\n")
+            }
+            case "batch" => {
+              assign(tabDMLScript, "Xb", Caffe2DML.XVal); assign(tabDMLScript, "yb", Caffe2DML.yVal)
+              net.getLayers.map(layer => net.getCaffeLayer(layer).forward(tabDMLScript, false))
+              lossLayer.computeLoss(dmlScript, numTabs)
+              assign(tabDMLScript, "validation_loss", "loss"); assign(tabDMLScript, "validation_accuracy", "accuracy")
+            }
+            case _ => throw new DMLRuntimeException("Unsupported test algo:" + solverParam.getTestAlgo)
+          }
+          tabDMLScript.append(print( dmlConcat( asDMLString("Iter:"), "iter",
+              asDMLString(", validation loss:"), "validation_loss", asDMLString(", validation accuracy:"), "validation_accuracy" )))
+          appendValidationVisualizationBody(dmlScript, numTabs)
+        }
+      }
+    }
+  }
+
+  private def performSnapshot():Unit = {
+    if(solverParam.getSnapshot > 0) {
+      ifBlock("iter %% snapshot == 0") {
+        tabDMLScript.append("snapshot_dir= \"" + solverParam.getSnapshotPrefix + "\" + \"/iter_\" + iter + \"/\"\n")
+        net.getLayers.map(net.getCaffeLayer(_)).filter(_.weight != null).map(l => tabDMLScript.append(write(l.weight, "snapshot_dir + \"" + l.param.getName + "_weight.mtx\"", "binary")))
+        net.getLayers.map(net.getCaffeLayer(_)).filter(_.bias != null).map(l => tabDMLScript.append(write(l.bias, "snapshot_dir + \"" + l.param.getName + "_bias.mtx\"", "binary")))
+      }
+    }
+  }
+
+  private def forward():Unit = {
+    tabDMLScript.append("# Perform forward pass\n")
+    net.getLayers.map(layer => net.getCaffeLayer(layer).forward(tabDMLScript, false))
+  }
+  private def backward():Unit = backward("")
+  private def backward(suffix:String):Unit = {
+    tabDMLScript.append("# Perform backward pass\n")
+    net.getLayers.reverse.map(layer => net.getCaffeLayer(layer).backward(tabDMLScript, suffix))
+  }
+  private def update():Unit = {
+    tabDMLScript.append("# Update the parameters\n")
+    net.getLayers.map(layer => solver.update(tabDMLScript, net.getCaffeLayer(layer)))
+  }
+  private def initAggGradients():Unit = {
+    tabDMLScript.append("# Data structure to store gradients computed in parallel\n")
+    net.getLayers.map(layer => net.getCaffeLayer(layer)).map(l => {
+      if(l.shouldUpdateWeight) assign(tabDMLScript, l.dWeight + "_agg", matrix("0", "parallel_batches", multiply(nrow(l.weight), ncol(l.weight))))
+      if(l.shouldUpdateBias) assign(tabDMLScript, l.dBias + "_agg", matrix("0", "parallel_batches", multiply(nrow(l.bias), ncol(l.bias))))
+    })
+  }
+  private def flattenAndStoreAggGradients_j():Unit = {
+    tabDMLScript.append("# Flatten and store gradients for this parallel execution\n")
+    net.getLayers.map(layer => net.getCaffeLayer(layer)).map(l => {
+      if(l.shouldUpdateWeight) assign(tabDMLScript, l.dWeight + "_agg[j,]",
+          matrix(l.dWeight, "1", multiply(nrow(l.weight), ncol(l.weight))))
+      if(l.shouldUpdateBias) assign(tabDMLScript, l.dBias + "_agg[j,]",
+          matrix(l.dBias, "1", multiply(nrow(l.bias), ncol(l.bias))))
+    })
+  }
+  private def aggregateAggGradients():Unit = {
+    tabDMLScript.append("# Aggregate the gradients\n")
+    net.getLayers.map(layer => net.getCaffeLayer(layer)).map(l => {
+      if(l.shouldUpdateWeight) assign(tabDMLScript, l.dWeight,
+          matrix(colSums(l.dWeight + "_agg"), nrow(l.weight), ncol(l.weight)))
+      if(l.shouldUpdateBias) assign(tabDMLScript, l.dBias,
+          matrix(colSums(l.dBias + "_agg"), nrow(l.bias), ncol(l.bias)))
+    })
+  }
+  // -------------------------------------------------------------------------------------------
+
+  private def multiply(v1:String, v2:String):String = v1 + "*" + v2
+  private def colSums(m:String):String = "colSums(" + m + ")"
+
+  // Script generator
+  def getTrainingScript(isSingleNode:Boolean):(Script, String, String) = {
+    val startTrainingTime = System.nanoTime()
+    val DEBUG_TRAINING = if(inputs.containsKey("$debug")) inputs.get("$debug").toLowerCase.toBoolean else false
+    reset()
+
+    // Add source statements for the layers and the solver, as well as the visualization header
+    source(net, solver, Array[String]("l2_reg"))
+    appendVisualizationHeaders(dmlScript, numTabs)
+
+    // Read the input data and convert the labels to one-hot encoding
+    assign(tabDMLScript, "X_full", "read(\" \", format=\"csv\")")
+    assign(tabDMLScript, "y_full", "read(\" \", format=\"csv\")")
+    tabDMLScript.append(Caffe2DML.numImages).append(" = nrow(y_full)\n")
+    tabDMLScript.append("weights = ifdef($weights, \" \")\n")
+    tabDMLScript.append("debug = ifdef($debug, FALSE)\n")
+    tabDMLScript.append("# Convert to one-hot encoding (Assumption: 1-based labels) \n")
+    tabDMLScript.append("y_full = table(seq(1," + Caffe2DML.numImages + ",1), y_full, " + Caffe2DML.numImages + ", " + Utils.numClasses(net) + ")\n")
+
+    // Initialize the layers and solvers
+    tabDMLScript.append("# Initialize the layers and solvers\n")
+    net.getLayers.map(layer => net.getCaffeLayer(layer).init(tabDMLScript))
+    if(inputs.containsKey("$weights")) {
+      // Loading existing weights. Note: keeping the initialization code in case the layer wants to initialize non-weights and non-bias
+      tabDMLScript.append("# Load the weights. Note: keeping the initialization code in case the layer wants to initialize non-weights and non-bias\n")
+      net.getLayers.filter(l => !layersToIgnore.contains(l)).map(net.getCaffeLayer(_)).filter(_.weight != null).map(l => tabDMLScript.append(read(l.weight, l.param.getName + "_weight.mtx")))
+      net.getLayers.filter(l => !layersToIgnore.contains(l)).map(net.getCaffeLayer(_)).filter(_.bias != null).map(l => tabDMLScript.append(read(l.bias, l.param.getName + "_bias.mtx")))
+    }
+    net.getLayers.map(layer => solver.init(tabDMLScript, net.getCaffeLayer(layer)))
+
+    // Split into training and validation set
+    // Initializes Caffe2DML.X, Caffe2DML.y, Caffe2DML.XVal, Caffe2DML.yVal and Caffe2DML.numImages
+    val shouldValidate = solverParam.getTestInterval > 0 && solverParam.getTestIterCount > 0 && solverParam.getTestIter(0) > 0
+    trainTestSplit(if(shouldValidate) solverParam.getTestIter(0) else 0)
+
+    // Set iteration-related variables such as max_epochs, num_iters_per_epoch, lr, etc.
+    val lossLayers = net.getLayers.filter(layer => net.getCaffeLayer(layer).isInstanceOf[IsLossLayer]).map(layer => net.getCaffeLayer(layer).asInstanceOf[IsLossLayer])
+    if(lossLayers.length != 1) throw new DMLRuntimeException("Expected exactly one loss layer")
+    solverParam.getTrainAlgo.toLowerCase match {
+      case "batch" =>
+        assign(tabDMLScript, "max_epochs", solverParam.getMaxIter.toString)
+      case _ => {
+        ceilDivide(tabDMLScript, "num_iters_per_epoch", Caffe2DML.numImages, Caffe2DML.batchSize)
+        ceilDivide(tabDMLScript, "max_epochs", solverParam.getMaxIter.toString, "num_iters_per_epoch")
+      }
+    }
+    assign(tabDMLScript, "start_iter", "0")
+    assign(tabDMLScript, "lr", solverParam.getBaseLr.toString)
+
+    // ----------------------------------------------------------------------------
+    // Main logic
+    forBlock("e", "1", "max_epochs") {
+      solverParam.getTrainAlgo.toLowerCase match {
+        case "minibatch" =>
+          forBlock("i", "1", "num_iters_per_epoch") {
+            getTrainingBatch(tabDMLScript)
+            tabDMLScript.append("iter = start_iter + i\n")
+            forward; backward; update
+            displayLoss(lossLayers(0), shouldValidate)
+            performSnapshot
+          }
+        case "batch" => {
+          tabDMLScript.append("iter = start_iter + i\n")
+          forward; backward; update
+          displayLoss(lossLayers(0), shouldValidate)
+          performSnapshot
+        }
+        case "allreduce" => {
+          forBlock("i", "1", "num_iters_per_epoch") {
+            getTrainingBatch(tabDMLScript)
+            assign(tabDMLScript, "X_group_batch", "Xb")
+            assign(tabDMLScript, "y_group_batch", "yb")
+            tabDMLScript.append("iter = start_iter + i\n")
+            initAggGradients
+            parForBlock("j", "1", "nrow(y_group_batch)") {
+              assign(tabDMLScript, "Xb", "X_group_batch[j,]")
+              assign(tabDMLScript, "yb", "y_group_batch[j,]")
+              forward; backward("_agg")
+              flattenAndStoreAggGradients_j
+            }
+            aggregateAggGradients
+            tabDMLScript.append("iter = start_iter + parallel_batches\n")
+            update
+            displayLoss(lossLayers(0), shouldValidate)
+            performSnapshot
+          }
+        }
+        case _ => throw new DMLRuntimeException("Unsupported train algo:" + solverParam.getTrainAlgo)
+      }
+      // After every epoch, update the learning rate
+      tabDMLScript.append("# Learning rate\n")
+      lrPolicy.updateLearningRate(tabDMLScript)
+      tabDMLScript.append("start_iter = start_iter + num_iters_per_epoch\n")
+    }
+    // ----------------------------------------------------------------------------
+
+    // Check if this is necessary
+    if(doVisualize) tabDMLScript.append("print(" + asDMLString("Visualization counter:") + " + viz_counter)")
+
+    val trainingScript = tabDMLScript.toString()
+    // Print script generation time and the DML script on stdout
+    System.out.println("Time taken to generate training script from Caffe proto: " + ((System.nanoTime() - startTrainingTime)*1e-9) + " seconds.")
+    if(DEBUG_TRAINING) Utils.prettyPrintDMLScript(trainingScript)
+
+    // Set input/output variables and execute the script
+    val script = dml(trainingScript).in(inputs)
+    net.getLayers.map(net.getCaffeLayer(_)).filter(_.weight != null).map(l => script.out(l.weight))
+    net.getLayers.map(net.getCaffeLayer(_)).filter(_.bias != null).map(l => script.out(l.bias))
+    (script, "X_full", "y_full")
+  }
+}
+
+class Caffe2DMLModel(val mloutput: MLResults,
+    val numClasses:String, val sc: SparkContext, val solver:CaffeSolver,
+    val net:CaffeNetwork, val lrPolicy:LearningRatePolicy,
+    val estimator:Caffe2DML)
+  extends Model[Caffe2DMLModel] with HasMaxOuterIter with BaseSystemMLClassifierModel with DMLGenerator {
+  // --------------------------------------------------------------
+  // Invoked by Python, MLPipeline
+  val uid:String = "caffe_model_" + (new Random).nextLong
+  def this(estimator:Caffe2DML) = {
+    this(null, Utils.numClasses(estimator.net), estimator.sc, estimator.solver,
+        estimator.net,
+        // new CaffeNetwork(estimator.solverParam.getNet, caffe.Caffe.Phase.TEST, estimator.numChannels, estimator.height, estimator.width),
+        estimator.lrPolicy, estimator)
+  }
+
+  override def copy(extra: org.apache.spark.ml.param.ParamMap): Caffe2DMLModel = {
+    val that = new Caffe2DMLModel(mloutput, numClasses, sc, solver, net, lrPolicy, estimator)
+    copyValues(that, extra)
+  }
+  // --------------------------------------------------------------
+
+  def save(outputDir:String, format:String="binary", sep:String="/"):Unit = {
+    if(mloutput == null) throw new DMLRuntimeException("Cannot save as you need to train the model first using fit")
+    val dmlScript = new StringBuilder
+    dmlScript.append("print(\"Saving the model to " + outputDir + "...\")\n")
+    net.getLayers.map(net.getCaffeLayer(_)).filter(_.weight != null).map(l => dmlScript.append(write(l.weight, outputDir + sep + l.param.getName + "_weight.mtx", format)))
+    net.getLayers.map(net.getCaffeLayer(_)).filter(_.bias != null).map(l => dmlScript.append(write(l.bias, outputDir + sep + l.param.getName + "_bias.mtx", format)))
+
+    val script = dml(dmlScript.toString)
+    net.getLayers.map(net.getCaffeLayer(_)).filter(_.weight != null).map(l => script.in(l.weight, mloutput.getBinaryBlockMatrix(l.weight)))
+    net.getLayers.map(net.getCaffeLayer(_)).filter(_.bias != null).map(l => script.in(l.bias, mloutput.getBinaryBlockMatrix(l.bias)))
+    val ml = new MLContext(sc)
+    ml.execute(script)
+  }
+
+  def getPredictionScript(mloutput: MLResults, isSingleNode:Boolean): (Script, String) = {
+    reset()
+    val startPredictionTime = System.nanoTime()
+    val DEBUG_PREDICTION = if(estimator.inputs.containsKey("$debug")) estimator.inputs.get("$debug").toLowerCase.toBoolean else false
+
+    // Append source statements for each layer
+    source(net, solver, null)
+    tabDMLScript.append("weights = ifdef($weights, \" \")\n")
+    // Initialize the layers and solvers
+    tabDMLScript.append("# Initialize the layers and solvers\n")
+    net.getLayers.map(layer => net.getCaffeLayer(layer).init(tabDMLScript))
+    if(mloutput == null && estimator.inputs.containsKey("$weights")) {
+      // fit was not called
+      net.getLayers.map(net.getCaffeLayer(_)).filter(_.weight != null).map(l => tabDMLScript.append(read(l.weight, l.param.getName + "_weight.mtx")))
+      net.getLayers.map(net.getCaffeLayer(_)).filter(_.bias != null).map(l => tabDMLScript.append(read(l.bias, l.param.getName + "_bias.mtx")))
+    }
+    else if(mloutput == null) {
+      throw new DMLRuntimeException("Cannot call predict/score without calling either fit or by providing weights")
providing weights") + } + net.getLayers.map(layer => solver.init(tabDMLScript, net.getCaffeLayer(layer))) + +// if(estimator.inputs.containsKey("$debug") && estimator.inputs.get("$debug").equals("TRUE")) { +// System.out.println("The output shape of layers:") +// net.getLayers.map(layer => System.out.println(net.getCaffeLayer(layer).param.getName + " " + net.getCaffeLayer(layer).outputShape)) +// } + + // Donot update mean and variance in batchnorm + net.getLayers.filter(net.getCaffeLayer(_).isInstanceOf[BatchNorm]).map(net.getCaffeLayer(_).asInstanceOf[BatchNorm].update_mean_var = false) + tabDMLScript.append("X_full = read(\" \", format=\"csv\")\n") + assign(tabDMLScript, "X", "X_full") + tabDMLScript.append(Caffe2DML.numImages + " = nrow(X_full)\n") + + val lossLayers = net.getLayers.filter(layer => net.getCaffeLayer(layer).isInstanceOf[IsLossLayer]).map(layer => net.getCaffeLayer(layer).asInstanceOf[IsLossLayer]) + customAssert(lossLayers.length == 1, "Expected exactly one loss layer, but found " + lossLayers.length + ":" + net.getLayers.filter(layer => net.getCaffeLayer(layer).isInstanceOf[IsLossLayer])) + assign(tabDMLScript, "Prob", matrix("0", Caffe2DML.numImages, numClasses)) + estimator.solverParam.getTestAlgo.toLowerCase match { + case "minibatch" => { + ceilDivide(tabDMLScript(), "num_iters", Caffe2DML.numImages, Caffe2DML.batchSize) + forBlock("i", "1", "num_iters") { + getTestBatch(tabDMLScript) + net.getLayers.map(layer => net.getCaffeLayer(layer).forward(tabDMLScript, true)) + assign(tabDMLScript, "Prob[beg:end,]", lossLayers(0).out) + } + } + case "batch" => { + assign(tabDMLScript, "Xb", "X_full") + net.getLayers.map(layer => net.getCaffeLayer(layer).forward(tabDMLScript, true)) + assign(tabDMLScript, "Prob", lossLayers(0).out) + } + case "allreduce" => { + ceilDivide(tabDMLScript(), "num_iters", Caffe2DML.numImages, Caffe2DML.batchSize) + parForBlock("i", "1", "num_iters") { + getTestBatch(tabDMLScript) + net.getLayers.map(layer => net.getCaffeLayer(layer).forward(tabDMLScript, true)) + assign(tabDMLScript, "Prob[beg:end,]", lossLayers(0).out) + } + } + case _ => throw new DMLRuntimeException("Unsupported test algo:" + estimator.solverParam.getTestAlgo) + } + + val predictionScript = dmlScript.toString() + System.out.println("Time taken to generate prediction script from Caffe proto:" + ((System.nanoTime() - startPredictionTime)*1e-9) + "secs." 
+    if(DEBUG_PREDICTION) Utils.prettyPrintDMLScript(predictionScript)
+
+    // Reset the batchnorm mean/variance update flag
+    net.getLayers.filter(net.getCaffeLayer(_).isInstanceOf[BatchNorm]).map(net.getCaffeLayer(_).asInstanceOf[BatchNorm].update_mean_var = true)
+
+    val script = dml(predictionScript).out("Prob").in(estimator.inputs)
+    if(mloutput != null) {
+      // fit was called
+      net.getLayers.map(net.getCaffeLayer(_)).filter(_.weight != null).map(l => script.in(l.weight, mloutput.getBinaryBlockMatrix(l.weight)))
+      net.getLayers.map(net.getCaffeLayer(_)).filter(_.bias != null).map(l => script.in(l.bias, mloutput.getBinaryBlockMatrix(l.bias)))
+    }
+
+    (script, "X_full")
+  }
+
+  // Prediction
+  def transform(X: MatrixBlock): MatrixBlock = {
+    baseTransform(X, mloutput, sc, "Prob")
+  }
+  def transform(df: ScriptsUtils.SparkDataType): DataFrame = {
+    baseTransform(df, mloutput, sc, "Prob")
+  }
+}
\ No newline at end of file
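On the Python side, the mloutput == null path above is what lets a previously saved model be scored without a new fit call: constructing the estimator with a weights directory routes a read of each <layer>_weight.mtx into the generated prediction script. A hedged sketch, with illustrative paths:

    from systemml.mllearn import Caffe2DML

    clf = Caffe2DML(sqlCtx, 'lenet_solver.proto', input_shape=(1, 28, 28),
                    weights='lenet_model')   # directory written by save()
    y_pred = clf.predict(X_test)             # generates and runs the DML prediction script
    acc = clf.score(X_test, y_test)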
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/cc7993fc/src/main/scala/org/apache/sysml/api/dl/CaffeLayer.scala
----------------------------------------------------------------------
diff --git a/src/main/scala/org/apache/sysml/api/dl/CaffeLayer.scala b/src/main/scala/org/apache/sysml/api/dl/CaffeLayer.scala
new file mode 100644
index 0000000..4faa203
--- /dev/null
+++ b/src/main/scala/org/apache/sysml/api/dl/CaffeLayer.scala
@@ -0,0 +1,357 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sysml.api.dl
+
+import caffe.Caffe.LayerParameter
+import scala.collection.JavaConversions._
+import org.apache.sysml.parser.LanguageException
+import java.util.HashSet
+import java.io.File
+import org.apache.sysml.api.DMLScript
+import org.apache.sysml.runtime.util.ConvolutionUtils
+import caffe.Caffe.EltwiseParameter.EltwiseOp
+import org.apache.sysml.runtime.DMLRuntimeException;
+import java.util.ArrayList
+
+trait CaffeLayer extends BaseDMLGenerator {
+  // -------------------------------------------------
+  // Any layer that wants to reuse SystemML-NN has to override the following methods that help in generating the DML for the given layer:
+  def sourceFileName:String;
+  def init(dmlScript:StringBuilder):Unit;
+  def forward(dmlScript:StringBuilder, isPrediction:Boolean):Unit;
+  def backward(dmlScript:StringBuilder, outSuffix:String):Unit;
+  var computedOutputShape:(String, String, String) = null
+  def outputShape:(String, String, String) = {
+    if(computedOutputShape == null) computedOutputShape = bottomLayerOutputShape
+    computedOutputShape
+  }
+  // -------------------------------------------------
+  var computedBottomLayerOutputShape:(String, String, String) = null
+  def bottomLayerOutputShape:(String, String, String) = {
+    if(computedBottomLayerOutputShape == null) {
+      val ret = net.getBottomLayers(param.getName).map(l => net.getCaffeLayer(l)).toList
+      if(ret.size == 0) throw new LanguageException("Expected at least 1 bottom layer for " + param.getName)
+      computedBottomLayerOutputShape = ret(0).outputShape
+    }
+    computedBottomLayerOutputShape
+  }
+  def param:LayerParameter
+  def id:Int
+  def net:CaffeNetwork
+  // --------------------------------------------------------------------------------------
+  // No need to override these methods in subclasses
+  // Exception: Only Data layer overrides "out" method to use 'Xb' for consistency
+  // Naming of the below methods is consistent with the nn library:
+  // X (feature map from the previous layer) ----> Forward pass ----> out (feature map to the next layer)
+  // dX (errors to the previous layer) <---- Backward pass <---- dout (errors from the next layer)
+  def out = "out" + id
+  var computedX:String = null
+  def X:String = {
+    if(computedX == null) {
+      val ret = net.getBottomLayers(param.getName).map(l => net.getCaffeLayer(l)).toList
+      if(ret.size == 0) throw new LanguageException("Expected at least 1 bottom layer for " + param.getName)
+      else if(ret.size == 1) computedX = ret(0).out
+      else computedX = sum(new StringBuilder, ret.map(_.out).toList).toString()
+    }
+    computedX
+  }
+  var computedDout:String = null
+  def dout: String = {
+    if(computedDout == null) {
+      val ret = net.getTopLayers(param.getName).map(l => net.getCaffeLayer(l)).toList
+      if(ret.size == 0) throw new LanguageException("Expected at least 1 top layer for " + param.getName)
+      else if(ret.size == 1) computedDout = ret(0).dX
+      else computedDout = sum(new StringBuilder, ret.map(_.dX).toList).toString()
+    }
+    computedDout
+  }
+  val dX = "dOut" + id
+  // --------------------------------------------------------------------------------------
+  // No need to override these methods in subclasses; instead, classes that have weights and biases
+  // should implement the HasWeight and HasBias traits.
+  def dWeight():String = throw new DMLRuntimeException("dWeight is not implemented in super class")
+  def dBias():String = throw new DMLRuntimeException("dBias is not implemented in super class")
+  def weight():String = null;
+  def bias():String = null;
+  def shouldUpdateWeight():Boolean = if(weight != null) true else false
+  def shouldUpdateBias():Boolean = if(bias != null) true else false
+  // --------------------------------------------------------------------------------------
+  // Helper methods to simplify the code of subclasses
+  def invokeInit(dmlScript:StringBuilder, returnVariables:List[String], arguments:String*):Unit = {
+    invoke(dmlScript, sourceFileName + "::", returnVariables, "init", arguments.toList)
+  }
+  def invokeForward(dmlScript:StringBuilder, returnVariables:List[String], arguments:String*):Unit = {
+    invoke(dmlScript, sourceFileName + "::", returnVariables, "forward", arguments.toList)
+  }
+  def invokeBackward(dmlScript:StringBuilder, outSuffix:String, resultVariables:List[String], arguments:String*):Unit = {
+    invoke(dmlScript, sourceFileName + "::", resultVariables.map(_ + outSuffix), "backward", arguments.toList)
+  }
+  // --------------------------------------------------------------------------------------
+}
+
+
+trait IsLossLayer extends CaffeLayer {
+  def computeLoss(dmlScript:StringBuilder, numTabs:Int):Unit
+}
+
+trait HasWeight extends CaffeLayer {
+  override def weight = "W" + id
+  override def dWeight = "dW" + id
+}
+
+trait HasBias extends CaffeLayer {
+  override def bias = "b" + id
+  override def dBias = "db" + id
+}
+
+class Data(val param:LayerParameter, val id:Int, val net:CaffeNetwork, val numChannels:String, val height:String, val width:String) extends CaffeLayer {
+  // -------------------------------------------------
+  override def sourceFileName = null
+  override def init(dmlScript:StringBuilder) = {
+    if(param.hasTransformParam && param.getTransformParam.hasScale) {
+      dmlScript.append("X_full = X_full * " + param.getTransformParam.getScale + "\n")
+    }
+    dmlScript.append("BATCH_SIZE = " + param.getDataParam.getBatchSize + "\n")
+  }
+  var dataOutputShape = ("$num_channels", "$height", "$width")
+  override def forward(dmlScript:StringBuilder, isPrediction:Boolean) = { }
+  override def out = "Xb"
+  override def backward(dmlScript:StringBuilder, outSuffix:String) = { }
+  override def outputShape = (numChannels, height, width)
+  // -------------------------------------------------
+}
+
+
+// ------------------------------------------------------------------
+// weight is ema_mean and bias is ema_var
+// Fuse
+class BatchNorm(val param:LayerParameter, val id:Int, val net:CaffeNetwork) extends CaffeLayer with HasWeight with HasBias {
+  // val scale =
+  override def sourceFileName = "batch_norm2d"
+  override def init(dmlScript:StringBuilder) = invokeInit(dmlScript, List[String](gamma, beta, ema_mean, ema_var), numChannels)
+  var update_mean_var = true
+  def forward(dmlScript: StringBuilder, isPrediction: Boolean): Unit = {
+    val mode = if(isPrediction) "\"test\"" else "\"train\""
+    invokeForward(dmlScript, List[String](out, withSuffix(ema_mean), withSuffix(ema_var), withSuffix(cache_mean), withSuffix(cache_var), withSuffix(cache_norm)),
+        X, gamma, beta, numChannels, Hin, Win, mode, ema_mean, ema_var, ma_fraction, eps)
+  }
+
+  def backward(dmlScript: StringBuilder, outSuffix:String): Unit = {
+    invokeBackward(dmlScript, outSuffix, List[String](dX, dgamma, dbeta), dout, out, ema_mean, ema_var, cache_mean, cache_var, cache_norm, X, gamma, beta, numChannels,
+        Hin, Win, "\"train\"", ema_mean, ema_var, ma_fraction, eps)
+  }
+
+  private def withSuffix(str:String):String = if(update_mean_var) str else str + "_ignore"
+  override def weight = "ema_mean" + id
+  override def bias = "ema_var" + id
+  def cache_mean(): String = "cache_mean" + id
+  def cache_var():String = "cache_var" + id
+  def cache_norm():String = "cache_norm" + id
+  var scaleLayer:Scale = null
+  def gamma():String = { checkNextLayer(); scaleLayer.weight }
+  def ma_fraction():String = if(param.getBatchNormParam.hasMovingAverageFraction()) param.getBatchNormParam.getMovingAverageFraction.toString else "0.999"
+  def eps():String = if(param.getBatchNormParam.hasEps()) param.getBatchNormParam.getEps.toString else "1e-5"
+  def beta():String = { checkNextLayer(); scaleLayer.bias }
+  def dgamma():String = { checkNextLayer(); scaleLayer.dWeight }
+  def dbeta():String = { checkNextLayer(); scaleLayer.dBias }
+  override def shouldUpdateWeight():Boolean = false
+  override def shouldUpdateBias():Boolean = false
+  def ema_mean(): String = weight
+  def ema_var(): String = bias
+  def checkNextLayer(): Unit = {
+    if(scaleLayer == null) {
+      val topLayers = net.getTopLayers(param.getName).map(l => net.getCaffeLayer(l)).toList
+      if(topLayers.length != 1 || !topLayers(0).isInstanceOf[Scale]) throw new LanguageException("Only one top layer of type Scale allowed for BatchNorm")
+      scaleLayer = topLayers(0).asInstanceOf[Scale]
+    }
+  }
+  def numChannels = bottomLayerOutputShape._1
+  def Hin = bottomLayerOutputShape._2
+  def Win = bottomLayerOutputShape._3
+}
+// weight is gamma and bias is beta
+class Scale(val param:LayerParameter, val id:Int, val net:CaffeNetwork) extends CaffeLayer with HasWeight with HasBias {
+  if(!param.getScaleParam.getBiasTerm) throw new LanguageException("Add \"scale_param { bias_term: true }\" to the layer " + param.getName)
+  override def sourceFileName = null
+  override def init(dmlScript: StringBuilder): Unit = {}
+  def forward(dmlScript: StringBuilder, isPrediction: Boolean): Unit = assign(dmlScript, out, X)
+  override def backward(dmlScript: StringBuilder, outSuffix:String): Unit = assign(dmlScript, dX + outSuffix, dout)
+}
+// ------------------------------------------------------------------
+
+class Elementwise(val param:LayerParameter, val id:Int, val net:CaffeNetwork) extends CaffeLayer {
+  override def sourceFileName = null
+  override def init(dmlScript: StringBuilder): Unit = {}
+  if(param.getEltwiseParam.hasOperation && param.getEltwiseParam.getOperation != EltwiseOp.SUM)
+    throw new LanguageException("Currently only elementwise sum operation supported")
+  def forward(dmlScript: StringBuilder, isPrediction: Boolean): Unit = {
+    addAndAssign(dmlScript, out, param.getBottomList.map(b => net.getCaffeLayer(b).out).toList)
+  }
+  override def backward(dmlScript: StringBuilder, outSuffix:String): Unit = assign(dmlScript, dX + outSuffix, dout)
+  override def outputShape = {
+    if(_out == null) _out = net.getCaffeLayer(net.getBottomLayers(param.getName).take(1).toSeq.get(0)).outputShape
+    _out
+  }
+  var _out:(String, String, String) = null
+
+}
+
+class SoftmaxWithLoss(val param:LayerParameter, val id:Int, val net:CaffeNetwork) extends CaffeLayer with IsLossLayer {
+  // -------------------------------------------------
+  override def sourceFileName = "softmax"
+  override def init(dmlScript:StringBuilder) = {}
+  override def forward(dmlScript:StringBuilder, isPrediction:Boolean) =
+    invokeForward(dmlScript, List[String](out), scores)
+class Elementwise(val param:LayerParameter, val id:Int, val net:CaffeNetwork) extends CaffeLayer {
+  override def sourceFileName = null
+  override def init(dmlScript: StringBuilder): Unit = {}
+  if(param.getEltwiseParam.hasOperation && param.getEltwiseParam.getOperation != EltwiseOp.SUM)
+    throw new LanguageException("Currently, only the elementwise sum operation is supported")
+  def forward(dmlScript: StringBuilder, isPrediction: Boolean): Unit = {
+    addAndAssign(dmlScript, out, param.getBottomList.map(b => net.getCaffeLayer(b).out).toList)
+  }
+  override def backward(dmlScript: StringBuilder, outSuffix:String): Unit = assign(dmlScript, dX + outSuffix, dout)
+  override def outputShape = {
+    if(_out == null) _out = net.getCaffeLayer(net.getBottomLayers(param.getName).head).outputShape
+    _out
+  }
+  var _out:(String, String, String) = null
+}
+
+class SoftmaxWithLoss(val param:LayerParameter, val id:Int, val net:CaffeNetwork) extends CaffeLayer with IsLossLayer {
+  // -------------------------------------------------
+  override def sourceFileName = "softmax"
+  override def init(dmlScript:StringBuilder) = {}
+  override def forward(dmlScript:StringBuilder, isPrediction:Boolean) =
+    invokeForward(dmlScript, List[String](out), scores)
+  override def backward(dmlScript:StringBuilder, outSuffix:String) = {
+    invoke(dmlScript, "cross_entropy_loss::", List[String]("dProbs" + outSuffix), "backward", out, "yb")
+    invoke(dmlScript.append("\t"), "softmax::", List[String](dX + outSuffix), "backward", "dProbs" + outSuffix, scores)
+  }
+  override def computeLoss(dmlScript:StringBuilder, numTabs:Int) = {
+    val tabBuilder = new StringBuilder
+    for(i <- 0 until numTabs) tabBuilder.append("\t")
+    val tabs = tabBuilder.toString
+    dmlScript.append("tmp_loss = cross_entropy_loss::forward(" + commaSep(out, "yb") + ")\n")
+    dmlScript.append(tabs).append("loss = loss + tmp_loss\n")
+    dmlScript.append(tabs).append("true_yb = rowIndexMax(yb)\n")
+    dmlScript.append(tabs).append("predicted_yb = rowIndexMax(" + out + ")\n")
+    dmlScript.append(tabs).append("accuracy = mean(predicted_yb == true_yb)*100\n")
+  }
+  def scores():String = {
+    val ret = net.getBottomLayers(param.getName).map(l => net.getCaffeLayer(l)).toList
+    if(ret.size == 1) return ret(0).out
+    else if(ret.size == 2) {
+      val ret1 = if(!ret(0).out.equals("Xb")) ret(0).out else ""
+      val ret2 = if(!ret(1).out.equals("Xb")) ret(1).out else ""
+      if(!ret1.equals("") && !ret2.equals("")) throw new LanguageException("At least one of the outputs of the previous layers should be Xb")
+      else if(!ret1.equals("")) return ret1
+      else return ret2
+    }
+    else
+      throw new LanguageException("More than 2 bottom layers are not supported")
+  }
+  // -------------------------------------------------
+}
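For reference, with numTabs = 1, computeLoss above emits a DML fragment of the following shape (out3 stands for the layer's output variable; the exact name follows the out naming convention defined earlier on CaffeLayer, and the indentation below stands for the numTabs tabs):

    tmp_loss = cross_entropy_loss::forward(out3, yb)
        loss = loss + tmp_loss
        true_yb = rowIndexMax(yb)
        predicted_yb = rowIndexMax(out3)
        accuracy = mean(predicted_yb == true_yb)*100

The first line is deliberately not prefixed with tabs: the caller is expected to have already positioned the cursor at the correct indentation.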
+class ReLU(val param:LayerParameter, val id:Int, val net:CaffeNetwork) extends CaffeLayer {
+  // -------------------------------------------------
+  override def sourceFileName = "relu"
+  override def init(dmlScript:StringBuilder) = { }
+  override def forward(dmlScript:StringBuilder, isPrediction:Boolean) = invokeForward(dmlScript, List[String](out), X)
+  override def backward(dmlScript:StringBuilder, outSuffix:String) = invokeBackward(dmlScript, outSuffix, List[String](dX), dout, X)
+  // -------------------------------------------------
+}
+
+class Dropout(val param:LayerParameter, val id:Int, val net:CaffeNetwork) extends CaffeLayer {
+  // -------------------------------------------------
+  override def sourceFileName = "dropout"
+  override def init(dmlScript:StringBuilder) = { }
+  override def forward(dmlScript:StringBuilder, isPrediction:Boolean) =
+    if(!isPrediction)
+      invokeForward(dmlScript, List[String](out, mask), X, p, seed)
+    else
+      assign(dmlScript, out, X) // during prediction, dropout is the identity, so no forward call is needed
+  override def backward(dmlScript:StringBuilder, outSuffix:String) = invokeBackward(dmlScript, outSuffix, List[String](dX), dout, X, p, mask)
+  // -------------------------------------------------
+  def mask = "mask" + id
+  def p = param.getDropoutParam.getDropoutRatio.toString
+  def seed = "-1"
+}
+
+class InnerProduct(val param:LayerParameter, val id:Int, val net:CaffeNetwork) extends CaffeLayer with HasWeight with HasBias {
+  // -------------------------------------------------
+  override def sourceFileName = "affine"
+  override def init(dmlScript:StringBuilder) = invokeInit(dmlScript, List[String](weight, bias), numFeatures, numNeurons)
+  override def forward(dmlScript:StringBuilder, isPrediction:Boolean) =
+    invokeForward(dmlScript, List[String](out), X, weight, bias)
+  override def backward(dmlScript:StringBuilder, outSuffix:String) =
+    invokeBackward(dmlScript, outSuffix, List[String](dX, dWeight, dBias), dout, X, weight, bias)
+  // -------------------------------------------------
+  def numNeurons = param.getInnerProductParam.getNumOutput.toString
+  def numFeatures = int_mult(bottomLayerOutputShape._1, bottomLayerOutputShape._2, bottomLayerOutputShape._3)
+  override def outputShape = ( numNeurons, "1", "1" )
+}
+
+class MaxPooling(val param:LayerParameter, val id:Int, val net:CaffeNetwork) extends CaffeLayer {
+  // -------------------------------------------------
+  override def sourceFileName = "max_pool2d_builtin"
+  override def init(dmlScript:StringBuilder) = {}
+  override def forward(dmlScript:StringBuilder, isPrediction:Boolean) =
+    invokeForward(dmlScript, List[String](out, "ignoreHout_"+id, "ignoreWout_"+id),
+      X, numChannels, Hin, Win, kernel_h, kernel_w, stride_h, stride_w, pad_h, pad_w)
+  override def backward(dmlScript:StringBuilder, outSuffix:String) =
+    invokeBackward(dmlScript, outSuffix, List[String](dX), dout, Hout, Wout, X, numChannels, Hin, Win, kernel_h, kernel_w, stride_h, stride_w, pad_h, pad_w)
+  override def outputShape = ( numChannels, Hout, Wout )
+  // -------------------------------------------------
+  def Hin = bottomLayerOutputShape._2
+  def Win = bottomLayerOutputShape._3
+  def Hout = ConvolutionUtils.getConv2dOutputMap(bottomLayerOutputShape._2, kernel_h, stride_h, pad_h)
+  def Wout = ConvolutionUtils.getConv2dOutputMap(bottomLayerOutputShape._3, kernel_w, stride_w, pad_w)
+  def poolingParam = param.getPoolingParam
+  def numChannels = bottomLayerOutputShape._1
+  def kernel_h = if(poolingParam.hasKernelH) poolingParam.getKernelH.toString else poolingParam.getKernelSize.toString
+  def kernel_w = if(poolingParam.hasKernelW) poolingParam.getKernelW.toString else poolingParam.getKernelSize.toString
+  def stride_h = if(poolingParam.hasStrideH) poolingParam.getStrideH.toString else poolingParam.getStride.toString
+  def stride_w = if(poolingParam.hasStrideW) poolingParam.getStrideW.toString else poolingParam.getStride.toString
+  def pad_h = if(poolingParam.hasPadH) poolingParam.getPadH.toString else poolingParam.getPad.toString
+  def pad_w = if(poolingParam.hasPadW) poolingParam.getPadW.toString else poolingParam.getPad.toString
+}
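Hout and Wout delegate to ConvolutionUtils.getConv2dOutputMap on symbolic (string) dimensions; on concrete numbers this presumably reduces to the standard output-size formula. A sketch of the arithmetic, for illustration only:

    // Illustrative arithmetic only; the real helper also handles symbolic dimensions.
    def conv2dOutputDim(in:Int, kernel:Int, stride:Int, pad:Int):Int =
      (in + 2 * pad - kernel) / stride + 1

For example, a 28x28 input with a 5x5 kernel, stride 1, and pad 0 yields a 24x24 output.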
+class Convolution(val param:LayerParameter, val id:Int, val net:CaffeNetwork) extends CaffeLayer with HasWeight with HasBias {
+  // -------------------------------------------------
+  override def sourceFileName = "conv2d_builtin"
+  override def init(dmlScript:StringBuilder) = invokeInit(dmlScript, List[String](weight, bias), numKernels, numChannels, kernel_h, kernel_w)
+  override def forward(dmlScript:StringBuilder, isPrediction:Boolean) =
+    invokeForward(dmlScript, List[String](out, "ignoreHout_"+id, "ignoreWout_"+id),
+      X, weight, bias, numChannels, Hin, Win, kernel_h, kernel_w, stride_h, stride_w, pad_h, pad_w)
+  override def backward(dmlScript:StringBuilder, outSuffix:String) =
+    invokeBackward(dmlScript, outSuffix, List[String](dX, dWeight, dBias), dout, Hout, Wout, X, weight, bias, numChannels, Hin, Win, kernel_h, kernel_w, stride_h, stride_w, pad_h, pad_w)
+  override def outputShape = ( numKernels, Hout, Wout )
+  // -------------------------------------------------
+  def numChannels = bottomLayerOutputShape._1
+  def Hin = bottomLayerOutputShape._2
+  def Win = bottomLayerOutputShape._3
+  def Hout = ConvolutionUtils.getConv2dOutputMap(bottomLayerOutputShape._2, kernel_h, stride_h, pad_h)
+  def Wout = ConvolutionUtils.getConv2dOutputMap(bottomLayerOutputShape._3, kernel_w, stride_w, pad_w)
+  def convParam = param.getConvolutionParam
+  def numKernels = convParam.getNumOutput.toString
+  def kernel_h = if(convParam.hasKernelH) convParam.getKernelH.toString
+    else if(convParam.getKernelSizeCount > 0) convParam.getKernelSize(0).toString
+    else throw new LanguageException("Incorrect kernel parameters")
+  def kernel_w = if(convParam.hasKernelW) convParam.getKernelW.toString
+    else if(convParam.getKernelSizeCount > 0) convParam.getKernelSize(0).toString
+    else throw new LanguageException("Incorrect kernel parameters")
+  def stride_h = if(convParam.hasStrideH) convParam.getStrideH.toString
+    else if(convParam.getStrideCount > 0) convParam.getStride(0).toString
+    else throw new LanguageException("Incorrect stride parameters: " + convParam.getStrideH + " " + convParam.getStrideList + " " + param.getName)
+  def stride_w = if(convParam.hasStrideW) convParam.getStrideW.toString
+    else if(convParam.getStrideCount > 0) convParam.getStride(0).toString
+    else throw new LanguageException("Incorrect stride parameters")
+  def pad_h = if(convParam.hasPadH) convParam.getPadH.toString
+    else if(convParam.getPadCount > 0) convParam.getPad(0).toString
+    else throw new LanguageException("Incorrect pad parameters")
+  def pad_w = if(convParam.hasPadW) convParam.getPadW.toString
+    else if(convParam.getPadCount > 0) convParam.getPad(0).toString
+    else throw new LanguageException("Incorrect pad parameters")
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/cc7993fc/src/main/scala/org/apache/sysml/api/dl/CaffeNetwork.scala
----------------------------------------------------------------------
diff --git a/src/main/scala/org/apache/sysml/api/dl/CaffeNetwork.scala b/src/main/scala/org/apache/sysml/api/dl/CaffeNetwork.scala
new file mode 100644
index 0000000..e585e30
--- /dev/null
+++ b/src/main/scala/org/apache/sysml/api/dl/CaffeNetwork.scala
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sysml.api.dl
+
+import org.apache.sysml.runtime.DMLRuntimeException
+import scala.collection.JavaConversions._
+import caffe.Caffe.NetParameter
+import caffe.Caffe.LayerParameter
+import caffe.Caffe.Phase
+import java.util.ArrayList
+import java.util.HashSet
+import scala.collection.mutable.Stack
+import org.apache.sysml.parser.LanguageException
+import java.util.HashMap
+import caffe.Caffe.PoolingParameter
+import org.apache.commons.logging.LogFactory
+
+trait Network {
+  def getLayers(): List[String]
+  def getCaffeLayer(layerName:String):CaffeLayer
+  def getBottomLayers(layerName:String): Set[String]
+  def getTopLayers(layerName:String): Set[String]
+  def getLayerID(layerName:String): Int
+}
+
+object CaffeNetwork {
+  val LOG = LogFactory.getLog(classOf[CaffeNetwork].getName)
+}
+
+class CaffeNetwork(netFilePath:String, val currentPhase:Phase,
+    val numChannels:String, val height:String, val width:String) extends Network {
+  private def isIncludedInCurrentPhase(l:LayerParameter): Boolean = {
+    if(l.getIncludeCount == 0) true else l.getIncludeList.filter(r => r.hasPhase() && r.getPhase != currentPhase).length == 0
+  }
+  private var id = 1
+
+  // --------------------------------------------------------------------------------
+  private var _caffeLayerParams:List[LayerParameter] = Utils.readCaffeNet(netFilePath).getLayerList.filter(l => isIncludedInCurrentPhase(l)).toList
+  // --------------------------------------------------------------------------------
+
+  private var _layerNames: List[String] = _caffeLayerParams.map(l => l.getName).toList
+  CaffeNetwork.LOG.debug("Layers in current phase: " + _layerNames)
+
+  // Condition 1: assert that each layer name is unique
+  private val _duplicateLayerNames = _layerNames.diff(_layerNames.distinct)
+  if(_duplicateLayerNames.size != 0) throw new LanguageException("Duplicate layer names are not supported: " + _duplicateLayerNames)
+
+  // Condition 2: exactly one top name, except for the Data layer
+  private val _condition2Exceptions = Set("data")
+  _caffeLayerParams.filter(l => !_condition2Exceptions.contains(l.getType.toLowerCase)).foreach(l => if(l.getTopCount != 1) throw new LanguageException("Multiple top layers are not supported for " + l.getName))
+
+  // Condition 3: Replace top names that refer to a Data layer with that layer's name
+  // Example: layer{ name: mnist, top: data, top: label, ... }
+  private val _topToNameMappingForDataLayer = new HashMap[String, String]()
+  private def containsOnly(list:java.util.List[String], v:String): Boolean = list.toSet.diff(Set(v)).isEmpty
+  private def isData(l:LayerParameter):Boolean = l.getType.equalsIgnoreCase("data")
+  private def replaceTopWithNameOfDataLayer(l:LayerParameter):LayerParameter = {
+    if(containsOnly(l.getTopList, l.getName))
+      return l
+    else {
+      val builder = l.toBuilder()
+      for(i <- 0 until l.getTopCount) {
+        if(! l.getTop(i).equals(l.getName)) { _topToNameMappingForDataLayer.put(l.getTop(i), l.getName) }
+        builder.setTop(i, l.getName)
+      }
+      return builder.build()
+    }
+  }
+  // 3a: Replace the tops of the Data layer with its name
+  // Example: layer{ name: mnist, top: mnist, top: mnist, ... }
+  _caffeLayerParams = _caffeLayerParams.map(l => if(isData(l)) replaceTopWithNameOfDataLayer(l) else l)
+  private def replaceBottomOfNonDataLayers(l:LayerParameter):LayerParameter = {
+    val builder = l.toBuilder()
+    // Note: a top will never refer to a Data layer here
+    for(i <- 0 until l.getBottomCount) {
+      if(_topToNameMappingForDataLayer.containsKey(l.getBottom(i)))
+        builder.setBottom(i, _topToNameMappingForDataLayer.get(l.getBottom(i)))
+    }
+    return builder.build()
+  }
+  // 3b: If the bottom of a non-Data layer refers to a top of the Data layer, replace it with the Data layer's name
+  // Example: layer { name: "conv1_1", type: "Convolution", bottom: "data", ... }
+  _caffeLayerParams = if(_topToNameMappingForDataLayer.size == 0) _caffeLayerParams else _caffeLayerParams.map(l => if(isData(l)) l else replaceBottomOfNonDataLayers(l))
+
+  // Condition 4: Deal with fused (in-place) layers
+  // Example: layer { name: conv1, top: conv1, ... } layer { name: foo, bottom: conv1, top: conv1 }
+  private val _fusedTopLayer = new HashMap[String, String]()
+  private def isFusedLayer(l:LayerParameter): Boolean = l.getTopCount == 1 && l.getBottomCount == 1 && l.getTop(0).equalsIgnoreCase(l.getBottom(0))
+  private def containsReferencesToFusedLayer(l:LayerParameter):Boolean = l.getBottomList.exists(bLayer => _fusedTopLayer.containsKey(bLayer))
+  _caffeLayerParams = _caffeLayerParams.map(l => {
+    if(isFusedLayer(l)) {
+      val builder = l.toBuilder()
+      if(_fusedTopLayer.containsKey(l.getBottom(0))) {
+        builder.setBottom(0, _fusedTopLayer.get(l.getBottom(0)))
+      }
+      builder.setTop(0, l.getName)
+      _fusedTopLayer.put(l.getBottom(0), l.getName)
+      builder.build()
+    }
+    else if(containsReferencesToFusedLayer(l)) {
+      val builder = l.toBuilder()
+      for(i <- 0 until l.getBottomCount) {
+        if(_fusedTopLayer.containsKey(l.getBottomList.get(i))) {
+          builder.setBottom(i, _fusedTopLayer.get(l.getBottomList.get(i)))
+        }
+      }
+      builder.build()
+    }
+    else l
+  })
+
+  // --------------------------------------------------------------------------------
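To make the in-place rewriting concrete: given a hypothetical prototxt fragment

    layer { name: "conv1"  type: "Convolution"  bottom: "data"   top: "conv1" }
    layer { name: "relu1"  type: "ReLU"         bottom: "conv1"  top: "conv1" }
    layer { name: "conv2"  type: "Convolution"  bottom: "conv1"  top: "conv2" }

the pass above rewrites relu1's top to "relu1" and conv2's bottom to "relu1", so the bottom/top maps built below see an unambiguous chain data -> conv1 -> relu1 -> conv2.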
+  // Helper functions to extract bottom and top layers
+  private def convertTupleListToMap(m:List[(String, String)]):Map[String, Set[String]] = m.groupBy(_._1).map(x => (x._1, x._2.map(y => y._2).toSet)).toMap
+  private def flipKeyValues(t:List[(String, Set[String])]): List[(String, String)] = t.flatMap(x => x._2.map(b => b -> x._1))
+  private def expandBottomList(layerName:String, bottomList:java.util.List[String]): List[(String, String)] = bottomList.filter(b => !b.equals(layerName)).map(b => layerName -> b).toList
+
+  // The bottom layers are the layers available in the getBottomList (from the Caffe .proto definitions)
+  private val _bottomLayers:Map[String, Set[String]] = convertTupleListToMap(
+      _caffeLayerParams.flatMap(l => expandBottomList(l.getName, l.getBottomList)))
+  CaffeNetwork.LOG.info("Bottom layers: " + _bottomLayers)
+
+  // Find the top layers by reversing the bottom lists
+  private val _topLayers:Map[String, Set[String]] = convertTupleListToMap(flipKeyValues(_bottomLayers.toList))
+  CaffeNetwork.LOG.info("Top layers: " + _topLayers)
+
+  private val _layers: Map[String, CaffeLayer] = _caffeLayerParams.map(l => l.getName -> convertLayerParameterToCaffeLayer(l)).toMap
+  CaffeNetwork.LOG.info("Layers: " + _layers)
+  private val _layerIDs: Map[String, Int] = _layers.map(x => (x._1, x._2.id))
+
+  private def throwException(layerName:String) = throw new LanguageException("Layer with name " + layerName + " not found")
+  def getLayers(): List[String] = _layerNames
+  def getCaffeLayer(layerName:String):CaffeLayer = if(checkKey(_layers, layerName)) _layers(layerName) else throwException(layerName)
+  def getBottomLayers(layerName:String): Set[String] = if(checkKey(_bottomLayers, layerName)) _bottomLayers(layerName) else throwException(layerName)
+  def getTopLayers(layerName:String): Set[String] = if(checkKey(_topLayers, layerName)) _topLayers(layerName) else throwException(layerName)
+  def getLayerID(layerName:String): Int = if(checkKey(_layerIDs, layerName)) _layerIDs(layerName) else throwException(layerName)
+
+  // Helper functions
+  private def checkKey(m:Map[String, Any], key:String): Boolean = {
+    if(m == null) throw new LanguageException("Map is null")
+    else if(key == null) throw new LanguageException("Key is null")
+    else m.contains(key)
+  }
+  private def convertLayerParameterToCaffeLayer(param:LayerParameter):CaffeLayer = {
+    id = id + 1
+    param.getType.toLowerCase() match {
+      case "convolution" => new Convolution(param, id, this)
+      case "pooling" => if(param.getPoolingParam.getPool == PoolingParameter.PoolMethod.MAX) new MaxPooling(param, id, this)
+                        else throw new LanguageException("Only max pooling is supported: " + param.getPoolingParam.getPool.name)
+      case "innerproduct" => new InnerProduct(param, id, this)
+      case "relu" => new ReLU(param, id, this)
+      case "softmaxwithloss" => new SoftmaxWithLoss(param, id, this)
+      case "dropout" => new Dropout(param, id, this)
+      case "data" => new Data(param, id, this, numChannels, height, width)
+      case "batchnorm" => new BatchNorm(param, id, this)
+      case "scale" => new Scale(param, id, this)
+      case "eltwise" => new Elementwise(param, id, this)
+      case _ => throw new LanguageException("Layer of type " + param.getType + " is not supported")
+    }
+  }
+}
\ No newline at end of file
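As a reader-facing sketch (not part of the patch; the file path is hypothetical, and the actual wiring elsewhere in this change may differ), the class can be exercised roughly as follows:

    // Parse a train-phase network for 1x28x28 inputs and print the inferred output shapes.
    val net = new CaffeNetwork("lenet.proto", Phase.TRAIN, "1", "28", "28")
    for(name <- net.getLayers()) {
      val layer = net.getCaffeLayer(name)
      println(name + " -> (C, H, W) = " + layer.outputShape)
    }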
