Repository: systemml Updated Branches: refs/heads/master c3d565b88 -> 4b615bc08
[SYSTEMML-540] Remove caffe dependency from Keras2DML - Also, added documentation to explain the three deep learning APIs. Closes #702. Project: http://git-wip-us.apache.org/repos/asf/systemml/repo Commit: http://git-wip-us.apache.org/repos/asf/systemml/commit/4b615bc0 Tree: http://git-wip-us.apache.org/repos/asf/systemml/tree/4b615bc0 Diff: http://git-wip-us.apache.org/repos/asf/systemml/diff/4b615bc0 Branch: refs/heads/master Commit: 4b615bc087b7951af50cdcd1bcb5849e9a146051 Parents: c3d565b Author: Niketan Pansare <[email protected]> Authored: Mon Nov 20 16:05:01 2017 -0800 Committer: Niketan Pansare <[email protected]> Committed: Mon Nov 20 16:06:15 2017 -0800 ---------------------------------------------------------------------- docs/deep-learning.md | 195 ++++++ docs/index.md | 5 +- src/main/python/systemml/mllearn/estimators.py | 37 +- src/main/python/systemml/mllearn/keras2caffe.py | 681 ++++++------------- .../org/apache/sysml/api/dl/CaffeNetwork.scala | 20 + 5 files changed, 448 insertions(+), 490 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/systemml/blob/4b615bc0/docs/deep-learning.md ---------------------------------------------------------------------- diff --git a/docs/deep-learning.md b/docs/deep-learning.md new file mode 100644 index 0000000..23084a5 --- /dev/null +++ b/docs/deep-learning.md @@ -0,0 +1,195 @@ +--- +layout: global +title: Deep Learning with SystemML +description: Deep Learning with SystemML +--- +<!-- +{% comment %} +Licensed to the Apache Software Foundation (ASF) under one or more +contributor license agreements. See the NOTICE file distributed with +this work for additional information regarding copyright ownership. +The ASF licenses this file to you under the Apache License, Version 2.0 +(the "License"); you may not use this file except in compliance with +the License. You may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +{% endcomment %} +--> + +* This will become a table of contents (this text will be scraped). +{:toc} + +<br/> + +There are three different ways to implement a Deep Learning model in SystemML: +1. Using the [DML-bodied NN library](https://github.com/apache/systemml/tree/master/scripts/nn): This library allows the user to exploit full flexibility of [DML language](http://apache.github.io/systemml/dml-language-reference) to implement your neural network. +2. Using the experimental [Caffe2DML API](http://apache.github.io/systemml/beginners-guide-caffe2dml.html): This API allows a model expressed in Caffe's proto format to be imported into SystemML. This API **doesnot** require Caffe to be installed on your SystemML. +3. Using the experimental [Keras2DML API](http://apache.github.io/systemml/beginners-guide-keras2dml.html): This API allows a model expressed in Keras to be imported into SystemML. However, this API requires Keras to be installed on your driver. + + +# Training Lenet on the MNIST dataset + +Download the MNIST dataset using [mlxtend package](https://pypi.python.org/pypi/mlxtend). 
+ +```python +from mlxtend.data import mnist_data +import numpy as np +from sklearn.utils import shuffle +# Download the MNIST dataset +X, y = mnist_data() +X, y = shuffle(X, y) +# Split the data into training and test +n_samples = len(X) +X_train = X[:int(.9 * n_samples)] +y_train = y[:int(.9 * n_samples)] +X_test = X[int(.9 * n_samples):] +y_test = y[int(.9 * n_samples):] +``` + +<div class="codetabs"> + +<div data-lang="NN library" markdown="1"> +{% highlight python %} +from systemml import MLContext, dml + +ml = MLContext(sc) +ml.setStatistics(True) +# ml.setConfigProperty("sysml.native.blas", "auto") +# ml.setGPU(True).setForceGPU(True) +script = """ + source("nn/examples/mnist_lenet.dml") as mnist_lenet + + # Scale images to [-1,1], and one-hot encode the labels + images = (images / 255) * 2 - 1 + n = nrow(images) + labels = table(seq(1, n), labels+1, n, 10) + + # Split into training (4000 examples) and validation (4000 examples) + X = images[501:nrow(images),] + X_val = images[1:500,] + y = labels[501:nrow(images),] + y_val = labels[1:500,] + + # Train the model to produce weights & biases. + [W1, b1, W2, b2, W3, b3, W4, b4] = mnist_lenet::train(X, y, X_val, y_val, C, Hin, Win, epochs) +""" +out = ('W1', 'b1', 'W2', 'b2', 'W3', 'b3', 'W4', 'b4') +prog = (dml(script).input(images=X_train, labels=y_train.reshape((-1, 1)), epochs=1, C=1, Hin=28, Win=28) + .output(*out)) + +W1, b1, W2, b2, W3, b3, W4, b4 = ml.execute(prog).get(*out) + +script_predict = """ + source("nn/examples/mnist_lenet.dml") as mnist_lenet + + # Scale images to [-1,1] + X_test = (X_test / 255) * 2 - 1 + + # Predict + y_prob = mnist_lenet::predict(X_test, C, Hin, Win, W1, b1, W2, b2, W3, b3, W4, b4) + y_pred = rowIndexMax(y_prob) - 1 +""" +prog = (dml(script_predict).input(X_test=X_test, C=1, Hin=28, Win=28, W1=W1, b1=b1, + W2=W2, b2=b2, W3=W3, b3=b3, W4=W4, b4=b4) + .output("y_pred")) + +y_pred = ml.execute(prog).get("y_pred").toNumPy() +{% endhighlight %} +</div> + +<div data-lang="Caffe2DML" markdown="1"> +{% highlight python %} +from systemml.mllearn import Caffe2DML +import urllib + +# Download the Lenet network +urllib.urlretrieve('https://raw.githubusercontent.com/apache/systemml/master/scripts/nn/examples/caffe2dml/models/mnist_lenet/lenet.proto', 'lenet.proto') +urllib.urlretrieve('https://raw.githubusercontent.com/apache/systemml/master/scripts/nn/examples/caffe2dml/models/mnist_lenet/lenet_solver.proto', 'lenet_solver.proto') +# Train Lenet On MNIST using scikit-learn like API + +# MNIST dataset contains 28 X 28 gray-scale (number of channel=1). +lenet = Caffe2DML(spark, solver='lenet_solver.proto', input_shape=(1, 28, 28)) +lenet.setStatistics(True) +# lenet.setConfigProperty("sysml.native.blas", "auto") +# lenet.setGPU(True).setForceGPU(True) + +# Since Caffe2DML is a mllearn API, it allows for scikit-learn like method for training. 
+lenet.fit(X_train, y_train) +# Either perform prediction: lenet.predict(X_test) or scoring: +lenet.score(X_test, y_test) +{% endhighlight %} +</div> + +<div data-lang="Keras2DML" markdown="1"> +{% highlight python %} +from keras.layers import Input, Dense, Conv2D, MaxPooling2D, Dropout,Flatten +from keras import backend as K +from keras.models import Model +input_shape = (1,28,28) if K.image_data_format() == 'channels_first' else (28,28, 1) +input_img = Input(shape=(input_shape)) +x = Conv2D(32, kernel_size=(5, 5), activation='relu', input_shape=input_shape, padding='same')(input_img) +x = MaxPooling2D(pool_size=(2, 2))(x) +x = Conv2D(64, (5, 5), activation='relu', padding='same')(x) +x = MaxPooling2D(pool_size=(2, 2))(x) +x = Flatten()(x) +x = Dense(512, activation='relu')(x) +x = Dropout(0.5)(x) +x = Dense(10, activation='softmax')(x) +keras_model = Model(input_img, x) +keras_model.summary() + +from systemml.mllearn import Keras2DML +sysml_model = Keras2DML(spark, keras_model, input_shape=(1,28,28), weights='weights_dir') +# sysml_model.setConfigProperty("sysml.native.blas", "auto") +# sysml_model.setGPU(True).setForceGPU(True) +sysml_model.summary() +sysml_model.fit(X_train, y_train) +sysml_model.score(X_test, y_test) +{% endhighlight %} +</div> + +</div> + +# Prediction using a pretrained ResNet-50 + +<div class="codetabs"> + +<div data-lang="NN library" markdown="1"> +{% highlight python %} +Will be added soon ... +{% endhighlight %} +</div> + +<div data-lang="Caffe2DML" markdown="1"> +{% highlight python %} +Will be added soon ... +{% endhighlight %} +</div> + +<div data-lang="Keras2DML" markdown="1"> +{% highlight python %} +from systemml.mllearn import Keras2DML +import systemml as sml +import keras, urllib +from PIL import Image +from keras.applications.resnet50 import preprocess_input, decode_predictions, ResNet50 + +model = ResNet50(weights='imagenet',include_top=True,pooling='None',input_shape=(224,224,3)) +model.compile(optimizer='sgd', loss= 'categorical_crossentropy') + +resnet = Keras2DML(spark,model,input_shape=(3,224,224), weights='tmp', labels='https://raw.githubusercontent.com/apache/systemml/master/scripts/nn/examples/caffe2dml/models/imagenet/labels.txt') +resnet.summary() +urllib.urlretrieve('https://upload.wikimedia.org/wikipedia/commons/f/f4/Cougar_sitting.jpg', 'test.jpg') +img_shape = (3, 224, 224) +input_image = sml.convertImageToNumPyArr(Image.open('test.jpg'), img_shape=img_shape) +resnet.predict(input_image) +{% endhighlight %} +</div> + +</div> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/systemml/blob/4b615bc0/docs/index.md ---------------------------------------------------------------------- diff --git a/docs/index.md b/docs/index.md index 1178009..fdb0d8b 100644 --- a/docs/index.md +++ b/docs/index.md @@ -50,8 +50,9 @@ for running SystemML from Spark via Scala, Python, or Java. * [Standalone](standalone-guide) - Standalone mode allows data scientists to rapidly prototype algorithms on a single machine in R-like and Python-like declarative languages. * [JMLC](jmlc) - Java Machine Learning Connector. -* *Experimental* Caffe2DML API for Deep Learning ([beginner's guide](beginners-guide-caffe2dml), [reference guide](reference-guide-caffe2dml)) - Converts a Caffe specification to DML. -* *Experimental* [Keras2DML API](beginners-guide-keras2dml) for Deep Learning. 
+* [Deep Learning with SystemML](deep-learning) + * *Experimental* Caffe2DML API for Deep Learning ([beginner's guide](beginners-guide-caffe2dml), [reference guide](reference-guide-caffe2dml)) - Converts a Caffe specification to DML. + * *Experimental* [Keras2DML API](beginners-guide-keras2dml) for Deep Learning. ## Language Guides http://git-wip-us.apache.org/repos/asf/systemml/blob/4b615bc0/src/main/python/systemml/mllearn/estimators.py ---------------------------------------------------------------------- diff --git a/src/main/python/systemml/mllearn/estimators.py b/src/main/python/systemml/mllearn/estimators.py index 7e6104f..3175c9c 100644 --- a/src/main/python/systemml/mllearn/estimators.py +++ b/src/main/python/systemml/mllearn/estimators.py @@ -884,28 +884,35 @@ class Keras2DML(Caffe2DML): """ - def __init__(self, sparkSession, keras_model, input_shape, transferUsingDF=False, - tensorboard_log_dir=None): + def __init__(self, sparkSession, keras_model, input_shape, transferUsingDF=False, weights=None, labels=None): """ Performs training/prediction for a given keras model. Parameters ---------- - parkSession: PySpark SparkSession + sparkSession: PySpark SparkSession model: keras hdf5 model file path input_shape: 3-element list (number of channels, input height, input width) transferUsingDF: whether to pass the input dataset via PySpark DataFrame (default: False) - tensorboard_log_dir: directory to store the event logs (default: None, - we use a temporary directory) + weights: directory whether learned weights are stored (default: None) """ - #NOTE Lazily imported until the Caffe Dependency issue is resolved - from . import keras2caffe + from .keras2caffe import * + import tempfile self.name = keras_model.name - #Convert keras model into caffe net and weights - caffenet, caffemodel = keras2caffe.generate_caffe_model(keras_model,self.name + ".proto",self.name + ".caffemodel") - #Create solver from network file - caffesolver = keras2caffe.CaffeSolver(self.name + ".proto",keras_model).write(self.name + "_solver.proto") - #Generate caffe2DML object - super(Keras2DML,self).__init__(sparkSession, self.name+ "_solver.proto",input_shape, transferUsingDF, tensorboard_log_dir) - #Create and Load weights into caffe2DML - convert_caffemodel(sparkSession.sparkContext,self.name + ".proto", self.name + ".caffemodel", self.name + "_C2DML_weights") + createJavaObject(sparkSession._sc, 'dummy') + convertKerasToCaffeNetwork(keras_model, self.name + ".proto") + convertKerasToCaffeSolver(keras_model, self.name + ".proto", self.name + "_solver.proto") + self.weights = tempfile.mkdtemp() if weights is None else weights + convertKerasToSystemMLModel(sparkSession, keras_model, self.weights) + if labels is not None and (labels.startswith('https:') or labels.startswith('http:')): + import urllib + urllib.urlretrieve(labels, os.path.join(weights, 'labels.txt')) + elif labels is not None: + from shutil import copyfile + copyfile(labels, os.path.join(weights, 'labels.txt')) + super(Keras2DML,self).__init__(sparkSession, self.name + "_solver.proto", input_shape, transferUsingDF) + self.load(self.weights) + + def close(self): + import shutil + shutil.rmtree(weights) http://git-wip-us.apache.org/repos/asf/systemml/blob/4b615bc0/src/main/python/systemml/mllearn/keras2caffe.py ---------------------------------------------------------------------- diff --git a/src/main/python/systemml/mllearn/keras2caffe.py b/src/main/python/systemml/mllearn/keras2caffe.py index 43b1fe1..deefff2 100755 --- 
a/src/main/python/systemml/mllearn/keras2caffe.py +++ b/src/main/python/systemml/mllearn/keras2caffe.py @@ -21,404 +21,61 @@ # Script to generate caffe proto and .caffemodel files from Keras models -from caffe import * -import caffe -from caffe import layers as L -from caffe import params as P - -import keras -from keras.models import load_model -from keras.models import model_from_json -from keras.utils.conv_utils import convert_kernel import numpy as np +import os +from itertools import chain, imap +from ..converters import * +from ..classloader import * +import keras +try: + import py4j.java_gateway + from py4j.java_gateway import JavaObject + from pyspark import SparkContext +except ImportError: + raise ImportError('Unable to import `pyspark`. Hint: Make sure you are running with PySpark.') -def load_keras_model(filepath): - model = load_model(filepath) - return model - - -def load_keras_skeleton_model(filepath): - json_file = open(filepath, 'r') - model_json = json_file.read() - json_file.close() - loaded_model = model_from_json(model_json) - return loaded_model - - -def load_weights_to_model(model, filepath): - model.load_weights(filepath) - return model - - -# Currently can only generate a Dense model -def generate_caffe_model(kModel, filepath, weights_filepath, input_shape=None, phases=None): - n = caffe.NetSpec() - layers = kModel.layers - net_params = dict() - input_name = kModel.inputs[0].name - label_name = input_name + "_label" - - for layer in layers: - blobs = layer.get_weights() - - generate_layer(blobs, layer, n, net_params) - - # Determine the loss needed to be added - generate_loss(kModel, n, label_name) - print("Converting model to proto and converting weights") - write_caffe_model(n, filepath) - caffe_model = caffe.Net(filepath, caffe.TEST) - for layer in caffe_model.params.keys(): - for i in range(0, len(caffe_model.params[layer])): - print(layer + ": ") - print(net_params[layer][i].shape) - print(caffe_model.params[layer][i].data.shape) - # print(dir(caffe_model.params[layer])) - caffe_model.params[layer][i].data[...] 
= net_params[layer][i] - - caffe_model.save(weights_filepath) - - # Change back Input into Data layer for Caffe2DML - n[label_name], n[input_name] = L.Data(ntop=2) - - write_caffe_model(n, filepath) - - return n, caffe_model - - -def generate_layer(blobs, layer, n, net_params): - """ - Parameters: blobs: weights for keras, layer: keras layer, n: Caffe NetSpec, - net_params: Dictionary to store Caffe weights - """ - if type(layer) == keras.layers.InputLayer: - # Grab the batchsize from i 0, shift over channels to index 1, and place the rest into the dictionary - # TODO determine when to transform for layer types/input shape - num = len(layer.batch_input_shape) - 1 # Range from 1st index to second last - # TODO check for image_data_format to be channels_first or channels_last - batch_list = [layer.batch_input_shape[0], layer.batch_input_shape[-1]] - for i in range(1, num): - batch_list.append(layer.batch_input_shape[i]) - for i in range(len(batch_list)): # Set None dimensions to 0 for Caffe - if (batch_list[i] == None): - batch_list[i] = 1 - name = layer.name - # TODO figure out having 2 tops, with n.label - n[name] = L.Input(shape=[dict(dim=batch_list)]) - - elif type(layer) == keras.layers.Dense: - # Pull name from Keras - name = layer.name - # Pull layer name of the layer passing to current layer - in_names = get_inbound_layers(layer) - # Pipe names into caffe using unique Keras layer names - n[name] = L.InnerProduct(n[in_names[0].name], num_output=layer.units) # TODO: Assert only 1 - config = layer.get_config() - if config['use_bias']: - net_params[name] = (np.array(blobs[0]).transpose(1, 0), np.array(blobs[1])) - else: - net_params[name] = (blobs[0]) - if layer.activation is not None and layer.activation.__name__ != 'linear': - name_act = name + "_activation_" + layer.activation.__name__ # get function string - n[name_act] = get_activation(layer, n[name]) - - elif type(layer) == keras.layers.Flatten: - - """ - Caffe2DML implicitly stores all tensors as a 1D array with shapes so after every passthrough - all outputs are already flatten thus, we can ignore all flattens are just pass the - tops and bottoms across all flatten layers. 
- """ - - elif type(layer) == keras.layers.Dropout: # TODO Random seed will be lost - name = layer.name - in_names = get_inbound_layers(layer) - n[name] = L.Dropout(n[in_names[0].name], dropout_ratio=layer.rate, in_place=True) - - # elif type(layer) == keras.Layers.LSTM: - - elif type(layer) == keras.layers.Add: - name = layer.name - in_names = get_inbound_layers(layer) - # turn list of names into network layers - network_layers = [] - for ref in in_names: - network_layers.append(n[ref.name]) - # print(network_layers) - # unpack the bottom layers - n[name] = L.Eltwise(*network_layers, operation=1) # 1 is SUM - - elif type(layer) == keras.layers.Multiply: - name = layer.name - in_names = get_inbound_layers(layer) - # turn list of names into network layers - network_layers = [] - for ref in in_names: - network_layers.append(n[ref.name]) - # unpack the bottom layers - n[name] = L.Eltwise(*network_layers, operation=0) - - elif type(layer) == keras.layers.Concatenate: - name = layer.name - in_names = get_inbound_layers(layer) - # turn list of names into network layers - network_layers = [] - for ref in in_names: - network_layers.append(n[ref.name]) - axis = get_compensated_axis(layer) - n[name] = L.Concat(*network_layers, axis=1) - - elif type(layer) == keras.layers.Maximum: - name = layer.name - in_names = get_inbound_layers(layer) - # turn list of names into network layers - network_layers = [] - for ref in in_names: - network_layers += n[ref.name] - # unpack the bottom layers - n[name] = L.Eltwise(*network_layers, operation=2) - - elif type(layer) == keras.layers.Conv2DTranspose: - name = layer.name - in_names = get_inbound_layers(layer) - # Stride - if layer.strides is None: - stride = (1, 1) - else: - stride = layer.strides - # Padding - if layer.padding == 'same': # Calculate the padding for 'same' - padding = [layer.kernel_size[0] / 2, layer.kernel_size[1] / 2] - else: - padding = [0, 0] # If padding is valid(aka no padding) - # get bias parameter - config = layer.get_config() - use_bias = config['use_bias'] - param = dict(bias_term=use_bias) - - n[name] = L.Deconvolution(n[in_names[0].name], kernel_h=layer.kernel_size[0], - kernel_w=layer.kernel_size[1], stride_h=stride[0], - stride_w=stride[1], num_output=layer.filters, pad_h=padding[0], pad_w=padding[1], - convolution_param=param) - blobs[0] = np.array(blobs[0]).transpose(3, 2, 0, 1) - net_params[name] = blobs - if layer.activation is not None and layer.activation.__name__ != 'linear': - name_act = name + "_activation_" + layer.activation.__name__ # get function string - n[name_act] = get_activation(layer, n[name]) - - elif type(layer) == keras.layers.BatchNormalization: - name = layer.name - in_names = get_inbound_layers(layer) - n[name] = L.BatchNorm(n[in_names[0].name], moving_average_fraction=layer.momentum, eps=layer.epsilon) - variance = np.array(blobs[-1]) - mean = np.array(blobs[-2]) - - config = layer.get_config() - # Set mean variance and gamma into respective params - param = dict() - if config['scale']: - gamma = np.array(blobs[0]) - else: - gamma = np.ones(mean.shape, dtype=np.float32) - - if config['center']: - beta = np.array(blobs[1]) - param['bias_term'] = True - else: - beta = np.zeros(mean.shape, dtype=np.float32) - param['bias_term'] = False - - net_params[name] = (mean, variance, np.array(1.0)) - - name_scale = name + '_scale' - # Scale after batchNorm - n[name_scale] = L.Scale(n[name], in_place=True, scale_param=param) - net_params[name_scale] = (gamma, beta) - # TODO Needs to be implemented - elif type(layer) == 
keras.layers.Conv1D: - name = layer.name - in_names = get_inbound_layers(layer) - n[name] = L.Convolution(n[in_names[0]]) - - elif type(layer) == keras.layers.Conv2D: - name = layer.name - in_names = get_inbound_layers(layer) - # Stride - if layer.strides is None: - stride = (1, 1) - else: - stride = layer.strides - # Padding - if layer.padding == 'same': # Calculate the padding for 'same' - padding = [layer.kernel_size[0] / 2, layer.kernel_size[1] / 2] - else: - padding = [0, 0] # If padding is valid(aka no padding) - # TODO The rest of the arguements including bias, regulizers, dilation, - config = layer.get_config() - # get bias parameter - use_bias = config['use_bias'] - param = dict(bias_term=use_bias) - n[name] = L.Convolution(n[in_names[0].name], kernel_h=layer.kernel_size[0], - kernel_w=layer.kernel_size[1], stride_h=stride[0], - stride_w=stride[1], num_output=layer.filters, pad_h=padding[0], pad_w=padding[1], - convolution_param=param) - weights = blobs - blobs[0] = np.array(blobs[0]).transpose((3, 2, 0, 1)) - print(type(weights)) - net_params[name] = blobs - if layer.activation is not None and layer.activation.__name__ != 'linear': - name_act = name + "_activation_" + layer.activation.__name__ # get function string - n[name_act] = get_activation(layer, n[name]) - - elif type(layer) == keras.layers.MaxPooling2D or type(layer) == keras.layers.AveragePooling2D: - name = layer.name - in_names = get_inbound_layers(layer) - if type(layer) == keras.layers.MaxPooling2D: - pool = P.Pooling.MAX - else: # NOTE AveragePooling needs to be implemented - pool = P.Pooling.AVE - # Padding - # TODO The rest of the arguements including bias, regulizers, dilatin, - if layer.strides is None: - stride = (1, 1) - else: - stride = layer.strides - # Padding - if layer.padding == 'same': # Calculate the padding for 'same' - padding = [layer.pool_size[0] / 2, layer.pool_size[1] / 2] - else: - padding = [0, 0] # If padding is valid(aka no padding) - n[name] = L.Pooling(n[in_names[0].name], kernel_h=layer.pool_size[0], - kernel_w=layer.pool_size[1], stride_h=stride[0], - stride_w=stride[1], pad_h=padding[0], pad_w=padding[1], - pool=pool) - """ - if hasattr(layer,layer.activation): - name_act = name + "_activation_" + layer.activation.__name__ #get function string - n[name_act] = get_activation(layer,n[name]) - """ - # Activation (wrapper for activations) and Advanced Activation Layers - elif type(layer) == keras.layers.Activation: - name = layer.name - in_names = get_inbound_layers(layer) - n[name] = get_activation(layer, n[in_names[0].name]) # TODO: Assert only 1 - - # Caffe lacks intializer, regulizer, and constraint params - elif type(layer) == keras.layers.LeakyReLU: - # TODO: figure out how to pass Leaky params - name = layer.name - in_names = get_inbound_layers(layer) - n[name] = L.PReLU(n[in_names[0].name]) - - elif type(layer) == keras.layers.PReLU: - name = layer.name - in_names = get_inbound_layers(layer) - n[name] = L.PReLU(n[in_names[0].name]) - - elif type(layer) == keras.layers.ELU: - name = layer.name - in_names = get_inbound_layers(layer) - n[name] = L.ELU(n[in_names[0].name], layer.alpha) - - elif type(layer) == keras.layers.GlobalAveragePooling2D: - name = layer.name - in_names = get_inbound_layers(layer) - n[name] = L.Pooling(n[in_names[0].name], kernel_size=8, stride=8, pad=0, pool=P.Pooling.AVE) - - elif type(layer) == keras.layers.ZeroPadding2D: - name = layer.name - in_names = get_inbound_layers(layer) - config = layer.get_config() - padding = config['padding'] - n[name] = 
L.Convolution(n[in_names[0].name], num_output=3, kernel_size=1, stride=1, - pad_h=padding[0][0], pad_w=padding[1][0], convolution_param=dict(bias_term=False)) - net_params[name] = np.ones((3, 3, 1, 1)) - - else: - raise Exception("Cannot convert model. " + layer.name + " is not supported.") - - -def get_inbound_layers(layer): +# -------------------------------------------------------------------------------------- +# Design Document: +# We support Keras model by first converting it to Caffe models and then using Caffe2DML to read them +# +# Part 1: Keras network to Caffe network conversion: +# - Core logic: model.layers.flatMap(layer => _parseJSONObject(_parseKerasLayer(layer))) +# That is, for each layer, we first convert it into JSON format and then convert the JSON object into String +# - This is true for all the layers except the "specialLayers" (given in below hashmap). These are redirected to their custom parse function in _parseKerasLayer. +# - To add an activation, simply add the keras type to caffe type in supportedCaffeActivations. +# - To add a layer, add the corresponding caffe layer type in supportedLayers. If the layer accepts parameters then update layerParamMapping too. +# - The above logic is implemented in the function converKerasToCaffeNetwork +# -------------------------------------------------------------------------------------- + +supportedCaffeActivations = {'relu':'ReLU', 'softmax':'Softmax', 'sigmoid':'Sigmoid' } +supportedLayers = { + keras.layers.InputLayer: 'Data', + keras.layers.Dense: 'InnerProduct', + keras.layers.Dropout: 'Dropout', + keras.layers.Add: 'Eltwise', + keras.layers.Concatenate: 'Concat', + keras.layers.Conv2DTranspose: 'Deconvolution', + keras.layers.Conv2D: 'Convolution', + keras.layers.MaxPooling2D: 'Pooling', + keras.layers.AveragePooling2D: 'Pooling', + keras.layers.Flatten: 'None', + keras.layers.BatchNormalization: 'None', + keras.layers.Activation: 'None' + } + +def _getInboundLayers(layer): in_names = [] for node in layer.inbound_nodes: # get inbound nodes to current layer node_list = node.inbound_layers # get layers pointing to this node in_names = in_names + node_list if any('flat' in s.name for s in in_names): # For Caffe2DML to reroute any use of Flatten layers - return get_inbound_layers([s for s in in_names if 'flat' in s.name][0]) + return _getInboundLayers([s for s in in_names if 'flat' in s.name][0]) return in_names -# Only works with non Tensorflow functions! -def get_activation(layer, bottom): - if keras.activations.serialize(layer.activation) == 'relu': - return L.ReLU(bottom, in_place=True) - elif keras.activations.serialize(layer.activation) == 'softmax': - return L.Softmax(bottom) # Cannot extract axis from model, so default to -1 - elif keras.activations.serialize(layer.activation) == 'softsign': - # Needs to be implemented in caffe2dml - raise Exception("softsign is not implemented") - elif keras.activations.serialize(layer.activation) == 'elu': - return L.ELU(bottom) - elif keras.activations.serialize(layer.activation) == 'selu': - # Needs to be implemented in caffe2dml - raise Exception("SELU activation is not implemented") - elif keras.activations.serialize(layer.activation) == 'sigmoid': - return L.Sigmoid(bottom) - elif keras.activations.serialize(layer.activation) == 'tanh': - return L.TanH(bottom) - # To add more acitvaiton functions, add more elif statements with - # activation funciton __name__'s. 
- - -def generate_loss(kModel, n, label_name): - # Determine the loss needed to be added - for output in kModel.output_layers: - if hasattr(kModel, 'loss'): - if kModel.loss == 'categorical_crossentropy' and output.activation.__name__ == 'softmax': - name = output.name + "_activation_" + output.activation.__name__ - n[name] = L.SoftmaxWithLoss(n[output.name], n[label_name]) - elif kModel.loss == 'binary_crossentropy' and output.activation.__name__ == 'sigmoid': - name = output.name + "_activation_" + output.activation.__name__ - n[name] = L.SigmoidCrossEntropyLoss(n[output.name]) - else: # Map the rest of the loss functions to the end of the output layer in Keras - if kModel.loss == 'hinge': - name = kModel.name + 'hinge' - n[name] = L.HingeLoss(n[output.name]) - elif kModel.loss == 'categorical_crossentropy': - name = kModel.name + 'categorical_crossentropy' - n[name] = L.MultinomialLogisticLoss(n[output.name]) - # TODO Post warning to use softmax before this loss - elif kModel.loss == 'mean_squared_error': - name = kModel.name + 'mean_squared_error' - n[name] = L.EuclideanLoss(n[output.name]) - # TODO implement Infogain Loss - else: - raise Exception(kModel.loss + "is not supported") - - -# Params: keras Model, caffe prototxt filepath, filepath to save solver -def generate_caffe_solver(kModel, cModelPath, filepath): - solver_param = CaffeSolver(trainnet_prototxt_path=cModelPath, - testnet_prototxt_path=cModelPath, - debug=True) # Currently train and test are the same protos - solver_param.write(filepath) - - -# Params: NetSpec, filepath and filename -def write_caffe_model(cModel, filepath): - with open(filepath, 'w') as f: - f.write(str(cModel.to_proto())) - - -""" -Get compensated axis since Caffe has n,c,h,w and Keras has n,h,w,c for tensor dimensions -Params: Current Keras layer -""" - - -def get_compensated_axis(layer): +def _getCompensatedAxis(layer): compensated_axis = layer.axis # Cover all cases for anything accessing the 0th index or the last index if layer.axis > 0 and layer.axis < layer.input[0].shape.ndims - 1: @@ -429,91 +86,169 @@ def get_compensated_axis(layer): compensated_axis = 1 return compensated_axis -def format_optimizer_name(self,optimizer): - if optimizer == "Adadelta": - return "AdaDelta" - elif optimizer == "Adagrad": - return "AdaGrad" - elif optimizer == "Adam": - return "Adam" - elif optimizer == "RMSprop": - return "RMSProp" - elif optimizer == "SGD": - return "SGD" - else: - raise Exception(optimizer + " is not supported in Caffe2DML") - -class CaffeSolver: - """ - Caffesolver is a class for creating a solver.prototxt file. It sets default - values and can export a solver parameter file. - Note that all parameters are stored as strings. Strings variables are - stored as strings in strings. - """ - - def __init__(self, keras_model, testnet_prototxt_path="testnet.prototxt", - debug=False): - - self.sp = {} - - optimizer_name = format_optimizer_name(keras_model.optimizer.__name__) - # TODO Grab momentum values from other optimizers - # critical: - self.sp['base_lr'] = '{}'.format(keras_model.optimizer.lr) - self.sp['momentum'] = '0.9' - self.sp['type'] = '"{}"'.format(optimizer_name) - - # speed: - self.sp['test_iter'] = '100' - self.sp['test_interval'] = '250' - - # looks: - self.sp['display'] = '25' - self.sp['snapshot'] = '2500' - self.sp['snapshot_prefix'] = '"snapshot"' # string within a string! 
- - # learning rate policy - self.sp['lr_policy'] = '"fixed"' - - # important, but rare: - self.sp['gamma'] = '0.1' - self.sp['weight_decay'] = '0.0005' - # self.sp['train_net'] = '"' + trainnet_prototxt_path + '"' - # self.sp['test_net'] = '"' + testnet_prototxt_path + '"' - - self.sp['net'] = '"' + testnet_prototxt_path + '"' - - # pretty much never change these. - self.sp['max_iter'] = '100000' - self.sp['test_initialization'] = 'false' - self.sp['average_loss'] = '25' # this has to do with the display. - self.sp['iter_size'] = '1' # this is for accumulating gradients - - if (debug): - self.sp['max_iter'] = '12' - self.sp['test_iter'] = '1' - self.sp['test_interval'] = '4' - self.sp['display'] = '1' - - def add_from_file(self, filepath): - """ - Reads a caffe solver prototxt file and updates the Caffesolver - instance parameters. - """ - with open(filepath, 'r') as f: - for line in f: - if line[0] == '#': - continue - splitLine = line.split(':') - self.sp[splitLine[0].strip()] = splitLine[1].strip() - - def write(self, filepath): - """ - Export solver parameters to INPUT "filepath". Sorted alphabetically. - """ - f = open(filepath, 'w') - for key, value in sorted(self.sp.items()): - if not (type(value) is str): - raise Exception('All solver parameters must be strings') - f.write('%s: %s\n' % (key, value)) +str_keys = [ 'name', 'type', 'top', 'bottom' ] +def toKV(key, value): + return str(key) + ': "' + str(value) + '"' if key in str_keys else str(key) + ': ' + str(value) + + +def _parseJSONObject(obj): + rootName = obj.keys()[0] + ret = ['\n', rootName, ' {'] + for key in obj[rootName]: + if isinstance(obj[rootName][key], dict): + ret = ret + [ '\n\t', key, ' {' ] + for key1 in obj[rootName][key]: + ret = ret + [ '\n\t\t', toKV(key1, obj[rootName][key][key1]) ] + ret = ret + [ '\n\t', '}' ] + elif isinstance(obj[rootName][key], list): + for v in obj[rootName][key]: + ret = ret + ['\n\t', toKV(key, v) ] + else: + ret = ret + ['\n\t', toKV(key, obj[rootName][key]) ] + return ret + ['\n}' ] + + +def _getBottomLayers(layer): + return [ bottomLayer.name for bottomLayer in _getInboundLayers(layer) ] + + +def _parseActivation(layer, customLayerName=None): + kerasActivation = keras.activations.serialize(layer.activation) + if kerasActivation not in supportedCaffeActivations: + raise TypeError('Unsupported activation ' + kerasActivation + ' for the layer:' + layer.name) + if customLayerName is not None: + return { 'layer':{'name':customLayerName, 'type':supportedCaffeActivations[kerasActivation], 'top':layer.name, 'bottom':layer.name }} + else: + return { 'layer':{'name':layer.name, 'type':supportedCaffeActivations[kerasActivation], 'top':layer.name, 'bottom':_getBottomLayers(layer) }} + + + +def _parseKerasLayer(layer): + layerType = type(layer) + if layerType in specialLayers: + return specialLayers[layerType](layer) + elif layerType == keras.layers.Activation: + return [ _parseActivation(layer) ] + param = layerParamMapping[layerType](layer) + paramName = param.keys()[0] + if layerType == keras.layers.InputLayer: + ret = { 'layer': { 'name':layer.name, 'type':'Data', 'top':layer.name, paramName:param[paramName] } } + else: + ret = { 'layer': { 'name':layer.name, 'type':supportedLayers[layerType], 'bottom':_getBottomLayers(layer), 'top':layer.name, paramName:param[paramName] } } + return [ ret, _parseActivation(layer, layer.name + '_activation') ] if hasattr(layer, 'activation') and keras.activations.serialize(layer.activation) != 'linear' else [ ret ] + + +def _parseBatchNorm(layer): + bnName 
= layer.name + '_1' + config = layer.get_config() + bias_term = 'true' if config['center'] else 'false' + return [ { 'layer': { 'name':bnName, 'type':'BatchNorm', 'bottom':_getBottomLayers(layer), 'top':bnName, 'batch_norm_param':{'moving_average_fraction':layer.momentum, 'eps':layer.epsilon} } }, { 'layer': { 'name':layer.name, 'type':'Scale', 'bottom':bnName, 'top':layer.name, 'scale_param':{'bias_term':bias_term} } } ] + +# The special are redirected to their custom parse function in _parseKerasLayer +specialLayers = { + keras.layers.Flatten: lambda x: [], + keras.layers.BatchNormalization: _parseBatchNorm + } + +batchSize = 64 + +def getConvParam(layer): + stride = (1, 1) if layer.strides is None else layer.strides + padding = [layer.kernel_size[0] / 2, layer.kernel_size[1] / 2] if layer.padding == 'same' else [0, 0] + config = layer.get_config() + return {'num_output':layer.filters,'bias_term':str(config['use_bias']).lower(),'kernel_h':layer.kernel_size[0], 'kernel_w':layer.kernel_size[1], 'stride_h':stride[0],'stride_w':stride[1],'pad_h':padding[0], 'pad_w':padding[1]} + + +def getPoolingParam(layer, pool='MAX'): + stride = (1, 1) if layer.strides is None else layer.strides + padding = [layer.pool_size[0] / 2, layer.pool_size[1] / 2] if layer.padding == 'same' else [0, 0] + return {'pool':pool, 'kernel_h':layer.pool_size[0], 'kernel_w':layer.pool_size[1], 'stride_h':stride[0],'stride_w':stride[1],'pad_h':padding[0], 'pad_w':padding[1]} + +# TODO: Update AveragePooling2D when we add maxpooling support +layerParamMapping = { + keras.layers.InputLayer: lambda l: \ + {'data_param': {'batch_size': batchSize}}, + keras.layers.Dense: lambda l: \ + {'inner_product_param': {'num_output': l.units}}, + keras.layers.Dropout: lambda l: \ + {'dropout_param': {'dropout_ratio': l.rate}}, + keras.layers.Add: lambda l: \ + {'eltwise_param': {'operation': 'SUM'}}, + keras.layers.Concatenate: lambda l: \ + {'concat_param': {'axis': _getCompensatedAxis(l)}}, + keras.layers.Conv2DTranspose: lambda l: \ + {'convolution_param': getConvParam(l)}, + keras.layers.Conv2D: lambda l: \ + {'convolution_param': getConvParam(l)}, + keras.layers.MaxPooling2D: lambda l: \ + {'pooling_param': getPoolingParam(l, 'MAX')}, + keras.layers.AveragePooling2D: lambda l: \ + {'pooling_param': getPoolingParam(l, 'MAX')}, + } + +def _checkIfValid(myList, fn, errorMessage): + bool_vals = np.array([ fn(elem) for elem in myList]) + unsupported_elems = np.where(bool_vals)[0] + if len(unsupported_elems) != 0: + raise ValueError(errorMessage + str(np.array(myList)[unsupported_elems])) + +def convertKerasToCaffeNetwork(kerasModel, outCaffeNetworkFilePath): + _checkIfValid(kerasModel.layers, lambda layer: False if type(layer) in supportedLayers else True, 'Unsupported Layers:') + #unsupported_layers = np.array([False if type(layer) in supportedLayers else True for layer in kerasModel.layers]) + #if len(np.where(unsupported_layers)[0]) != 0: + # raise TypeError('Unsupported Layers:' + str(np.array(kerasModel.layers)[np.where(unsupported_layers)[0]])) + # Core logic: model.layers.flatMap(layer => _parseJSONObject(_parseKerasLayer(layer))) + jsonLayers = list(chain.from_iterable(imap(lambda layer: _parseKerasLayer(layer), kerasModel.layers))) + parsedLayers = list(chain.from_iterable(imap(lambda layer: _parseJSONObject(layer), jsonLayers))) + with open(outCaffeNetworkFilePath, 'w') as f: + f.write(''.join(parsedLayers)) + + +def getNumPyMatrixFromKerasWeight(param): + x = np.array(param) + if len(x.shape) > 2: + x = x.transpose(3, 2, 0, 1) 
+ return x.reshape(x.shape[0], -1) + elif len(x.shape) == 1: + return np.matrix(param).transpose() + else: + return x + + +defaultSolver = """ +base_lr: 0.01 +momentum: 0.9 +weight_decay: 5e-4 +lr_policy: "exp" +gamma: 0.95 +display: 100 +solver_mode: CPU +type: "SGD" +max_iter: 2000 +test_iter: 10 +test_interval: 500 +""" +def convertKerasToCaffeSolver(kerasModel, caffeNetworkFilePath, outCaffeSolverFilePath): + with open(outCaffeSolverFilePath, 'w') as f: + f.write('net: "' + caffeNetworkFilePath + '"\n') + f.write(defaultSolver) + + +def convertKerasToSystemMLModel(spark, kerasModel, outDirectory): + _checkIfValid(kerasModel.layers, lambda layer: False if len(layer.get_weights()) <= 4 or len(layer.get_weights()) != 3 else True, 'Unsupported number of weights:') + layers = [layer for layer in kerasModel.layers if len(layer.get_weights()) > 0] + sc = spark._sc + biasToTranspose = [ keras.layers.Dense ] + dmlLines = [] + script_java = sc._jvm.org.apache.sysml.api.mlcontext.ScriptFactory.dml('') + for layer in layers: + inputMatrices = [ getNumPyMatrixFromKerasWeight(param) for param in layer.get_weights() ] + potentialVar = [ layer.name + '_weight', layer.name + '_bias', layer.name + '_1_weight', layer.name + '_1_bias' ] + for i in range(len(inputMatrices)): + dmlLines = dmlLines + [ 'write(' + potentialVar[i] + ', "' + outDirectory + '/' + potentialVar[i] + '.mtx", format="binary");\n' ] + mat = inputMatrices[i].transpose() if (i == 1 and type(layer) in biasToTranspose) else inputMatrices[i] + py4j.java_gateway.get_method(script_java, "in")(potentialVar[i], convertToMatrixBlock(sc, mat)) + script_java.setScriptString(''.join(dmlLines)) + ml = sc._jvm.org.apache.sysml.api.mlcontext.MLContext(sc._jsc) + ml.execute(script_java) http://git-wip-us.apache.org/repos/asf/systemml/blob/4b615bc0/src/main/scala/org/apache/sysml/api/dl/CaffeNetwork.scala ---------------------------------------------------------------------- diff --git a/src/main/scala/org/apache/sysml/api/dl/CaffeNetwork.scala b/src/main/scala/org/apache/sysml/api/dl/CaffeNetwork.scala index 93afbc0..d4aca53 100644 --- a/src/main/scala/org/apache/sysml/api/dl/CaffeNetwork.scala +++ b/src/main/scala/org/apache/sysml/api/dl/CaffeNetwork.scala @@ -164,6 +164,26 @@ class CaffeNetwork(netFilePath: String, val currentPhase: Phase, var numChannels builder.build() } else l }) + + // Condition 6: Replace last softmax layer with softmaxwithloss + val firstLayer = _caffeLayerParams.get(0) + val lastLayer = _caffeLayerParams.last + if(lastLayer.getType.toLowerCase().equalsIgnoreCase("softmax")) { + _caffeLayerParams = _caffeLayerParams.map(l => { + if(l == lastLayer) { + val builder = l.toBuilder(); + builder.setType("SoftmaxWithLoss") + builder.build() + } + else if(l == firstLayer && ( l.getType.equalsIgnoreCase("data") || l.getType.equalsIgnoreCase("input") ) && !l.getTopList.contains("label")) { + val builder = l.toBuilder(); + builder.addTop("label") + builder.build() + } + else l + }) + } + // --------------------------------------------------------------------------------
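Taken together, the new `convertKerasToCaffeNetwork`/`convertKerasToCaffeSolver` helpers in `keras2caffe.py` and the softmax rewrite above (Condition 6 in `CaffeNetwork.scala`) are what let `Keras2DML` run without a local Caffe installation. Below is a minimal sketch of the conversion path; the two-layer Keras model and the output file names are illustrative assumptions, and a PySpark environment is assumed since `keras2caffe.py` imports `pyspark`.

```python
from keras.layers import Dense, Input
from keras.models import Model

from systemml.mllearn.keras2caffe import (convertKerasToCaffeNetwork,
                                          convertKerasToCaffeSolver)

# Illustrative model: 784 -> 128 (relu) -> 10 (softmax), matching the MNIST example above.
inp = Input(shape=(784,))
hidden = Dense(128, activation='relu')(inp)
out = Dense(10, activation='softmax')(hidden)
model = Model(inp, out)
model.compile(optimizer='sgd', loss='categorical_crossentropy')

# Write the Caffe network definition and a solver that points at it; no Caffe install needed.
convertKerasToCaffeNetwork(model, 'model.proto')
convertKerasToCaffeSolver(model, 'model.proto', 'model_solver.proto')
```

When Caffe2DML later reads the generated network, the trailing `Softmax` layer is rewritten to `SoftmaxWithLoss` and a `label` top is added to the data layer, as implemented in the `CaffeNetwork.scala` hunk above.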