http://git-wip-us.apache.org/repos/asf/systemml/blob/532da1bc/projects/breast_cancer/bin/clean_spark.sh ---------------------------------------------------------------------- diff --git a/projects/breast_cancer/bin/clean_spark.sh b/projects/breast_cancer/bin/clean_spark.sh new file mode 100755 index 0000000..d92ce87 --- /dev/null +++ b/projects/breast_cancer/bin/clean_spark.sh @@ -0,0 +1,26 @@ +#!/usr/bin/env bash +#------------------------------------------------------------- +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +#------------------------------------------------------------- + +rm -rf metastore_db/ +rm -rf derby.log +rm -rf spark-warehouse/ +rm -rf scratch_space/
http://git-wip-us.apache.org/repos/asf/systemml/blob/532da1bc/projects/breast_cancer/bin/monitor_gpu.sh ---------------------------------------------------------------------- diff --git a/projects/breast_cancer/bin/monitor_gpu.sh b/projects/breast_cancer/bin/monitor_gpu.sh new file mode 100755 index 0000000..b432e3f --- /dev/null +++ b/projects/breast_cancer/bin/monitor_gpu.sh @@ -0,0 +1,23 @@ +#!/usr/bin/env bash +#------------------------------------------------------------- +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +#------------------------------------------------------------- + +watch -n 0.5 nvidia-smi http://git-wip-us.apache.org/repos/asf/systemml/blob/532da1bc/projects/breast_cancer/bin/remove_old_processes.sh ---------------------------------------------------------------------- diff --git a/projects/breast_cancer/bin/remove_old_processes.sh b/projects/breast_cancer/bin/remove_old_processes.sh new file mode 100755 index 0000000..2a7e903 --- /dev/null +++ b/projects/breast_cancer/bin/remove_old_processes.sh @@ -0,0 +1,24 @@ +#!/usr/bin/env bash +#------------------------------------------------------------- +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +#------------------------------------------------------------- + +ps -ef | grep `whoami` | grep "[p]ython" | awk '{print $2}' | xargs kill -9 +ps -ef | grep `whoami` | grep "[j]ava" | awk '{print $2}' | xargs kill -9 http://git-wip-us.apache.org/repos/asf/systemml/blob/532da1bc/projects/breast_cancer/bin/run_tensorboard.sh ---------------------------------------------------------------------- diff --git a/projects/breast_cancer/bin/run_tensorboard.sh b/projects/breast_cancer/bin/run_tensorboard.sh new file mode 100755 index 0000000..8445858 --- /dev/null +++ b/projects/breast_cancer/bin/run_tensorboard.sh @@ -0,0 +1,23 @@ +#!/usr/bin/env bash +#------------------------------------------------------------- +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. 
See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +#------------------------------------------------------------- + +tensorboard --logdir=experiments --reload_interval 5 http://git-wip-us.apache.org/repos/asf/systemml/blob/532da1bc/projects/breast_cancer/breastcancer/convnet.dml ---------------------------------------------------------------------- diff --git a/projects/breast_cancer/breastcancer/convnet.dml b/projects/breast_cancer/breastcancer/convnet.dml new file mode 100644 index 0000000..6cbea39 --- /dev/null +++ b/projects/breast_cancer/breastcancer/convnet.dml @@ -0,0 +1,495 @@ +#------------------------------------------------------------- +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +#------------------------------------------------------------- + +/* + * Breast Cancer LeNet-like ConvNet Model + */ +# Imports +source("nn/layers/affine.dml") as affine +source("nn/layers/conv2d_builtin.dml") as conv2d +source("nn/layers/cross_entropy_loss.dml") as cross_entropy_loss +source("nn/layers/dropout.dml") as dropout +source("nn/layers/l2_reg.dml") as l2_reg +source("nn/layers/max_pool2d_builtin.dml") as max_pool2d +source("nn/layers/relu.dml") as relu +source("nn/layers/softmax.dml") as softmax +#source("nn/optim/adam.dml") as adam +source("nn/optim/sgd_nesterov.dml") as sgd_nesterov + +train = function(matrix[double] X, matrix[double] Y, + matrix[double] X_val, matrix[double] Y_val, + int C, int Hin, int Win, + double lr, double mu, double decay, double lambda, + int batch_size, int epochs, int log_interval, + string checkpoint_dir) + return (matrix[double] Wc1, matrix[double] bc1, + matrix[double] Wc2, matrix[double] bc2, + matrix[double] Wc3, matrix[double] bc3, + matrix[double] Wa1, matrix[double] ba1, + matrix[double] Wa2, matrix[double] ba2) { + /* + * Trains a convolutional net using a "LeNet"-like architecture. + * + * The input matrix, X, has N examples, each represented as a 3D + * volume unrolled into a single vector. The targets, Y, have K + * classes, and are one-hot encoded. + * + * Inputs: + * - X: Input data matrix, of shape (N, C*Hin*Win). + * - Y: Target matrix, of shape (N, K). 
+ * - X_val: Input validation data matrix, of shape (N, C*Hin*Win). + * - Y_val: Target validation matrix, of shape (N, K). + * - C: Number of input channels (dimensionality of input depth). + * - Hin: Input height. + * - Win: Input width. + * - lr: Learning rate. + * - mu: Momentum value. + * Typical values are in the range of [0.5, 0.99], usually + * started at the lower end and annealed towards the higher end. + * - decay: Learning rate decay rate. + * - lambda: Regularization strength. + * - batch_size: Size of mini-batches to train on. + * - epochs: Total number of full training loops over the full data set. + * - log_interval: Interval, in iterations, between log outputs. + * - checkpoint_dir: Directory to store model checkpoints. + * + * Outputs: + * - Wc1: 1st layer weights (parameters) matrix, of shape (F1, C*Hf*Wf). + * - bc1: 1st layer biases vector, of shape (F1, 1). + * - Wc2: 2nd layer weights (parameters) matrix, of shape (F2, F1*Hf*Wf). + * - bc2: 2nd layer biases vector, of shape (F2, 1). + * - Wc3: 3rd layer weights (parameters) matrix, of shape (F2*(Hin/4)*(Win/4), N3). + * - bc3: 3rd layer biases vector, of shape (1, N3). + * - Wa2: 4th layer weights (parameters) matrix, of shape (N3, K). + * - ba2: 4th layer biases vector, of shape (1, K). + */ + N = nrow(X) + K = ncol(Y) + + # Create network: + # conv1 -> relu1 -> pool1 -> conv2 -> relu2 -> pool2 -> conv3 -> relu3 -> pool3 + # -> affine1 -> relu1 -> dropout1 -> affine2 -> softmax + Hf = 3 # filter height + Wf = 3 # filter width + stride = 1 + pad = 1 # For same dimensions, (Hf - stride) / 2 + F1 = 32 # num conv filters in conv1 + F2 = 32 # num conv filters in conv2 + F3 = 32 # num conv filters in conv3 + N1 = 512 # num nodes in affine1 + # Note: affine2 has K nodes, which is equal to the number of target dimensions (num classes) + [Wc1, bc1] = conv2d::init(F1, C, Hf, Wf) # inputs: (N, C*Hin*Win) + [Wc2, bc2] = conv2d::init(F2, F1, Hf, Wf) # inputs: (N, F1*(Hin/2)*(Win/2)) + [Wc3, bc3] = conv2d::init(F3, F2, Hf, Wf) # inputs: (N, F2*(Hin/2^2)*(Win/2^2)) + [Wa1, ba1] = affine::init(F3*(Hin/2^3)*(Win/2^3), N1) # inputs: (N, F3*(Hin/2^3)*(Win/2^3)) + [Wa2, ba2] = affine::init(N1, K) # inputs: (N, N1) + Wa2 = Wa2 / sqrt(2) # different initialization, since being fed into softmax, instead of relu + + # TODO: Compare optimizers once training is faster. 
+ # Initialize SGD w/ Nesterov momentum optimizer + vWc1 = sgd_nesterov::init(Wc1); vbc1 = sgd_nesterov::init(bc1) + vWc2 = sgd_nesterov::init(Wc2); vbc2 = sgd_nesterov::init(bc2) + vWc3 = sgd_nesterov::init(Wc3); vbc3 = sgd_nesterov::init(bc3) + vWa1 = sgd_nesterov::init(Wa1); vba1 = sgd_nesterov::init(ba1) + vWa2 = sgd_nesterov::init(Wa2); vba2 = sgd_nesterov::init(ba2) + #[mWc1, vWc1] = adam::init(Wc1) # optimizer 1st & 2nd moment state for Wc1 + #[mbc1, vbc1] = adam::init(bc1) # optimizer 1st & 2nd moment state for bc1 + #[mWc2, vWc2] = adam::init(Wc2) # optimizer 1st & 2nd moment state for Wc2 + #[mbc2, vbc2] = adam::init(bc2) # optimizer 1st & 2nd moment state for bc2 + #[mWc3, vWc3] = adam::init(Wc3) # optimizer 1st & 2nd moment state for Wc3 + #[mbc3, vbc3] = adam::init(bc3) # optimizer 1st & 2nd moment state for bc3 + #[mWa1, vWa1] = adam::init(Wa1) # optimizer 1st & 2nd moment state for Wa1 + #[mba1, vba1] = adam::init(ba1) # optimizer 1st & 2nd moment state for ba1 + #[mWa2, vWa2] = adam::init(Wa2) # optimizer 1st & 2nd moment state for Wa2 + #[mba2, vba2] = adam::init(ba2) # optimizer 1st & 2nd moment state for ba2 + #beta1 = 0.9 + #beta2 = 0.999 + #eps = 1e-8 + + # TODO: Enable starting val metrics once fast, distributed predictions are available. + # Starting validation loss & accuracy + #probs_val = predict(X_val, C, Hin, Win, Wc1, bc1, Wc2, bc2, Wc3, bc3, Wa1, ba1, Wa2, ba2) + #loss_val = cross_entropy_loss::forward(probs_val, Y_val) + #accuracy_val = mean(rowIndexMax(probs_val) == rowIndexMax(Y_val)) + ## Output results + #print("Start: Val Loss: " + loss_val + ", Val Accuracy: " + accuracy_val) + + # Optimize + print("Starting optimization") + iters = ceil(N / batch_size) + for (e in 1:epochs) { + for(i in 1:iters) { + # Get next batch + beg = ((i-1) * batch_size) %% N + 1 + end = min(N, beg + batch_size - 1) + X_batch = X[beg:end,] + y_batch = Y[beg:end,] + + # Compute forward pass + ## conv layer 1: conv1 -> relu1 -> pool1 + [outc1, Houtc1, Woutc1] = conv2d::forward(X_batch, Wc1, bc1, C, Hin, Win, Hf, Wf, + stride, stride, pad, pad) + outc1r = relu::forward(outc1) + [outc1p, Houtc1p, Woutc1p] = max_pool2d::forward(outc1r, F1, Houtc1, Woutc1, Hf=2, Wf=2, + strideh=2, stridew=2, 0, 0) + ## conv layer 2: conv2 -> relu2 -> pool2 + [outc2, Houtc2, Woutc2] = conv2d::forward(outc1p, Wc2, bc2, F1, Houtc1p, Woutc1p, Hf, Wf, + stride, stride, pad, pad) + outc2r = relu::forward(outc2) + [outc2p, Houtc2p, Woutc2p] = max_pool2d::forward(outc2r, F2, Houtc2, Woutc2, Hf=2, Wf=2, + strideh=2, stridew=2, 0, 0) + ## conv layer 3: conv3 -> relu3 -> pool3 + [outc3, Houtc3, Woutc3] = conv2d::forward(outc2p, Wc3, bc3, F2, Houtc2p, Woutc2p, Hf, Wf, + stride, stride, pad, pad) + outc3r = relu::forward(outc3) + [outc3p, Houtc3p, Woutc3p] = max_pool2d::forward(outc3r, F3, Houtc3, Woutc3, Hf=2, Wf=2, + strideh=2, stridew=2, 0, 0) + ## affine layer 1: affine1 -> relu1 -> dropout1 + outa1 = affine::forward(outc3p, Wa1, ba1) + outa1r = relu::forward(outa1) + [outa1d, maskad1] = dropout::forward(outa1r, 0.5, -1) + ## affine layer 2: affine2 -> softmax + outa2 = affine::forward(outa1d, Wa2, ba2) + probs = softmax::forward(outa2) + + # Compute data backward pass + ## loss: + dprobs = cross_entropy_loss::backward(probs, y_batch) + ## affine layer 2: affine2 -> softmax + douta2 = softmax::backward(dprobs, outa2) + [douta1d, dWa2, dba2] = affine::backward(douta2, outa1d, Wa2, ba2) + ## layer 3: affine3 -> relu3 -> dropout + ## affine layer 1: affine1 -> relu1 -> dropout + douta1r = 
dropout::backward(douta1d, outa1r, 0.5, maskad1) + douta1 = relu::backward(douta1r, outa1) + [doutc3p, dWa1, dba1] = affine::backward(douta1, outc3p, Wa1, ba1) + ## conv layer 3: conv3 -> relu3 -> pool3 + doutc3r = max_pool2d::backward(doutc3p, Houtc3p, Woutc3p, outc3r, F3, Houtc3, Woutc3, + Hf=2, Wf=2, strideh=2, stridew=2, 0, 0) + doutc3 = relu::backward(doutc3r, outc3) + [doutc2p, dWc3, dbc3] = conv2d::backward(doutc3, Houtc3, Woutc3, outc2p, Wc3, bc2, F2, + Houtc2p, Woutc2p, Hf, Wf, stride, stride, pad, pad) + ## conv layer 2: conv2 -> relu2 -> pool2 + doutc2r = max_pool2d::backward(doutc2p, Houtc2p, Woutc2p, outc2r, F2, Houtc2, Woutc2, + Hf=2, Wf=2, strideh=2, stridew=2, 0, 0) + doutc2 = relu::backward(doutc2r, outc2) + [doutc1p, dWc2, dbc2] = conv2d::backward(doutc2, Houtc2, Woutc2, outc1p, Wc2, bc2, F1, + Houtc1p, Woutc1p, Hf, Wf, stride, stride, pad, pad) + ## conv layer 1: conv1 -> relu1 -> pool1 + doutc1r = max_pool2d::backward(doutc1p, Houtc1p, Woutc1p, outc1r, F1, Houtc1, Woutc1, + Hf=2, Wf=2, strideh=2, stridew=2, 0, 0) + doutc1 = relu::backward(doutc1r, outc1) + [dX_batch, dWc1, dbc1] = conv2d::backward(doutc1, Houtc1, Woutc1, X_batch, Wc1, bc1, C, + Hin, Win, Hf, Wf, stride, stride, pad, pad) + + # Compute regularization backward pass + dWc1_reg = l2_reg::backward(Wc1, lambda) + dWc2_reg = l2_reg::backward(Wc2, lambda) + dWc3_reg = l2_reg::backward(Wc3, lambda) + dWa1_reg = l2_reg::backward(Wa1, lambda) + dWa2_reg = l2_reg::backward(Wa2, lambda) + dWc1 = dWc1 + dWc1_reg + dWc2 = dWc2 + dWc2_reg + dWc3 = dWc3 + dWc3_reg + dWa1 = dWa1 + dWa1_reg + dWa2 = dWa2 + dWa2_reg + + # Optimize with SGD w/ Nesterov momentum + [Wc1, vWc1] = sgd_nesterov::update(Wc1, dWc1, lr, mu, vWc1) + [bc1, vbc1] = sgd_nesterov::update(bc1, dbc1, lr, mu, vbc1) + [Wc2, vWc2] = sgd_nesterov::update(Wc2, dWc2, lr, mu, vWc2) + [bc2, vbc2] = sgd_nesterov::update(bc2, dbc2, lr, mu, vbc2) + [Wc3, vWc3] = sgd_nesterov::update(Wc3, dWc3, lr, mu, vWc3) + [bc3, vbc3] = sgd_nesterov::update(bc3, dbc3, lr, mu, vbc3) + [Wa1, vWa1] = sgd_nesterov::update(Wa1, dWa1, lr, mu, vWa1) + [ba1, vba1] = sgd_nesterov::update(ba1, dba1, lr, mu, vba1) + [Wa2, vWa2] = sgd_nesterov::update(Wa2, dWa2, lr, mu, vWa2) + [ba2, vba2] = sgd_nesterov::update(ba2, dba2, lr, mu, vba2) + #t = e*i - 1 + #[Wc1, mWc1, vWc1] = adam::update(Wc1, dWc1, lr, beta1, beta2, eps, t, mWc1, vWc1) + #[bc1, mbc1, vbc1] = adam::update(bc1, dbc1, lr, beta1, beta2, eps, t, mbc1, vbc1) + #[Wc2, mWc2, vWc2] = adam::update(Wc2, dWc2, lr, beta1, beta2, eps, t, mWc2, vWc2) + #[bc2, mbc2, vbc2] = adam::update(bc2, dbc2, lr, beta1, beta2, eps, t, mbc2, vbc2) + #[Wc3, mWc3, vWc3] = adam::update(Wc3, dWc3, lr, beta1, beta2, eps, t, mWc3, vWc3) + #[bc3, mbc3, vbc3] = adam::update(bc3, dbc3, lr, beta1, beta2, eps, t, mbc3, vbc3) + #[Wa1, mWa1, vWa1] = adam::update(Wa1, dWa1, lr, beta1, beta2, eps, t, mWa1, vWa1) + #[ba1, mba1, vba1] = adam::update(ba1, dba1, lr, beta1, beta2, eps, t, mba1, vba1) + #[Wa2, mWa2, vWa2] = adam::update(Wa2, dWa2, lr, beta1, beta2, eps, t, mWa2, vWa2) + #[ba2, mba2, vba2] = adam::update(ba2, dba2, lr, beta1, beta2, eps, t, mba2, vba2) + + # Compute loss & accuracy for training & validation data every `log_interval` iterations. 
+ if (i %% log_interval == 0) { + # Compute training loss & accuracy + loss_data = cross_entropy_loss::forward(probs, y_batch) + loss_reg_Wc1 = l2_reg::forward(Wc1, lambda) + loss_reg_Wc2 = l2_reg::forward(Wc2, lambda) + loss_reg_Wc3 = l2_reg::forward(Wc3, lambda) + loss_reg_Wa1 = l2_reg::forward(Wa1, lambda) + loss_reg_Wa2 = l2_reg::forward(Wa2, lambda) + loss = loss_data + loss_reg_Wc1 + loss_reg_Wc2 + loss_reg_Wc3 + loss_reg_Wa1 + loss_reg_Wa2 + accuracy = mean(rowIndexMax(probs) == rowIndexMax(y_batch)) + + # TODO: Consider enabling val metrics here once fast, distributed predictions are available. + ## Compute validation loss & accuracy + #probs_val = predict(X_val, C, Hin, Win, Wc1, bc1, Wc2, bc2, Wc3, bc3, Wa1, ba1, Wa2, ba2) + #loss_val = cross_entropy_loss::forward(probs_val, Y_val) + #accuracy_val = mean(rowIndexMax(probs_val) == rowIndexMax(Y_val)) + + ## Output results + #print("Epoch: " + e + ", Iter: " + i + ", Train Loss: " + loss + ", Train Accuracy: " + # + accuracy + ", Val Loss: " + loss_val + ", Val Accuracy: " + accuracy_val + # + ", lr: " + lr + ", mu " + mu) + # Output results + print("Epoch: " + e + "/" + epochs + ", Iter: " + i + "/" + iters + + ", Train Loss: " + loss + ", Train Accuracy: " + accuracy) + } + } + + # Compute validation loss & accuracy for validation data every epoch + probs_val = predict(X_val, C, Hin, Win, Wc1, bc1, Wc2, bc2, Wc3, bc3, Wa1, ba1, Wa2, ba2) + loss_val = cross_entropy_loss::forward(probs_val, Y_val) + accuracy_val = mean(rowIndexMax(probs_val) == rowIndexMax(Y_val)) + + # Output results + print("Epoch: " + e + "/" + epochs + ", Val Loss: " + loss_val + + ", Val Accuracy: " + accuracy_val + ", lr: " + lr + ", mu " + mu) + + # Checkpoint model + dir = checkpoint_dir + e + "/" + dummy = checkpoint(dir, Wc1, bc1, Wc2, bc2, Wc3, bc3, Wa1, ba1, Wa2, ba2) + str = "lr: " + lr + ", mu: " + mu + ", decay: " + decay + ", lambda: " + lambda + + ", batch_size: " + batch_size + name = dir + accuracy_val + write(str, name) + + # Anneal momentum towards 0.999 + mu = mu + (0.999 - mu)/(1+epochs-e) + # Decay learning rate + lr = lr * decay + } +} + +checkpoint = function(string dir, + matrix[double] Wc1, matrix[double] bc1, + matrix[double] Wc2, matrix[double] bc2, + matrix[double] Wc3, matrix[double] bc3, + matrix[double] Wa1, matrix[double] ba1, + matrix[double] Wa2, matrix[double] ba2) { + /* + * Save the model parameters. + * + * Inputs: + * - dir: Directory in which to save model parameters. + * - Wc1: 1st conv layer weights (parameters) matrix, of shape (F1, C*Hf*Wf). + * - bc1: 1st conv layer biases vector, of shape (F1, 1). + * - Wc2: 2nd conv layer weights (parameters) matrix, of shape (F2, F1*Hf*Wf). + * - bc2: 2nd conv layer biases vector, of shape (F2, 1). + * - Wc3: 3rd conv layer weights (parameters) matrix, of shape (F3, F2*Hf*Wf). + * - bc3: 3rd conv layer biases vector, of shape (F3, 1). + * - Wa1: 1st affine layer weights (parameters) matrix, of shape (F3*(Hin/2^3)*(Win/2^1), N1). + * - ba1: 1st affine layer biases vector, of shape (1, N1). + * - Wa2: 2nd affine layer weights (parameters) matrix, of shape (N1, K). + * - ba2: 2nd affine layer biases vector, of shape (1, K). + * + * Outputs: + * - probs: Class probabilities, of shape (N, K). 
+ */ + write(Wc1, dir + "Wc1", format="binary") + write(bc1, dir + "bc1", format="binary") + write(Wc2, dir + "Wc2", format="binary") + write(bc2, dir + "bc2", format="binary") + write(Wc3, dir + "Wc3", format="binary") + write(bc3, dir + "bc3", format="binary") + write(Wa1, dir + "Wa1", format="binary") + write(ba1, dir + "ba1", format="binary") + write(Wa2, dir + "Wa2", format="binary") + write(ba2, dir + "ba2", format="binary") +} + +predict = function(matrix[double] X, int C, int Hin, int Win, + matrix[double] Wc1, matrix[double] bc1, + matrix[double] Wc2, matrix[double] bc2, + matrix[double] Wc3, matrix[double] bc3, + matrix[double] Wa1, matrix[double] ba1, + matrix[double] Wa2, matrix[double] ba2) + return (matrix[double] probs) { + /* + * Computes the class probability predictions of a convolutional + * net using the "LeNet" architecture. + * + * The input matrix, X, has N examples, each represented as a 3D + * volume unrolled into a single vector. + * + * Inputs: + * - X: Input data matrix, of shape (N, C*Hin*Win). + * - C: Number of input channels (dimensionality of input depth). + * - Hin: Input height. + * - Win: Input width. + * - Wc1: 1st conv layer weights (parameters) matrix, of shape (F1, C*Hf*Wf). + * - bc1: 1st conv layer biases vector, of shape (F1, 1). + * - Wc2: 2nd conv layer weights (parameters) matrix, of shape (F2, F1*Hf*Wf). + * - bc2: 2nd conv layer biases vector, of shape (F2, 1). + * - Wc3: 3rd conv layer weights (parameters) matrix, of shape (F3, F2*Hf*Wf). + * - bc3: 3rd conv layer biases vector, of shape (F3, 1). + * - Wa1: 1st affine layer weights (parameters) matrix, of shape (F3*(Hin/2^3)*(Win/2^1), N1). + * - ba1: 1st affine layer biases vector, of shape (1, N1). + * - Wa2: 2nd affine layer weights (parameters) matrix, of shape (N1, K). + * - ba2: 2nd affine layer biases vector, of shape (1, K). + * + * Outputs: + * - probs: Class probabilities, of shape (N, K). + */ + N = nrow(X) + + # Network: + # conv1 -> relu1 -> pool1 -> conv2 -> relu2 -> pool2 -> conv3 -> relu3 -> pool3 + # -> affine1 -> relu1 -> affine2 -> softmax + Hf = 3 # filter height + Wf = 3 # filter width + stride = 1 + pad = 1 # For same dimensions, (Hf - stride) / 2 + + F1 = nrow(Wc1) # num conv filters in conv1 + F2 = nrow(Wc2) # num conv filters in conv2 + F3 = nrow(Wc3) # num conv filters in conv3 + N1 = ncol(Wa1) # num nodes in affine1 + K = ncol(Wa2) # num nodes in affine2, equal to number of target dimensions (num classes) + + # TODO: Implement fast, distributed conv & max pooling operators so that predictions + # can be computed in a full-batch, distributed manner. Alternatively, improve `parfor` + # so that it can be efficiently used for parallel predictions. 
+ ## Compute forward pass + ### conv layer 1: conv1 -> relu1 -> pool1 + #[outc1, Houtc1, Woutc1] = conv2d::forward(X, Wc1, bc1, C, Hin, Win, Hf, Wf, stride, stride, + # pad, pad) + #outc1r = relu::forward(outc1) + #[outc1p, Houtc1p, Woutc1p] = max_pool2d::forward(outc1r, F1, Houtc1, Woutc1, Hf=2, Wf=2, + # strideh=2, stridew=2, 0, 0) + ### conv layer 2: conv2 -> relu2 -> pool2 + #[outc2, Houtc2, Woutc2] = conv2d::forward(outc1p, Wc2, bc2, F1, Houtc1p, Woutc1p, Hf, Wf, + # stride, stride, pad, pad) + #outc2r = relu::forward(outc2) + #[outc2p, Houtc2p, Woutc2p] = max_pool2d::forward(outc2r, F2, Houtc2, Woutc2, Hf=2, Wf=2, + # strideh=2, stridew=2, 0, 0) + ### conv layer 3: conv3 -> relu3 -> pool3 + #[outc3, Houtc3, Woutc3] = conv2d::forward(outc2p, Wc3, bc3, F2, Houtc2p, Woutc2p, Hf, Wf, + # stride, stride, pad, pad) + #outc3r = relu::forward(outc3) + #[outc3p, Houtc3p, Woutc3p] = max_pool2d::forward(outc3r, F3, Houtc3, Woutc3, Hf=2, Wf=2, + # strideh=2, stridew=2, 0, 0) + ### affine layer 1: affine1 -> relu1 -> dropout + #outa1 = affine::forward(outc3p, Wa1, ba1) + #outa1r = relu::forward(outa1) + ##[outa1d, maskad1] = dropout::forward(outa1r, 0.5, -1) + ### affine layer 2: affine2 -> softmax + #outa2 = affine::forward(outa1r, Wa2, ba2) + #probs = softmax::forward(outa2) + + # Compute predictions over mini-batches + probs = matrix(0, rows=N, cols=K) + batch_size = 50 + iters = ceil(N / batch_size) + for(i in 1:iters) { + # TODO: `parfor` should work here, possibly as an alternative to distributed predictions. + #parfor(i in 1:iters, check=0, mode=REMOTE_SPARK, resultmerge=REMOTE_SPARK) { + # Get next batch + beg = ((i-1) * batch_size) %% N + 1 + end = min(N, beg + batch_size - 1) + X_batch = X[beg:end,] + + # Compute forward pass + ## conv layer 1: conv1 -> relu1 -> pool1 + [outc1, Houtc1, Woutc1] = conv2d::forward(X_batch, Wc1, bc1, C, Hin, Win, Hf, Wf, + stride, stride, pad, pad) + outc1r = relu::forward(outc1) + [outc1p, Houtc1p, Woutc1p] = max_pool2d::forward(outc1r, F1, Houtc1, Woutc1, Hf=2, Wf=2, + strideh=2, stridew=2, 0, 0) + ## conv layer 2: conv2 -> relu2 -> pool2 + [outc2, Houtc2, Woutc2] = conv2d::forward(outc1p, Wc2, bc2, F1, Houtc1p, Woutc1p, Hf, Wf, + stride, stride, pad, pad) + outc2r = relu::forward(outc2) + [outc2p, Houtc2p, Woutc2p] = max_pool2d::forward(outc2r, F2, Houtc2, Woutc2, Hf=2, Wf=2, + strideh=2, stridew=2, 0, 0) + ## conv layer 3: conv3 -> relu3 -> pool3 + [outc3, Houtc3, Woutc3] = conv2d::forward(outc2p, Wc3, bc3, F2, Houtc2p, Woutc2p, Hf, Wf, + stride, stride, pad, pad) + outc3r = relu::forward(outc3) + [outc3p, Houtc3p, Woutc3p] = max_pool2d::forward(outc3r, F3, Houtc3, Woutc3, Hf=2, Wf=2, + strideh=2, stridew=2, 0, 0) + ## affine layer 1: affine1 -> relu1 -> dropout + outa1 = affine::forward(outc3p, Wa1, ba1) + outa1r = relu::forward(outa1) + #[outa1d, maskad1] = dropout::forward(outa1r, 0.5, -1) + ## affine layer 2: affine2 -> softmax + outa2 = affine::forward(outa1r, Wa2, ba2) + probs_batch = softmax::forward(outa2) + + # Store predictions + probs[beg:end,] = probs_batch + } +} + +eval = function(matrix[double] probs, matrix[double] Y) + return (double loss, double accuracy) { + /* + * Evaluates a convolutional net using the "LeNet" architecture. + * + * The probs matrix contains the class probability predictions + * of K classes over N examples. The targets, Y, have K classes, + * and are one-hot encoded. + * + * Inputs: + * - probs: Class probabilities, of shape (N, K). 
+ * - Y: Target matrix, of shape (N, K). + * + * Outputs: + * - loss: Scalar loss, of shape (1). + * - accuracy: Scalar accuracy, of shape (1). + */ + # Compute loss & accuracy + loss = cross_entropy_loss::forward(probs, Y) + correct_pred = rowIndexMax(probs) == rowIndexMax(Y) + accuracy = mean(correct_pred) +} + +generate_dummy_data = function() + return (matrix[double] X, matrix[double] Y, int C, int Hin, int Win) { + /* + * Generate a dummy dataset similar to the breast cancer dataset. + * + * Outputs: + * - X: Input data matrix, of shape (N, D). + * - Y: Target matrix, of shape (N, K). + * - C: Number of input channels (dimensionality of input depth). + * - Hin: Input height. + * - Win: Input width. + */ + # Generate dummy input data + N = 1024 # num examples + C = 3 # num input channels + Hin = 256 # input height + Win = 256 # input width + K = 3 # num target classes + X = rand(rows=N, cols=C*Hin*Win, pdf="normal") + classes = round(rand(rows=N, cols=1, min=1, max=K, pdf="uniform")) + Y = table(seq(1, N), classes) # one-hot encoding +} + http://git-wip-us.apache.org/repos/asf/systemml/blob/532da1bc/projects/breast_cancer/breastcancer/convnet_distrib_sgd.dml ---------------------------------------------------------------------- diff --git a/projects/breast_cancer/breastcancer/convnet_distrib_sgd.dml b/projects/breast_cancer/breastcancer/convnet_distrib_sgd.dml new file mode 100644 index 0000000..0c5869e --- /dev/null +++ b/projects/breast_cancer/breastcancer/convnet_distrib_sgd.dml @@ -0,0 +1,592 @@ +#------------------------------------------------------------- +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License.
+# +#------------------------------------------------------------- + +/* + * Breast Cancer LeNet-like ConvNet Model + */ +# Imports +source("nn/layers/affine.dml") as affine +source("nn/layers/conv2d_builtin.dml") as conv2d +source("nn/layers/cross_entropy_loss.dml") as cross_entropy_loss +source("nn/layers/dropout.dml") as dropout +source("nn/layers/l2_reg.dml") as l2_reg +source("nn/layers/max_pool2d_builtin.dml") as max_pool2d +source("nn/layers/relu.dml") as relu +source("nn/layers/softmax.dml") as softmax +#source("nn/optim/adam.dml") as adam +source("nn/optim/sgd_nesterov.dml") as sgd_nesterov + +train = function(matrix[double] X, matrix[double] Y, + matrix[double] X_val, matrix[double] Y_val, + int C, int Hin, int Win, + double lr, double mu, double decay, double lambda, + int batch_size, int parallel_batches, int epochs, + int log_interval, string checkpoint_dir) + return (matrix[double] Wc1, matrix[double] bc1, + matrix[double] Wc2, matrix[double] bc2, + matrix[double] Wc3, matrix[double] bc3, + matrix[double] Wa1, matrix[double] ba1, + matrix[double] Wa2, matrix[double] ba2) { + /* + * Trains a convolutional net using a "LeNet"-like architecture. + * + * The input matrix, X, has N examples, each represented as a 3D + * volume unrolled into a single vector. The targets, Y, have K + * classes, and are one-hot encoded. + * + * Inputs: + * - X: Input data matrix, of shape (N, C*Hin*Win). + * - Y: Target matrix, of shape (N, K). + * - X_val: Input validation data matrix, of shape (N, C*Hin*Win). + * - Y_val: Target validation matrix, of shape (N, K). + * - C: Number of input channels (dimensionality of input depth). + * - Hin: Input height. + * - Win: Input width. + * - lr: Learning rate. + * - mu: Momentum value. + * Typical values are in the range of [0.5, 0.99], usually + * started at the lower end and annealed towards the higher end. + * - decay: Learning rate decay rate. + * - lambda: Regularization strength. + * - batch_size: Size of mini-batches to train on. + * - parallel_batches: Number of parallel batches to run for + * distributed SGD. + * - epochs: Total number of full training loops over the full data + * set. + * - log_interval: Interval, in iterations, between log outputs. + * - checkpoint_dir: Directory to store model checkpoints. + * + * Outputs: + * - Wc1: 1st layer weights (parameters) matrix, of + * shape (F1, C*Hf*Wf). + * - bc1: 1st layer biases vector, of shape (F1, 1). + * - Wc2: 2nd layer weights (parameters) matrix, of + * shape (F2, F1*Hf*Wf). + * - bc2: 2nd layer biases vector, of shape (F2, 1). + * - Wc3: 3rd layer weights (parameters) matrix, of + * shape (F2*(Hin/4)*(Win/4), N3). + * - bc3: 3rd layer biases vector, of shape (1, N3). + * - Wa2: 4th layer weights (parameters) matrix, of shape (N3, K). + * - ba2: 4th layer biases vector, of shape (1, K). 
+ */ + N = nrow(X) + K = ncol(Y) + + # Create network: + # conv1 -> relu1 -> pool1 -> conv2 -> relu2 -> pool2 -> conv3 -> relu3 -> pool3 + # -> affine1 -> relu1 -> dropout1 -> affine2 -> softmax + Hf = 3 # filter height + Wf = 3 # filter width + stride = 1 + pad = 1 # For same dimensions, (Hf - stride) / 2 + F1 = 32 # num conv filters in conv1 + F2 = 32 # num conv filters in conv2 + F3 = 32 # num conv filters in conv3 + N1 = 256 # num nodes in affine1 + # Note: affine2 has K nodes, which is equal to the number of target dimensions (num classes) + [Wc1, bc1] = conv2d::init(F1, C, Hf, Wf) # inputs: (N, C*Hin*Win) + [Wc2, bc2] = conv2d::init(F2, F1, Hf, Wf) # inputs: (N, F1*(Hin/2)*(Win/2)) + [Wc3, bc3] = conv2d::init(F3, F2, Hf, Wf) # inputs: (N, F2*(Hin/2^2)*(Win/2^2)) + [Wa1, ba1] = affine::init(F3*(Hin/2^3)*(Win/2^3), N1) # inputs: (N, F3*(Hin/2^3)*(Win/2^3)) + [Wa2, ba2] = affine::init(N1, K) # inputs: (N, N1) + Wa2 = Wa2 / sqrt(2) # different initialization, since being fed into softmax, instead of relu + + # TODO: Compare optimizers once training is faster. + # Initialize SGD w/ Nesterov momentum optimizer + vWc1 = sgd_nesterov::init(Wc1); vbc1 = sgd_nesterov::init(bc1) + vWc2 = sgd_nesterov::init(Wc2); vbc2 = sgd_nesterov::init(bc2) + vWc3 = sgd_nesterov::init(Wc3); vbc3 = sgd_nesterov::init(bc3) + vWa1 = sgd_nesterov::init(Wa1); vba1 = sgd_nesterov::init(ba1) + vWa2 = sgd_nesterov::init(Wa2); vba2 = sgd_nesterov::init(ba2) + #[mWc1, vWc1] = adam::init(Wc1) # optimizer 1st & 2nd moment state for Wc1 + #[mbc1, vbc1] = adam::init(bc1) # optimizer 1st & 2nd moment state for bc1 + #[mWc2, vWc2] = adam::init(Wc2) # optimizer 1st & 2nd moment state for Wc2 + #[mbc2, vbc2] = adam::init(bc2) # optimizer 1st & 2nd moment state for bc2 + #[mWc3, vWc3] = adam::init(Wc3) # optimizer 1st & 2nd moment state for Wc3 + #[mbc3, vbc3] = adam::init(bc3) # optimizer 1st & 2nd moment state for bc3 + #[mWa1, vWa1] = adam::init(Wa1) # optimizer 1st & 2nd moment state for Wa1 + #[mba1, vba1] = adam::init(ba1) # optimizer 1st & 2nd moment state for ba1 + #[mWa2, vWa2] = adam::init(Wa2) # optimizer 1st & 2nd moment state for Wa2 + #[mba2, vba2] = adam::init(ba2) # optimizer 1st & 2nd moment state for ba2 + #beta1 = 0.9 + #beta2 = 0.999 + #eps = 1e-8 + + # TODO: Enable starting val metrics once fast, distributed predictions are available. + # Starting validation loss & accuracy + #probs_val = predict(X_val, C, Hin, Win, Wc1, bc1, Wc2, bc2, Wc3, bc3, Wa1, ba1, Wa2, ba2) + #loss_val = cross_entropy_loss::forward(probs_val, Y_val) + #accuracy_val = mean(rowIndexMax(probs_val) == rowIndexMax(Y_val)) + ## Output results + #print("Start: Val Loss: " + loss_val + ", Val Accuracy: " + accuracy_val) + + # Optimize + print("Starting optimization") + group_batch_size = parallel_batches*batch_size + groups = ceil(N/group_batch_size) + print("Total Epochs: "+epochs+", Batch size: "+batch_size+ + ", Degree of parallelism: "+parallel_batches+", Group batch size: "+group_batch_size+ + ", Num groups: "+groups) + # Loop over the dataset multiple times + for (e in 1:epochs) { + # Grab groups of mini-batches + for (g in 1:groups) { + # Get next group of mini-batches + # NOTE: At the end of the dataset, the last mini-batch in this group could be smaller than + # the other groups. 
+ #group_beg = ((g-1) * group_batch_size) %% N + 1 + #group_end = min(N, group_beg + group_batch_size - 1) + #X_group_batch = X[group_beg:group_end,] + #Y_group_batch = Y[group_beg:group_end,] + group_beg = 1 + group_end = group_batch_size + X_group_batch = X[group_beg:group_end,] + Y_group_batch = Y[group_beg:group_end,] + + # Create data structure to store gradients computed in parallel + dWc1_agg = matrix(0, rows=parallel_batches, cols=nrow(Wc1)*ncol(Wc1)) + dWc2_agg = matrix(0, rows=parallel_batches, cols=nrow(Wc2)*ncol(Wc2)) + dWc3_agg = matrix(0, rows=parallel_batches, cols=nrow(Wc3)*ncol(Wc3)) + dWa1_agg = matrix(0, rows=parallel_batches, cols=nrow(Wa1)*ncol(Wa1)) + dWa2_agg = matrix(0, rows=parallel_batches, cols=nrow(Wa2)*ncol(Wa2)) + dbc1_agg = matrix(0, rows=parallel_batches, cols=nrow(bc1)*ncol(bc1)) + dbc2_agg = matrix(0, rows=parallel_batches, cols=nrow(bc2)*ncol(bc2)) + dbc3_agg = matrix(0, rows=parallel_batches, cols=nrow(bc3)*ncol(bc3)) + dba1_agg = matrix(0, rows=parallel_batches, cols=nrow(ba1)*ncol(ba1)) + dba2_agg = matrix(0, rows=parallel_batches, cols=nrow(ba2)*ncol(ba2)) + + # Run graph on each mini-batch in this group in parallel (ideally on multiple GPUs) + # NOTE: The parfor is causing the sizes to not be propagated into the body both here and + # in `predict`. It is not caused by the batch extraction below. It is the parfor. + parfor (j in 1:parallel_batches, mode=REMOTE_SPARK, opt=CONSTRAINED) { + #parfor (j in 1:parallel_batches) { + # Get a mini-batch in this group + beg = ((j-1) * batch_size) %% nrow(X_group_batch) + 1 + end = min(nrow(X_group_batch), beg + batch_size - 1) + X_batch = X_group_batch[beg:end,] + Y_batch = Y_group_batch[beg:end,] + #beg = 1 + #end = batch_size + #X_batch = X_group_batch[beg:end,] + #Y_batch = Y_group_batch[beg:end,] + + # Compute forward pass + ## conv layer 1: conv1 -> relu1 -> pool1 + [outc1, Houtc1, Woutc1] = conv2d::forward(X_batch, Wc1, bc1, C, Hin, Win, Hf, Wf, + stride, stride, pad, pad) + outc1r = relu::forward(outc1) + [outc1p, Houtc1p, Woutc1p] = max_pool2d::forward(outc1r, F1, Houtc1, Woutc1, Hf=2, Wf=2, + strideh=2, stridew=2, 0, 0) + ## conv layer 2: conv2 -> relu2 -> pool2 + [outc2, Houtc2, Woutc2] = conv2d::forward(outc1p, Wc2, bc2, F1, Houtc1p, Woutc1p, Hf, Wf, + stride, stride, pad, pad) + outc2r = relu::forward(outc2) + [outc2p, Houtc2p, Woutc2p] = max_pool2d::forward(outc2r, F2, Houtc2, Woutc2, Hf=2, Wf=2, + strideh=2, stridew=2, 0, 0) + ## conv layer 3: conv3 -> relu3 -> pool3 + [outc3, Houtc3, Woutc3] = conv2d::forward(outc2p, Wc3, bc3, F2, Houtc2p, Woutc2p, Hf, Wf, + stride, stride, pad, pad) + outc3r = relu::forward(outc3) + [outc3p, Houtc3p, Woutc3p] = max_pool2d::forward(outc3r, F3, Houtc3, Woutc3, Hf=2, Wf=2, + strideh=2, stridew=2, 0, 0) + ## affine layer 1: affine1 -> relu1 -> dropout1 + outa1 = affine::forward(outc3p, Wa1, ba1) + outa1r = relu::forward(outa1) + [outa1d, maskad1] = dropout::forward(outa1r, 0.5, -1) + ## affine layer 2: affine2 -> softmax + outa2 = affine::forward(outa1d, Wa2, ba2) + probs = softmax::forward(outa2) + + # Compute data backward pass + ## loss: + dprobs = cross_entropy_loss::backward(probs, Y_batch) + ## affine layer 2: affine2 -> softmax + douta2 = softmax::backward(dprobs, outa2) + [douta1d, dWa2, dba2] = affine::backward(douta2, outa1d, Wa2, ba2) + ## layer 3: affine3 -> relu3 -> dropout + ## affine layer 1: affine1 -> relu1 -> dropout + douta1r = dropout::backward(douta1d, outa1r, 0.5, maskad1) + douta1 = relu::backward(douta1r, outa1) + [doutc3p, dWa1, dba1] = 
affine::backward(douta1, outc3p, Wa1, ba1) + ## conv layer 3: conv3 -> relu3 -> pool3 + doutc3r = max_pool2d::backward(doutc3p, Houtc3p, Woutc3p, outc3r, F3, Houtc3, Woutc3, + Hf=2, Wf=2, strideh=2, stridew=2, 0, 0) + doutc3 = relu::backward(doutc3r, outc3) + [doutc2p, dWc3, dbc3] = conv2d::backward(doutc3, Houtc3, Woutc3, outc2p, Wc3, bc2, F2, + Houtc2p, Woutc2p, Hf, Wf, stride, stride, pad, pad) + ## conv layer 2: conv2 -> relu2 -> pool2 + doutc2r = max_pool2d::backward(doutc2p, Houtc2p, Woutc2p, outc2r, F2, Houtc2, Woutc2, + Hf=2, Wf=2, strideh=2, stridew=2, 0, 0) + doutc2 = relu::backward(doutc2r, outc2) + [doutc1p, dWc2, dbc2] = conv2d::backward(doutc2, Houtc2, Woutc2, outc1p, Wc2, bc2, F1, + Houtc1p, Woutc1p, Hf, Wf, stride, stride, pad, pad) + ## conv layer 1: conv1 -> relu1 -> pool1 + doutc1r = max_pool2d::backward(doutc1p, Houtc1p, Woutc1p, outc1r, F1, Houtc1, Woutc1, + Hf=2, Wf=2, strideh=2, stridew=2, 0, 0) + doutc1 = relu::backward(doutc1r, outc1) + [dX_batch, dWc1, dbc1] = conv2d::backward(doutc1, Houtc1, Woutc1, X_batch, Wc1, bc1, C, + Hin, Win, Hf, Wf, stride, stride, pad, pad) + + # Compute regularization backward pass on weights + dWc1_reg = l2_reg::backward(Wc1, lambda) + dWc2_reg = l2_reg::backward(Wc2, lambda) + dWc3_reg = l2_reg::backward(Wc3, lambda) + dWa1_reg = l2_reg::backward(Wa1, lambda) + dWa2_reg = l2_reg::backward(Wa2, lambda) + dWc1 = dWc1 + dWc1_reg + dWc2 = dWc2 + dWc2_reg + dWc3 = dWc3 + dWc3_reg + dWa1 = dWa1 + dWa1_reg + dWa2 = dWa2 + dWa2_reg + + # Flatten and store gradients for this parallel execution + # Note: We multiply by a weighting to allow for proper gradient averaging during the + # aggregation even with uneven batch sizes. + weighting = nrow(X_batch) / nrow(X_group_batch) + dWc1_agg[j,] = matrix(dWc1, rows=1, cols=nrow(Wc1)*ncol(Wc1)) * weighting + dWc2_agg[j,] = matrix(dWc2, rows=1, cols=nrow(Wc2)*ncol(Wc2)) * weighting + dWc3_agg[j,] = matrix(dWc3, rows=1, cols=nrow(Wc3)*ncol(Wc3)) * weighting + dWa1_agg[j,] = matrix(dWa1, rows=1, cols=nrow(Wa1)*ncol(Wa1)) * weighting + dWa2_agg[j,] = matrix(dWa2, rows=1, cols=nrow(Wa2)*ncol(Wa2)) * weighting + dbc1_agg[j,] = matrix(dbc1, rows=1, cols=nrow(bc1)*ncol(bc1)) * weighting + dbc2_agg[j,] = matrix(dbc2, rows=1, cols=nrow(bc2)*ncol(bc2)) * weighting + dbc3_agg[j,] = matrix(dbc3, rows=1, cols=nrow(bc3)*ncol(bc3)) * weighting + dba1_agg[j,] = matrix(dba1, rows=1, cols=nrow(ba1)*ncol(ba1)) * weighting + dba2_agg[j,] = matrix(dba2, rows=1, cols=nrow(ba2)*ncol(ba2)) * weighting + } + + # Aggregate gradients + # Note: The gradients are already pre-multiplied by a weight so that addition here + # results in gradient averaging even with different possible mini-batch sizes. I.e., + # the final mini-batch at the end of the dataset could be smaller than the other mini-batches. 
+ dWc1 = matrix(colSums(dWc1_agg), rows=nrow(Wc1), cols=ncol(Wc1)) + dWc2 = matrix(colSums(dWc2_agg), rows=nrow(Wc2), cols=ncol(Wc2)) + dWc3 = matrix(colSums(dWc3_agg), rows=nrow(Wc3), cols=ncol(Wc3)) + dWa1 = matrix(colSums(dWa1_agg), rows=nrow(Wa1), cols=ncol(Wa1)) + dWa2 = matrix(colSums(dWa2_agg), rows=nrow(Wa2), cols=ncol(Wa2)) + dbc1 = matrix(colSums(dbc1_agg), rows=nrow(bc1), cols=ncol(bc1)) + dbc2 = matrix(colSums(dbc2_agg), rows=nrow(bc2), cols=ncol(bc2)) + dbc3 = matrix(colSums(dbc3_agg), rows=nrow(bc3), cols=ncol(bc3)) + dba1 = matrix(colSums(dba1_agg), rows=nrow(ba1), cols=ncol(ba1)) + dba2 = matrix(colSums(dba2_agg), rows=nrow(ba2), cols=ncol(ba2)) + + # Optimize with SGD w/ Nesterov momentum + [Wc1, vWc1] = sgd_nesterov::update(Wc1, dWc1, lr, mu, vWc1) + [Wc2, vWc2] = sgd_nesterov::update(Wc2, dWc2, lr, mu, vWc2) + [Wc3, vWc3] = sgd_nesterov::update(Wc3, dWc3, lr, mu, vWc3) + [Wa1, vWa1] = sgd_nesterov::update(Wa1, dWa1, lr, mu, vWa1) + [Wa2, vWa2] = sgd_nesterov::update(Wa2, dWa2, lr, mu, vWa2) + [bc1, vbc1] = sgd_nesterov::update(bc1, dbc1, lr, mu, vbc1) + [bc2, vbc2] = sgd_nesterov::update(bc2, dbc2, lr, mu, vbc2) + [bc3, vbc3] = sgd_nesterov::update(bc3, dbc3, lr, mu, vbc3) + [ba1, vba1] = sgd_nesterov::update(ba1, dba1, lr, mu, vba1) + [ba2, vba2] = sgd_nesterov::update(ba2, dba2, lr, mu, vba2) + #t = e*i - 1 + #[Wc1, mWc1, vWc1] = adam::update(Wc1, dWc1, lr, beta1, beta2, eps, t, mWc1, vWc1) + #[bc1, mbc1, vbc1] = adam::update(bc1, dbc1, lr, beta1, beta2, eps, t, mbc1, vbc1) + #[Wc2, mWc2, vWc2] = adam::update(Wc2, dWc2, lr, beta1, beta2, eps, t, mWc2, vWc2) + #[bc2, mbc2, vbc2] = adam::update(bc2, dbc2, lr, beta1, beta2, eps, t, mbc2, vbc2) + #[Wc3, mWc3, vWc3] = adam::update(Wc3, dWc3, lr, beta1, beta2, eps, t, mWc3, vWc3) + #[bc3, mbc3, vbc3] = adam::update(bc3, dbc3, lr, beta1, beta2, eps, t, mbc3, vbc3) + #[Wa1, mWa1, vWa1] = adam::update(Wa1, dWa1, lr, beta1, beta2, eps, t, mWa1, vWa1) + #[ba1, mba1, vba1] = adam::update(ba1, dba1, lr, beta1, beta2, eps, t, mba1, vba1) + #[Wa2, mWa2, vWa2] = adam::update(Wa2, dWa2, lr, beta1, beta2, eps, t, mWa2, vWa2) + #[ba2, mba2, vba2] = adam::update(ba2, dba2, lr, beta1, beta2, eps, t, mba2, vba2) + + # Compute loss & accuracy for training data every `log_interval` iterations. + if (g %% log_interval == 0) { + print("Logging training loss & accuracy for group " + g + ":") + # Get a mini-batch in this group + #j = 0 + #beg = ((j-1) * batch_size) %% nrow(X_group_batch) + 1 + #end = min(nrow(X_group_batch), beg + batch_size - 1) + #X_batch = X_group_batch[beg:end,] + #Y_batch = Y_group_batch[beg:end,] + + # Compute training loss & accuracy using final + #probs = predict(X_batch, C, Hin, Win, Wc1, bc1, Wc2, bc2, Wc3, bc3, Wa1, ba1, Wa2, ba2) + #loss_data = cross_entropy_loss::forward(probs, Y_batch) + probs = predict(X_group_batch, C, Hin, Win, Wc1, bc1, Wc2, bc2, Wc3, bc3, Wa1, ba1, + Wa2, ba2, batch_size) + loss_data = cross_entropy_loss::forward(probs, Y_group_batch) + loss_reg_Wc1 = l2_reg::forward(Wc1, lambda) + loss_reg_Wc2 = l2_reg::forward(Wc2, lambda) + loss_reg_Wc3 = l2_reg::forward(Wc3, lambda) + loss_reg_Wa1 = l2_reg::forward(Wa1, lambda) + loss_reg_Wa2 = l2_reg::forward(Wa2, lambda) + loss = loss_data + loss_reg_Wc1 + loss_reg_Wc2 + loss_reg_Wc3 + loss_reg_Wa1 + loss_reg_Wa2 + #accuracy = mean(rowIndexMax(probs) == rowIndexMax(Y_batch)) + accuracy = mean(rowIndexMax(probs) == rowIndexMax(Y_group_batch)) + + # TODO: Consider enabling val metrics here once fast, distributed predictions are available. 
+ ## Compute validation loss & accuracy + #probs_val = predict(X_val, C, Hin, Win, Wc1, bc1, Wc2, bc2, Wc3, bc3, Wa1, ba1, Wa2, ba2) + #loss_val = cross_entropy_loss::forward(probs_val, Y_val) + #accuracy_val = mean(rowIndexMax(probs_val) == rowIndexMax(Y_val)) + + # Output results + print("Epoch: " + e + ", Group: " + g + ", Train Loss: " + loss + ", Train Accuracy: " + + accuracy) # + ", Val Loss: " + loss_val + ", Val Accuracy: " + accuracy_val) + } + } + + # Compute validation loss & accuracy for validation data every epoch + print("Logging validation loss & accuracy.") + probs_val = predict(X_val, C, Hin, Win, Wc1, bc1, Wc2, bc2, Wc3, bc3, Wa1, ba1, Wa2, ba2, + batch_size) + loss_val = cross_entropy_loss::forward(probs_val, Y_val) + accuracy_val = mean(rowIndexMax(probs_val) == rowIndexMax(Y_val)) + + # Output results + print("Epoch: " + e + "/" + epochs + ", Val Loss: " + loss_val + + ", Val Accuracy: " + accuracy_val + ", lr: " + lr + ", mu " + mu) + + # Checkpoint model + dir = checkpoint_dir + e + "/" + dummy = checkpoint(dir, Wc1, bc1, Wc2, bc2, Wc3, bc3, Wa1, ba1, Wa2, ba2) + str = "lr: " + lr + ", mu: " + mu + ", decay: " + decay + ", lambda: " + lambda + + ", batch_size: " + batch_size + name = dir + accuracy_val + write(str, name) + + # Anneal momentum towards 0.999 + mu = mu + (0.999 - mu)/(1+epochs-e) + # Decay learning rate + lr = lr * decay + } +} + +checkpoint = function(string dir, + matrix[double] Wc1, matrix[double] bc1, + matrix[double] Wc2, matrix[double] bc2, + matrix[double] Wc3, matrix[double] bc3, + matrix[double] Wa1, matrix[double] ba1, + matrix[double] Wa2, matrix[double] ba2) { + /* + * Save the model parameters. + * + * Inputs: + * - dir: Directory in which to save model parameters. + * - Wc1: 1st conv layer weights (parameters) matrix, of shape (F1, C*Hf*Wf). + * - bc1: 1st conv layer biases vector, of shape (F1, 1). + * - Wc2: 2nd conv layer weights (parameters) matrix, of shape (F2, F1*Hf*Wf). + * - bc2: 2nd conv layer biases vector, of shape (F2, 1). + * - Wc3: 3rd conv layer weights (parameters) matrix, of shape (F3, F2*Hf*Wf). + * - bc3: 3rd conv layer biases vector, of shape (F3, 1). + * - Wa1: 1st affine layer weights (parameters) matrix, of shape (F3*(Hin/2^3)*(Win/2^1), N1). + * - ba1: 1st affine layer biases vector, of shape (1, N1). + * - Wa2: 2nd affine layer weights (parameters) matrix, of shape (N1, K). + * - ba2: 2nd affine layer biases vector, of shape (1, K). + * + * Outputs: + * - probs: Class probabilities, of shape (N, K). + */ + write(Wc1, dir + "Wc1", format="binary") + write(bc1, dir + "bc1", format="binary") + write(Wc2, dir + "Wc2", format="binary") + write(bc2, dir + "bc2", format="binary") + write(Wc3, dir + "Wc3", format="binary") + write(bc3, dir + "bc3", format="binary") + write(Wa1, dir + "Wa1", format="binary") + write(ba1, dir + "ba1", format="binary") + write(Wa2, dir + "Wa2", format="binary") + write(ba2, dir + "ba2", format="binary") +} + +predict = function(matrix[double] X, int C, int Hin, int Win, + matrix[double] Wc1, matrix[double] bc1, + matrix[double] Wc2, matrix[double] bc2, + matrix[double] Wc3, matrix[double] bc3, + matrix[double] Wa1, matrix[double] ba1, + matrix[double] Wa2, matrix[double] ba2, + int batch_size) + return (matrix[double] probs) { + /* + * Computes the class probability predictions of a convolutional + * net using the "LeNet" architecture. + * + * The input matrix, X, has N examples, each represented as a 3D + * volume unrolled into a single vector. 
+ * + * Inputs: + * - X: Input data matrix, of shape (N, C*Hin*Win). + * - C: Number of input channels (dimensionality of input depth). + * - Hin: Input height. + * - Win: Input width. + * - Wc1: 1st conv layer weights (parameters) matrix, of shape (F1, C*Hf*Wf). + * - bc1: 1st conv layer biases vector, of shape (F1, 1). + * - Wc2: 2nd conv layer weights (parameters) matrix, of shape (F2, F1*Hf*Wf). + * - bc2: 2nd conv layer biases vector, of shape (F2, 1). + * - Wc3: 3rd conv layer weights (parameters) matrix, of shape (F3, F2*Hf*Wf). + * - bc3: 3rd conv layer biases vector, of shape (F3, 1). + * - Wa1: 1st affine layer weights (parameters) matrix, of shape (F3*(Hin/2^3)*(Win/2^1), N1). + * - ba1: 1st affine layer biases vector, of shape (1, N1). + * - Wa2: 2nd affine layer weights (parameters) matrix, of shape (N1, K). + * - ba2: 2nd affine layer biases vector, of shape (1, K). + * - batch_size: Size of mini-batches to train on. + * + * Outputs: + * - probs: Class probabilities, of shape (N, K). + */ + N = nrow(X) + + # Network: + # conv1 -> relu1 -> pool1 -> conv2 -> relu2 -> pool2 -> conv3 -> relu3 -> pool3 + # -> affine1 -> relu1 -> affine2 -> softmax + Hf = 3 # filter height + Wf = 3 # filter width + stride = 1 + pad = 1 # For same dimensions, (Hf - stride) / 2 + + F1 = nrow(Wc1) # num conv filters in conv1 + F2 = nrow(Wc2) # num conv filters in conv2 + F3 = nrow(Wc3) # num conv filters in conv3 + N1 = ncol(Wa1) # num nodes in affine1 + K = ncol(Wa2) # num nodes in affine2, equal to number of target dimensions (num classes) + + # TODO: Implement fast, distributed conv & max pooling operators so that predictions + # can be computed in a full-batch, distributed manner. Alternatively, improve `parfor` + # so that it can be efficiently used for parallel predictions. + ## Compute forward pass + ### conv layer 1: conv1 -> relu1 -> pool1 + #[outc1, Houtc1, Woutc1] = conv2d::forward(X, Wc1, bc1, C, Hin, Win, Hf, Wf, stride, stride, + # pad, pad) + #outc1r = relu::forward(outc1) + #[outc1p, Houtc1p, Woutc1p] = max_pool2d::forward(outc1r, F1, Houtc1, Woutc1, Hf=2, Wf=2, + # strideh=2, stridew=2, 0, 0) + ### conv layer 2: conv2 -> relu2 -> pool2 + #[outc2, Houtc2, Woutc2] = conv2d::forward(outc1p, Wc2, bc2, F1, Houtc1p, Woutc1p, Hf, Wf, + # stride, stride, pad, pad) + #outc2r = relu::forward(outc2) + #[outc2p, Houtc2p, Woutc2p] = max_pool2d::forward(outc2r, F2, Houtc2, Woutc2, Hf=2, Wf=2, + # strideh=2, stridew=2, 0, 0) + ### conv layer 3: conv3 -> relu3 -> pool3 + #[outc3, Houtc3, Woutc3] = conv2d::forward(outc2p, Wc3, bc3, F2, Houtc2p, Woutc2p, Hf, Wf, + # stride, stride, pad, pad) + #outc3r = relu::forward(outc3) + #[outc3p, Houtc3p, Woutc3p] = max_pool2d::forward(outc3r, F3, Houtc3, Woutc3, Hf=2, Wf=2, + # strideh=2, stridew=2, 0, 0) + ### affine layer 1: affine1 -> relu1 -> dropout + #outa1 = affine::forward(outc3p, Wa1, ba1) + #outa1r = relu::forward(outa1) + ##[outa1d, maskad1] = dropout::forward(outa1r, 0.5, -1) + ### affine layer 2: affine2 -> softmax + #outa2 = affine::forward(outa1r, Wa2, ba2) + #probs = softmax::forward(outa2) + + # Compute predictions over mini-batches + probs = matrix(0, rows=N, cols=K) + #batch_size = 32 + #iters = ceil(N / batch_size) + # TODO: `parfor` should work here, possibly as an alternative to distributed predictions. 
+ #for (i in 1:iters) { + #parfor (i in 1:iters, check=0, mode=REMOTE_SPARK, opt=CONSTRAINED) { + #parfor (i in 1:iters, check=0) { # complains about `probs` as an inter-loop dependency + parfor (i in 1:N, check=0, mode=REMOTE_SPARK, opt=CONSTRAINED) { + #parfor (i in 1:N) { + ## Get next batch + #beg = ((i-1) * batch_size) %% N + 1 + #end = min(N, beg + batch_size - 1) + #X_batch = X[beg:end,] + #X_batch = X[i,] + + # Compute forward pass + ## conv layer 1: conv1 -> relu1 -> pool1 + [outc1, Houtc1, Woutc1] = conv2d::forward(X[i,], Wc1, bc1, C, Hin, Win, Hf, Wf, + stride, stride, pad, pad) + outc1r = relu::forward(outc1) + [outc1p, Houtc1p, Woutc1p] = max_pool2d::forward(outc1r, F1, Houtc1, Woutc1, Hf=2, Wf=2, + strideh=2, stridew=2, 0, 0) + ## conv layer 2: conv2 -> relu2 -> pool2 + [outc2, Houtc2, Woutc2] = conv2d::forward(outc1p, Wc2, bc2, F1, Houtc1p, Woutc1p, Hf, Wf, + stride, stride, pad, pad) + outc2r = relu::forward(outc2) + [outc2p, Houtc2p, Woutc2p] = max_pool2d::forward(outc2r, F2, Houtc2, Woutc2, Hf=2, Wf=2, + strideh=2, stridew=2, 0, 0) + ## conv layer 3: conv3 -> relu3 -> pool3 + [outc3, Houtc3, Woutc3] = conv2d::forward(outc2p, Wc3, bc3, F2, Houtc2p, Woutc2p, Hf, Wf, + stride, stride, pad, pad) + outc3r = relu::forward(outc3) + [outc3p, Houtc3p, Woutc3p] = max_pool2d::forward(outc3r, F3, Houtc3, Woutc3, Hf=2, Wf=2, + strideh=2, stridew=2, 0, 0) + ## affine layer 1: affine1 -> relu1 -> dropout + outa1 = affine::forward(outc3p, Wa1, ba1) + outa1r = relu::forward(outa1) + #[outa1d, maskad1] = dropout::forward(outa1r, 0.5, -1) + ## affine layer 2: affine2 -> softmax + outa2 = affine::forward(outa1r, Wa2, ba2) + probs_batch = softmax::forward(outa2) + + # Store predictions + #probs[beg:end,] = probs_batch + probs[i,] = probs_batch + } +} + +eval = function(matrix[double] probs, matrix[double] Y) + return (double loss, double accuracy) { + /* + * Evaluates a convolutional net using the "LeNet" architecture. + * + * The probs matrix contains the class probability predictions + * of K classes over N examples. The targets, Y, have K classes, + * and are one-hot encoded. + * + * Inputs: + * - probs: Class probabilities, of shape (N, K). + * - Y: Target matrix, of shape (N, + * + * Outputs: + * - loss: Scalar loss, of shape (1). + * - accuracy: Scalar accuracy, of shape (1). + */ + # Compute loss & accuracy + loss = cross_entropy_loss::forward(probs, Y) + correct_pred = rowIndexMax(probs) == rowIndexMax(Y) + accuracy = mean(correct_pred) +} + +generate_dummy_data = function(int N) + return (matrix[double] X, matrix[double] Y, int C, int Hin, int Win) { + /* + * Generate a dummy dataset similar to the breast cancer dataset. + * + * Inputs: + * - N: Number of examples to generate. + * + * Outputs: + * - X: Input data matrix, of shape (N, D). + * - Y: Target matrix, of shape (N, K). + * - C: Number of input channels (dimensionality of input depth). + * - Hin: Input height. + * - Win: Input width. 
+ */ + # Generate dummy input data + #N = 1024 # num examples + C = 3 # num input channels + Hin = 256 # input height + Win = 256 # input width + K = 3 # num target classes + X = rand(rows=N, cols=C*Hin*Win, pdf="normal") + classes = round(rand(rows=N, cols=1, min=1, max=K, pdf="uniform")) + Y = table(seq(1, N), classes, N, K) # one-hot encoding +} + http://git-wip-us.apache.org/repos/asf/systemml/blob/532da1bc/projects/breast_cancer/breastcancer/input_data.py ---------------------------------------------------------------------- diff --git a/projects/breast_cancer/breastcancer/input_data.py b/projects/breast_cancer/breastcancer/input_data.py new file mode 100644 index 0000000..6e65a3e --- /dev/null +++ b/projects/breast_cancer/breastcancer/input_data.py @@ -0,0 +1,229 @@ +#------------------------------------------------------------- +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +#------------------------------------------------------------- + +""" +Data utilities for the TUPAC16 breast cancer project. + +This is early, experimental code. + +TODO: Cleanup & add proper comments to all functions. +""" +import multiprocessing as mp +import os +import threading + +import numpy as np +import py4j + + +# Utils for reading data + +def compute_channel_means(rdd, channels, size): + """Compute the means of each color channel across the dataset.""" + # TODO: Replace this with pyspark.ml.feature.VectorSlicer + # to cut vector into separate channel vectors, then grab the mean + # of those new columns, all using DataFrame functions, rather than + # RDD functions. 
+ # from pyspark.ml.linalg import VectorUDT + # from pyspark.sql.functions import udf + # from pyspark.sql import functions as F + # as_ml = udf(lambda v: v.asML() if v is not None else None, VectorUDT()) + # slicers[0].transform(train_df.withColumn("sample", as_ml("sample"))).select(F.avg("ch0")) + # slicers = [VectorSlicer(inputCol="sample", outputCol="ch{}".format(c), indices=range(c*pixels, c*pixels + pixels)) for c in range(CHANNELS)] + def helper(x): + x = x.sample.values + x = np.array(x) + x = (x.reshape((-1,channels,size,size)) # shape (N,C,H,W) + .transpose((0,2,3,1)) # shape (N,H,W,C) + .astype(np.float32)) + mu = np.mean(x, axis=(0,1,2)) + return mu + + means = rdd.map(helper).collect() + means = np.array(means) + means = np.mean(means, axis=0) + return means + + +def gen_class_weights(df): + """Generate class weights to even out the class distribution during training.""" + class_counts_df = df.select("tumor_score").groupBy("tumor_score").count() + class_counts = {row["tumor_score"]:row["count"] for row in class_counts_df.collect()} + max_count = max(class_counts.values()) + class_weights = {k-1:max_count/v for k,v in class_counts.items()} + return class_weights + + +def read_data(spark_session, filename_template, sample_size, channels, sample_prob, normalize_class_distribution, seed): + """Read and return training & validation Spark DataFrames.""" + # TODO: Clean this function up!!! + assert channels in (1, 3) + grayscale = False if channels == 3 else True + + # Sample (Optional) + if sample_prob < 1: + try: + # Ex: `train_0.01_sample_256.parquet` + sampled_filename_template = filename_template.format("{}_sample_".format(sample_prob), sample_size, "_grayscale" if grayscale else "") + filename = os.path.join("data", sampled_filename_template) + df = spark_session.read.load(filename) + except: # Pre-sampled DataFrame not available + filename = os.path.join("data", filename_template.format("", sample_size, "_grayscale" if grayscale else "")) + df = spark_session.read.load(filename) + p = sample_prob # sample percentage + if normalize_class_distribution: + # stratified sample with even class proportions + n = df.count() # num examples + K = 3 # num classes + s = p * n # num examples in p% sample, as a fraction + s_k = s / K # num examples per class in evenly-distributed p% sample, as fraction + class_counts_df = df.select("tumor_score").groupBy("tumor_score").count() + class_counts = {row["tumor_score"]:row["count"] for row in class_counts_df.collect()} + ps = {k:s_k/v for k,v in class_counts.items()} + df = df.sampleBy("tumor_score", fractions=ps, seed=seed) + else: + # stratified sample maintaining the original class proportions + df = df.sampleBy("tumor_score", fractions={1: p, 2: p, 3: p}, seed=seed) + # TODO: Determine if coalesce actually provides a perf benefit on Spark 2.x + #train_df.cache(), val_df.cache() # cache here, or coalesce will hang + # tc = train_df.count() + # vc = val_df.count() + # + # # Reduce num partitions to ideal size (~128 MB/partition, determined empirically) + # current_tr_parts = train_df.rdd.getNumPartitions() + # current_val_parts = train_df.rdd.getNumPartitions() + # ex_mb = sample_size * sample_size * channels * 8 / 1024 / 1024 # size of one example in MB + # ideal_part_size_mb = 128 # 128 MB partitions sizes are empirically ideal + # ideal_exs_per_part = round(ideal_part_size_mb / ex_mb) + # tr_parts = round(tc / ideal_exs_per_part) + # val_parts = round(vc / ideal_exs_per_part) + # if current_tr_parts > tr_parts: + # train_df = 
train_df.coalesce(tr_parts) + # if current_val_parts > val_parts: + # val_df = val_df.coalesce(val_parts) + # train_df.cache(), val_df.cache() + else: + # Read in data + filename = os.path.join("data", filename_template.format("", sample_size, "_grayscale" if grayscale else "")) + df = spark_session.read.load(filename) + + return df + + +def read_train_data(spark_session, sample_size, channels, sample_prob=1, normalize_class_distribution=False, seed=42): + """Read training Spark DataFrame.""" + filename = "train_{}{}{}_updated.parquet" + train_df = read_data(spark_session, filename, sample_size, channels, sample_prob, normalize_class_distribution, seed) + return train_df + + +def read_val_data(spark_session, sample_size, channels, sample_prob=1, normalize_class_distribution=False, seed=42): + """Read validation Spark DataFrame.""" + filename = "val_{}{}{}_updated.parquet" + val_df = read_data(spark_session, filename, sample_size, channels, sample_prob, normalize_class_distribution, seed) + return val_df + + +# Utils for creating asynchronous queuing batch generators + +def fill_partition_num_queue(partition_num_queue, num_partitions, stop_event): + """Repeatedly enqueue partition indices for the partition-fetching threads to consume.""" + partition_num_queue.cancel_join_thread() + while not stop_event.is_set(): + for i in range(num_partitions): + partition_num_queue.put(i) + + +def fill_partition_queue(partition_queue, partition_num_queue, rdd, stop_event): + """Fetch Spark partitions by index and place them on the partition queue.""" + partition_queue.cancel_join_thread() + while not stop_event.is_set(): + # py4j has some issues with imports when first starting, so log the error and retry. + try: + partition_num = partition_num_queue.get() + partition = rdd.context.runJob(rdd, lambda x: x, [partition_num]) + partition_queue.put(partition) + except (AttributeError, py4j.protocol.Py4JError, Exception) as err: + print("error: {}".format(err)) + + +def fill_row_queue(row_queue, partition_queue, stop_event): + """Unpack fetched partitions into individual rows on the row queue.""" + row_queue.cancel_join_thread() + while not stop_event.is_set(): + rows = partition_queue.get() + for row in rows: + row_queue.put(row) + + +def gen_batch(row_queue, batch_size): + """Yield (features, labels) NumPy batches assembled from queued rows.""" + while True: + features = [] + labels = [] + for i in range(batch_size): + row = row_queue.get() + features.append(row.sample.values) + labels.append(row.tumor_score) + x_batch = np.array(features).astype(np.uint8) + y_batch = np.array(labels).astype(np.uint8) + yield x_batch, y_batch + + +def create_batch_generator( + rdd, batch_size=32, num_partition_threads=32, num_row_processes=16, + partition_num_queue_size=128, partition_queue_size=16, row_queue_size=2048): + """ + Create a multiprocess batch generator. + + This creates a generator that uses processes and threads to create a + pipeline that asynchronously fetches data from Spark, filling a set + of queues, while yielding batches. The goal here is to amortize the + time needed to fetch data from Spark so that downstream consumers + are saturated.
+ """ + #rdd.cache() + partition_num_queue = mp.Queue(partition_num_queue_size) + partition_queue = mp.Queue(partition_queue_size) + row_queue = mp.Queue(row_queue_size) + + num_partitions = rdd.getNumPartitions() + stop_event = mp.Event() + + partition_num_process = mp.Process(target=fill_partition_num_queue, args=(partition_num_queue, num_partitions, stop_event), daemon=True) + partition_threads = [threading.Thread(target=fill_partition_queue, args=(partition_queue, partition_num_queue, rdd, stop_event), daemon=True) for _ in range(num_partition_threads)] + row_processes = [mp.Process(target=fill_row_queue, args=(row_queue, partition_queue, stop_event), daemon=True) for _ in range(num_row_processes)] + + ps = [partition_num_process] + row_processes + partition_threads + queues = [partition_num_queue, partition_queue, row_queue] + + for p in ps: + p.start() + + generator = gen_batch(row_queue, batch_size) + return generator, ps, queues, stop_event + + +def stop(processes, stop_event): + """Stop queuing processes.""" + stop_event.set() + for p in processes: + if isinstance(p, mp.Process): + p.terminate() + mp.active_children() # Use to join the killed processes above. + http://git-wip-us.apache.org/repos/asf/systemml/blob/532da1bc/projects/breast_cancer/breastcancer/softmax_clf.dml ---------------------------------------------------------------------- diff --git a/projects/breast_cancer/breastcancer/softmax_clf.dml b/projects/breast_cancer/breastcancer/softmax_clf.dml new file mode 100644 index 0000000..35fd545 --- /dev/null +++ b/projects/breast_cancer/breastcancer/softmax_clf.dml @@ -0,0 +1,207 @@ +#------------------------------------------------------------- +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +#------------------------------------------------------------- + +/* + * Breast Cancer Softmax Model + */ +# Imports +source("nn/layers/affine.dml") as affine +source("nn/layers/cross_entropy_loss.dml") as cross_entropy_loss +source("nn/layers/softmax.dml") as softmax +#source("nn/optim/adam.dml") as adam +source("nn/optim/sgd_nesterov.dml") as sgd_nesterov + +train = function(matrix[double] X, matrix[double] Y, + matrix[double] X_val, matrix[double] Y_val, + double lr, double mu, double decay, + int batch_size, int epochs, int log_interval) + return (matrix[double] W, matrix[double] b) { + /* + * Trains a softmax classifier. + * + * The input matrix, X, has N examples, each with D features. + * The targets, Y, have K classes, and are one-hot encoded. + * + * Inputs: + * - X: Input data matrix, of shape (N, D). + * - Y: Target matrix, of shape (N, K). + * - X_val: Input validation data matrix, of shape (N, C*Hin*Win). + * - Y_val: Target validation matrix, of shape (N, K). + * - lr: Learning rate. + * - mu: Momentum value. 
+ * Typical values are in the range of [0.5, 0.99], usually + * started at the lower end and annealed towards the higher end. + * - decay: Learning rate decay rate. + * - batch_size: Size of mini-batches to train on. + * - epochs: Total number of full training loops over the full data set. + * - log_interval: Interval, in iterations, between log outputs. + * + * Outputs: + * - W: Weights (parameters) matrix, of shape (D, K). + * - b: Biases vector, of shape (1, K). + */ + N = nrow(Y) # num examples + D = ncol(X) # num features + K = ncol(Y) # num classes + + # Create softmax classifier: + # affine -> softmax + [W, b] = affine::init(D, K) + W = W / sqrt(2.0/(D)) * sqrt(1/(D)) + + # Initialize SGD w/ Nesterov momentum optimizer + vW = sgd_nesterov::init(W) # optimizer momentum state for W + vb = sgd_nesterov::init(b) # optimizer momentum state for b + #[mW, vW] = adam::init(W) # optimizer 1st & 2nd moment state for W + #[mb, vb] = adam::init(b) # optimizer 1st & 2nd moment state for b + + # Starting validation loss & accuracy + probs_val = predict(X_val, W, b) + loss_val = cross_entropy_loss::forward(probs_val, Y_val) + accuracy_val = mean(rowIndexMax(probs_val) == rowIndexMax(Y_val)) + # Output results + print("Start: Val Loss: " + loss_val + ", Val Accuracy: " + accuracy_val) + + # Optimize + print("Starting optimization") + iters = ceil(N / batch_size) + for (e in 1:epochs) { + for(i in 1:iters) { + # Get next batch + beg = ((i-1) * batch_size) %% N + 1 + end = min(N, beg + batch_size - 1) + #print("Epoch: " + e + ", Iter: " + i + ", X[" + beg + ":" + end + ",]") + X_batch = X[beg:end,] + Y_batch = Y[beg:end,] + + # Compute forward pass + ## affine & softmax: + out = affine::forward(X_batch, W, b) + probs = softmax::forward(out) + + # Compute backward pass + ## loss: + dprobs = cross_entropy_loss::backward(probs, Y_batch) + ## affine & softmax: + dout = softmax::backward(dprobs, out) + [dX_batch, dW, db] = affine::backward(dout, X_batch, W, b) + + # Optimize with SGD w/ Nesterov momentum + [W, vW] = sgd_nesterov::update(W, dW, lr, mu, vW) + [b, vb] = sgd_nesterov::update(b, db, lr, mu, vb) + #[W, mW, vW] = adam::update(W, dW, lr, 0.9, 0.999, 1e-8, e*i-1, mW, vW) + #[b, mb, vb] = adam::update(b, db, lr, 0.9, 0.999, 1e-8, e*i-1, mb, vb) + + # Compute loss & accuracy for training & validation data every `log_interval` iterations. + if (i %% log_interval == 0) { + #print("Eval time! - i: " + i) + # Compute training loss & accuracy + loss = cross_entropy_loss::forward(probs, Y_batch) + accuracy = mean(rowIndexMax(probs) == rowIndexMax(Y_batch)) + + # Compute validation loss & accuracy + probs_val = predict(X_val, W, b) + loss_val = cross_entropy_loss::forward(probs_val, Y_val) + accuracy_val = mean(rowIndexMax(probs_val) == rowIndexMax(Y_val)) + + # Output results + print("Epoch: " + e + "/" + epochs + ", Iter: " + i + "/" + iters + + ", Train Loss: " + loss + ", Train Accuracy: " + accuracy + ", Val Loss: " + + loss_val + ", Val Accuracy: " + accuracy_val + ", lr: " + lr + ", mu " + mu) + } + } + # Anneal momentum towards 0.999 + mu = mu + (0.999 - mu)/(1+epochs-e) + # Decay learning rate + lr = lr * decay + } +} + +predict = function(matrix[double] X, matrix[double] W, matrix[double] b) + return (matrix[double] probs) { + /* + * Computes the class probability predictions of a softmax classifier. + * + * The input matrix, X, has N examples, each with D features. + * + * Inputs: + * - X: Input data matrix, of shape (N, D). + * - W: Weights (parameters) matrix, of shape (D, K). 
+ * - b: Biases vector, of shape (1, K). + * + * Outputs: + * - probs: Class probabilities, of shape (N, K). + */ + N = nrow(X) # num examples + K = ncol(W) # num classes + + # Compute forward pass + ## affine & softmax: + out = affine::forward(X, W, b) + probs = softmax::forward(out) +} + +eval = function(matrix[double] probs, matrix[double] Y) + return (double loss, double accuracy) { + /* + * Evaluates a softmax classifier. + * + * The probs matrix contains the class probability predictions + * of K classes over N examples. The targets, Y, have K classes, + * and are one-hot encoded. + * + * Inputs: + * - probs: Class probabilities, of shape (N, K). + * - Y: Target matrix, of shape (N, K). + * + * Outputs: + * - loss: Scalar loss, of shape (1). + * - accuracy: Scalar accuracy, of shape (1). + */ + # Compute loss & accuracy + loss = cross_entropy_loss::forward(probs, Y) + correct_pred = rowIndexMax(probs) == rowIndexMax(Y) + accuracy = mean(correct_pred) +} + +generate_dummy_data = function() + return (matrix[double] X, matrix[double] Y, int C, int Hin, int Win) { + /* + * Generate a dummy dataset similar to the breast cancer dataset. + * + * Outputs: + * - X: Input data matrix, of shape (N, D). + * - Y: Target matrix, of shape (N, K). + * - C: Number of input channels (dimensionality of input depth). + * - Hin: Input height. + * - Win: Input width. + */ + # Generate dummy input data + N = 1024 # num examples + C = 3 # num input channels + Hin = 256 # input height + Win = 256 # input width + T = 10 # num targets + X = rand(rows=N, cols=C*Hin*Win, pdf="normal") + classes = round(rand(rows=N, cols=1, min=1, max=T, pdf="uniform")) + Y = table(seq(1, N), classes) # one-hot encoding +} +
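For context, this script is intended to be driven from PySpark through SystemML's MLContext API. The following is a minimal, untested sketch; the SparkContext `sc`, the hyperparameter values, and the script path (assumed relative to the project directory) are assumptions rather than part of this commit, and the dummy data is reused as its own validation set purely for brevity.

from systemml import MLContext, dml

ml = MLContext(sc)  # assumes an existing SparkContext `sc`

script_str = """
source("breastcancer/softmax_clf.dml") as clf
[X, Y, C, Hin, Win] = clf::generate_dummy_data()
[W, b] = clf::train(X, Y, X, Y, lr, mu, decay, batch_size, epochs, log_interval)
probs = clf::predict(X, W, b)
[loss, accuracy] = clf::eval(probs, Y)
"""
script = (dml(script_str)
          .input(lr=0.01, mu=0.9, decay=0.99, batch_size=32, epochs=1, log_interval=10)
          .output("loss", "accuracy"))
loss, accuracy = ml.execute(script).get("loss", "accuracy")

The convolutional "LeNet" script earlier in this commit exposes analogous predict/eval entry points and would be driven the same way.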
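Stepping back to breastcancer/input_data.py above, its pieces are meant to compose roughly as follows. This sketch is untested and assumes an existing SparkSession `spark`, that it is run from the project directory (so the breastcancer package and the data/ Parquet files resolve), and that the chosen argument values suit the 256x256, 3-channel dataset.

from breastcancer import input_data

# Read stratified samples of the 256x256, 3-channel training and validation sets.
train_df = input_data.read_train_data(spark, sample_size=256, channels=3,
                                      sample_prob=0.01, seed=42)
val_df = input_data.read_val_data(spark, sample_size=256, channels=3,
                                  sample_prob=0.01, seed=42)

# Per-channel means and class weights for preprocessing and a class-weighted loss.
channel_means = input_data.compute_channel_means(train_df.rdd, channels=3, size=256)
class_weights = input_data.gen_class_weights(train_df)

# Asynchronously stream training batches out of Spark.
generator, processes, queues, stop_event = input_data.create_batch_generator(
    train_df.rdd, batch_size=32)
try:
  x_batch, y_batch = next(generator)  # (32, 3*256*256) uint8 samples, (32,) uint8 tumor scores
finally:
  input_data.stop(processes, stop_event)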
