This is an automated email from the ASF dual-hosted git repository.
niketanpansare pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/systemml.git
The following commit(s) were added to refs/heads/master by this push:
new b657820 [SYSTEMML-540] Added looped_minibatch training algorithm in Keras2DML
b657820 is described below
commit b657820248fbb42f1c4f27564cdb14865ebeeec1
Author: Niketan Pansare <[email protected]>
AuthorDate: Mon Mar 25 12:33:50 2019 -0700
[SYSTEMML-540] Added looped_minibatch training algorithm in Keras2DML
- This algorithm performs multiple forward-backward passes (the number of passes is set by the `parallel_batches` parameter) with the given batch size, aggregates the gradients, and finally updates the model.
- Updated the documentation.
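
The aggregation described above can be illustrated with a minimal pure-Python sketch. It is not the DML that Caffe2DML generates; the names `grad` and `looped_minibatch_step` and the toy one-parameter least-squares model are assumptions made for illustration only. Under those assumptions, it shows that looping over `parallel_batches` equally sized sub-batches, scaling each gradient by `1/parallel_batches` (the `weight_parallel_batches` default), and applying a single update gives the same result as one update on the full batch.

```python
def grad(w, batch):
    """Mean gradient of 0.5 * (w*x - y)**2 with respect to the scalar weight w."""
    return sum((w * x - y) * x for x, y in batch) / len(batch)


def looped_minibatch_step(w, data, local_batch_size, parallel_batches, lr,
                          weight_parallel_batches=True):
    """One model update built from several sequential forward-backward passes."""
    # Hypothetical stand-in for the "weighting" variable in the generated script.
    weighting = 1.0 / parallel_batches if weight_parallel_batches else 1.0
    agg = 0.0
    for j in range(1, parallel_batches + 1):
        beg = (j - 1) * local_batch_size   # start of the j-th sub-batch
        end = j * local_batch_size         # exclusive end of the j-th sub-batch
        agg += grad(w, data[beg:end]) * weighting
    # A single SGD update using the aggregated gradient.
    return w - lr * agg


# Toy data: y = 2x + 1 for x = 1..8 (8 examples in total).
data = [(float(i), 2.0 * i + 1.0) for i in range(1, 9)]
w0, lr = 0.5, 0.01

looped = looped_minibatch_step(w0, data, local_batch_size=2, parallel_batches=4, lr=lr)
full = w0 - lr * grad(w0, data)
unweighted = looped_minibatch_step(w0, data, 2, 4, lr, weight_parallel_batches=False)

print(looped, full, unweighted)
```

With the weighting disabled, the aggregated gradient is the plain sum over the sub-batches, so the step is `parallel_batches` times larger for the same learning rate.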
---
docs/beginners-guide-caffe2dml.md | 2 +-
docs/beginners-guide-keras2dml.md | 35 ++++++++++++-
src/main/python/systemml/mllearn/estimators.py | 11 ++--
.../scala/org/apache/sysml/api/dl/Caffe2DML.scala | 60 ++++++++++++++--------
4 files changed, 82 insertions(+), 26 deletions(-)
diff --git a/docs/beginners-guide-caffe2dml.md b/docs/beginners-guide-caffe2dml.md
index 8814283..db74feb 100644
--- a/docs/beginners-guide-caffe2dml.md
+++ b/docs/beginners-guide-caffe2dml.md
@@ -161,7 +161,7 @@ Iter:2000, validation loss:173.66147359346, validation accuracy:97.4897540983606
Unlike Caffe where default train and test algorithm is `minibatch`, you can specify the
algorithm using the parameters `train_algo` and `test_algo` (valid values are: `minibatch`, `allreduce_parallel_batches`,
-and `allreduce`). Here are some common settings:
+`looped_minibatch`, and `allreduce`). Here are some common settings:
| | PySpark script | Changes to Network/Solver |
|--------------------------------------------------------------------------|------------------------------------------------------------------------------------------------------------------------------------------|------------------------------------------------------------------------|
diff --git a/docs/beginners-guide-keras2dml.md b/docs/beginners-guide-keras2dml.md
index 4517be5..2259397 100644
--- a/docs/beginners-guide-keras2dml.md
+++ b/docs/beginners-guide-keras2dml.md
@@ -208,4 +208,37 @@ For example: for the expression `Keras2DML(..., display=100, test_iter=10, test_
To verify that Keras2DML produce same results as other Keras' backend, we have [Python unit tests](https://github.com/apache/systemml/blob/master/src/main/python/tests/test_nn_numpy.py) that compare the results of Keras2DML with that of TensorFlow. We assume that Keras team ensure that all their backends are consistent with their TensorFlow backend.
-
+#### How can I train very deep models on GPU?
+
+Unlike Keras where default train and test algorithm is `minibatch`, you can specify the
+algorithm using the parameters `train_algo` and `test_algo` (valid values are: `minibatch`, `allreduce_parallel_batches`,
+`looped_minibatch`, and `allreduce`). Here are some common settings:
+
+| | PySpark script | Changes to Network/Solver |
+|--------------------------------------------------------------------------|------------------------------------------------------------------------------------------------------------------------------------------|------------------------------------------------------------------------|
+| Single-node CPU execution (similar to Caffe with solver_mode: CPU) | `lenet.set(train_algo="minibatch", test_algo="minibatch")` | Ensure that `batch_size` is set to appropriate value (for example: 64) |
+| Single-node single-GPU execution | `lenet.set(train_algo="minibatch", test_algo="minibatch").setGPU(True).setForceGPU(True)` | Ensure that `batch_size` is set to appropriate value (for example: 64) |
+| Single-node multi-GPU execution (similar to Caffe with solver_mode: GPU) | `lenet.set(train_algo="allreduce_parallel_batches", test_algo="minibatch", parallel_batches=num_gpu).setGPU(True).setForceGPU(True)` | Ensure that `batch_size` is set to appropriate value (for example: 64) |
+| Distributed prediction | `lenet.set(test_algo="allreduce")` | |
+| Distributed synchronous training | `lenet.set(train_algo="allreduce_parallel_batches", parallel_batches=num_cluster_cores)` | Ensure that `batch_size` is set to appropriate value (for example: 64) |
+
+Here are high-level guidelines to train very deep models on GPU with Keras2DML (and Caffe2DML):
+
+1. If there exists at least one layer/operator that does not fit on the device, please allow SystemML's optimizer to perform operator placement based on the memory estimates `sysml_model.setGPU(True)`.
+2. If each individual layer/operator fits on the device but not the entire network with a batch size of 1, then
+- Rely on SystemML's GPU Memory Manager to perform automatic eviction (recommended): `sysml_model.setGPU(True) # Optional: .setForceGPU(True)`
+- Or enable Nvidia's Unified Memory: `sysml_model.setConfigProperty('sysml.gpu.memory.allocator', 'unified_memory')`
+3. If the entire neural network does not fit in the GPU memory with the user-specified `batch_size`, but fits in the GPU memory with `local_batch_size` such that `1 << local_batch_size < batch_size`, then
+- Use either of the above two options.
+- Or enable `train_algo` that performs multiple forward-backward pass with batch size `local_batch_size`, aggregate gradients and finally updates the model:
+```python
+sysml_model = Keras2DML(spark, keras_model, batch_size=local_batch_size)
+sysml_model.set(train_algo="looped_minibatch", parallel_batches=int(batch_size/local_batch_size))
+sysml_model.setGPU(True).setForceGPU(True)
+```
+- Or add `int(batch_size/local_batch_size)` GPUs and perform single-node multi-GPU training with batch size `local_batch_size`:
+```python
+sysml_model = Keras2DML(spark, keras_model, batch_size=local_batch_size)
+sysml_model.set(train_algo="allreduce_parallel_batches", parallel_batches=int(batch_size/local_batch_size))
+sysml_model.setGPU(True).setForceGPU(True)
+```
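
As a side note on the `int(batch_size/local_batch_size)` expression used in the documentation above: it truncates, so the number of examples aggregated per model update can be smaller than the requested `batch_size` when the two values do not divide evenly. The sketch below is only an illustration of that arithmetic; the helper name `looped_minibatch_settings` and the returned keys are hypothetical and not part of the SystemML API.

```python
def looped_minibatch_settings(batch_size, local_batch_size):
    """Derive parallel_batches the same way the documentation snippet does."""
    if local_batch_size < 1:
        raise ValueError("local_batch_size must be at least 1")
    parallel_batches = int(batch_size / local_batch_size)  # truncating division
    if parallel_batches < 1:
        raise ValueError("local_batch_size must not exceed batch_size")
    return {
        "parallel_batches": parallel_batches,
        # Number of examples that actually contribute to one model update.
        "effective_batch_size": parallel_batches * local_batch_size,
    }


even = looped_minibatch_settings(batch_size=64, local_batch_size=16)
uneven = looped_minibatch_settings(batch_size=64, local_batch_size=24)
print(even)
print(uneven)
```

Choosing a `local_batch_size` that divides `batch_size` keeps the effective batch size equal to the one originally intended.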
diff --git a/src/main/python/systemml/mllearn/estimators.py b/src/main/python/systemml/mllearn/estimators.py
index 456280b..0b47d8c 100644
--- a/src/main/python/systemml/mllearn/estimators.py
+++ b/src/main/python/systemml/mllearn/estimators.py
@@ -923,22 +923,23 @@ class Caffe2DML(BaseSystemMLClassifier):
def set(self, debug=None, train_algo=None, test_algo=None,
parallel_batches=None,
output_activations=None, perform_one_hot_encoding=None,
parfor_parameters=None, inline_nn_library=None, use_builtin_lstm_fn=None,
- perform_fused_backward_update=None):
+ perform_fused_backward_update=None, weight_parallel_batches=None):
"""
Set input to Caffe2DML
Parameters
----------
debug: to add debugging DML code such as classification report, print DML script, etc (default: False)
- train_algo: can be minibatch, batch, allreduce_parallel_batches or allreduce (default: minibatch)
- test_algo: can be minibatch, batch, allreduce_parallel_batches or allreduce (default: minibatch)
- parallel_batches: number of parallel batches
+ train_algo: can be minibatch, batch, allreduce_parallel_batches, looped_minibatch or allreduce (default: minibatch)
+ test_algo: can be minibatch, batch, allreduce_parallel_batches, looped_minibatch or allreduce (default: minibatch)
+ parallel_batches: number of parallel batches (required for allreduce_parallel_batches or looped_minibatch)
output_activations: (developer flag) directory to output activations of each layer as csv while prediction. To be used only in batch mode (default: None)
perform_one_hot_encoding: should perform one-hot encoding in DML using table function (default: True)
parfor_parameters: dictionary for parfor parameters when using allreduce-style algorithms (default: "")
inline_nn_library: whether to inline the NN library when generating DML using Caffe2DML (default: False)
use_builtin_lstm_fn: whether to use builtin lstm function for LSTM layer (default: True)
perform_fused_backward_update: whether to perform update immediately after backward pass at the script level. Supported for minibatch and batch algorithms. (default: True)
+ weight_parallel_batches: whether to multiply 1/parallel_batches to gradients before performing SGD update (default: True)
"""
if debug is not None:
self.estimator.setInput("$debug", str(debug).upper())
@@ -954,6 +955,8 @@ class Caffe2DML(BaseSystemMLClassifier):
self.estimator.setInput("$use_builtin_lstm_fn", str(use_builtin_lstm_fn).upper())
if perform_fused_backward_update is not None:
self.estimator.setInput("$perform_fused_backward_update", str(perform_fused_backward_update).upper())
+ if weight_parallel_batches is not None:
+ self.estimator.setInput("$weight_parallel_batches", str(weight_parallel_batches).upper())
if output_activations is not None:
self.estimator.setInput(
"$output_activations",
diff --git a/src/main/scala/org/apache/sysml/api/dl/Caffe2DML.scala b/src/main/scala/org/apache/sysml/api/dl/Caffe2DML.scala
index c5a20db..9950d69 100644
--- a/src/main/scala/org/apache/sysml/api/dl/Caffe2DML.scala
+++ b/src/main/scala/org/apache/sysml/api/dl/Caffe2DML.scala
@@ -118,7 +118,7 @@ To shield from network files that violates this restriction, Caffe2DML performs
object Caffe2DML {
val LOG = LogFactory.getLog(classOf[Caffe2DML].getName())
// ------------------------------------------------------------------------
- val USE_PLUS_EQ = true
+ var USE_PLUS_EQ = true
def nnDir = "nn/"
def layerDir = nnDir + "layers/"
def optimDir = nnDir + "optim/"
@@ -157,6 +157,7 @@ object Caffe2DML {
val rand = new Random
// Supported Algorithms:
val MINIBATCH_ALGORITHM = "minibatch"
+ val LOOPED_MINIBATCH_ALGORITHM = "looped_minibatch"
val BATCH_ALGORITHM = "batch"
val ALLREDUCE_ALGORITHM = "allreduce"
val ALLREDUCE_PARALLEL_BATCHES_ALGORITHM = "allreduce_parallel_batches"
@@ -321,6 +322,7 @@ class Caffe2DML(val sc: SparkContext,
case "$inline_nn_library" => false
case "$use_builtin_lstm_fn" => true
case "$perform_fused_backward_update" => true
+ case "$weight_parallel_batches" => true
case _ => throw new DMLRuntimeException("Unsupported input:" + key)
}
}
@@ -329,7 +331,7 @@ class Caffe2DML(val sc: SparkContext,
// The below method parses the provided network and solver file and generates DML script.
def getTrainingScript(isSingleNode: Boolean): (Script, String, String) = {
val startTrainingTime = System.nanoTime()
-
+
reset // Reset the state of DML generator for training script.
// Flags passed by user
@@ -357,7 +359,9 @@ class Caffe2DML(val sc: SparkContext,
tabDMLScript.append(print(dmlConcat(asDMLString("Iterations (for training loss/accuracy) refers to the number of batches processed where batch size="), Caffe2DML.batchSize)))
}
if(getTrainAlgo.toLowerCase.equals(Caffe2DML.ALLREDUCE_PARALLEL_BATCHES_ALGORITHM) ||
- getTestAlgo.toLowerCase.equals(Caffe2DML.ALLREDUCE_PARALLEL_BATCHES_ALGORITHM)) {
+ getTestAlgo.toLowerCase.equals(Caffe2DML.ALLREDUCE_PARALLEL_BATCHES_ALGORITHM) ||
+ getTrainAlgo.toLowerCase.equals(Caffe2DML.LOOPED_MINIBATCH_ALGORITHM) ||
+ getTestAlgo.toLowerCase.equals(Caffe2DML.LOOPED_MINIBATCH_ALGORITHM)) {
assign(tabDMLScript, "parallel_batches", "$parallel_batches")
}
// ----------------------------------------------------------------------------
@@ -426,7 +430,7 @@ class Caffe2DML(val sc: SparkContext,
lrPolicy.updateLearningRate(tabDMLScript)
}
}
- case Caffe2DML.ALLREDUCE_PARALLEL_BATCHES_ALGORITHM => {
+ case Caffe2DML.LOOPED_MINIBATCH_ALGORITHM | Caffe2DML.ALLREDUCE_PARALLEL_BATCHES_ALGORITHM => {
assign(tabDMLScript, "e", "0")
assign(tabDMLScript, "max_iter", ifdef("$max_iter", solverParam.getMaxIter.toString))
forBlock("iter", "1", "max_iter", "parallel_batches") {
@@ -436,7 +440,16 @@ class Caffe2DML(val sc: SparkContext,
assign(tabDMLScript, "allreduce_start_index", "1")
}
initializeGradients("parallel_batches")
- parForBlock("j", "1", "parallel_batches", "1", getParforParameters()) {
+ val old_USE_PLUS_EQ = Caffe2DML.USE_PLUS_EQ
+ val iterBlock = if(getTrainAlgo.toLowerCase.equals(Caffe2DML.ALLREDUCE_PARALLEL_BATCHES_ALGORITHM)) {
+ parForBlock("j", "1", "parallel_batches", "1", getParforParameters()) _
+ }
+ else {
+ Caffe2DML.USE_PLUS_EQ = true
+ forBlock("j", "1", "parallel_batches", "1") _
+ }
+
+ iterBlock {
// Get a mini-batch in this group
assign(tabDMLScript, "beg", "allreduce_start_index + (j-1)*" + Caffe2DML.batchSize)
assign(tabDMLScript, "end", "allreduce_start_index + j*" + Caffe2DML.batchSize + " - 1")
@@ -463,6 +476,7 @@ class Caffe2DML(val sc: SparkContext,
}
}
performSnapshot
+ Caffe2DML.USE_PLUS_EQ = old_USE_PLUS_EQ
}
}
case Caffe2DML.ALLREDUCE_ALGORITHM => {
@@ -570,7 +584,7 @@ class Caffe2DML(val sc: SparkContext,
tabDMLScript.append("# Compute validation loss & accuracy\n")
assign(tabDMLScript, "loss", "0"); assign(tabDMLScript, "accuracy", "0")
getTestAlgo.toLowerCase match {
- case Caffe2DML.MINIBATCH_ALGORITHM => {
+ case Caffe2DML.MINIBATCH_ALGORITHM | Caffe2DML.LOOPED_MINIBATCH_ALGORITHM => {
assign(tabDMLScript, "validation_loss", "0")
assign(tabDMLScript, "validation_accuracy", "0")
forBlock("iVal", "1", "num_batches_per_epoch") {
@@ -695,29 +709,35 @@ class Caffe2DML(val sc: SparkContext,
}
}
private def flattenGradients(): Unit = {
- if(Caffe2DML.USE_PLUS_EQ) {
- // Note: We multiply by a weighting to allow for proper gradient averaging during the
- // aggregation even with uneven batch sizes.
+ if(!Caffe2DML.USE_PLUS_EQ) {
+ tabDMLScript.append("# Flatten and store gradients for this parallel execution\n")
+ }
+ val isLoopedMinibatch = getTrainAlgo.toLowerCase.equals(Caffe2DML.LOOPED_MINIBATCH_ALGORITHM)
+ val suffixDML = if(getInputBooleanValue("$weight_parallel_batches")) " * weighting" else ""
+ // Note: We multiply by a weighting to allow for proper gradient averaging during the
+ // aggregation even with uneven batch sizes.
+ if(getInputBooleanValue("$weight_parallel_batches")) {
assign(tabDMLScript, "weighting", "1/parallel_batches") // "nrow(Xb)/X_group_batch_size")
+ }
+ if(Caffe2DML.USE_PLUS_EQ) {
net.getLayers
.map(layer => net.getCaffeLayer(layer))
.map(l => {
- if (l.shouldUpdateWeight) assignPlusEq(tabDMLScript, l.dWeight + "_agg", l.dWeight + "*weighting")
- if (l.shouldUpdateExtraWeight) assignPlusEq(tabDMLScript, l.dExtraWeight + "_agg", l.dExtraWeight + "*weighting")
- if (l.shouldUpdateWeight) assignPlusEq(tabDMLScript, l.dBias + "_agg", l.dBias + "*weighting")
+ if (l.shouldUpdateWeight) assignPlusEq(tabDMLScript, l.dWeight + "_agg", l.dWeight + suffixDML)
+ if (l.shouldUpdateExtraWeight) assignPlusEq(tabDMLScript, l.dExtraWeight + "_agg", l.dExtraWeight + suffixDML)
+ if (l.shouldUpdateWeight) assignPlusEq(tabDMLScript, l.dBias + "_agg", l.dBias + suffixDML)
})
}
else {
- tabDMLScript.append("# Flatten and store gradients for this parallel execution\n")
- // Note: We multiply by a weighting to allow for proper gradient averaging during the
- // aggregation even with uneven batch sizes.
- assign(tabDMLScript, "weighting", "1/parallel_batches") // "nrow(Xb)/X_group_batch_size")
+ if(isLoopedMinibatch) {
+ throw new DMLRuntimeException("Flattening and storing gradients is not supported for looped_minibatch algorithm")
+ }
net.getLayers
.map(layer => net.getCaffeLayer(layer))
.map(l => {
- if (l.shouldUpdateWeight) assign(tabDMLScript, l.dWeight + "_agg[j,]", matrix(l.dWeight, "1", multiply(nrow(l.weight), ncol(l.weight))) + " * weighting")
- if (l.shouldUpdateExtraWeight) assign(tabDMLScript, l.dExtraWeight + "_agg[j,]", matrix(l.dExtraWeight, "1", multiply(nrow(l.extraWeight), ncol(l.extraWeight))) + " * weighting")
- if (l.shouldUpdateWeight) assign(tabDMLScript, l.dBias + "_agg[j,]", matrix(l.dBias, "1", multiply(nrow(l.bias), ncol(l.bias))) + " * weighting")
+ if (l.shouldUpdateWeight) assign(tabDMLScript, l.dWeight + "_agg[j,]", matrix(l.dWeight, "1", multiply(nrow(l.weight), ncol(l.weight))) + suffixDML)
+ if (l.shouldUpdateExtraWeight) assign(tabDMLScript, l.dExtraWeight + "_agg[j,]", matrix(l.dExtraWeight, "1", multiply(nrow(l.extraWeight), ncol(l.extraWeight))) + suffixDML)
+ if (l.shouldUpdateWeight) assign(tabDMLScript, l.dBias + "_agg[j,]", matrix(l.dBias, "1", multiply(nrow(l.bias), ncol(l.bias))) + suffixDML)
})
}
}
@@ -807,7 +827,7 @@ class Caffe2DMLModel(val numClasses: String, val sc: SparkContext, val solver: C
val lastLayerShape = estimator.getOutputShapeOfLastLayer
assign(tabDMLScript, "Prob", matrix("1", Caffe2DML.numImages, (lastLayerShape._1 * lastLayerShape._2 * lastLayerShape._3).toString))
estimator.getTestAlgo.toLowerCase match {
- case Caffe2DML.MINIBATCH_ALGORITHM => {
+ case Caffe2DML.MINIBATCH_ALGORITHM | Caffe2DML.LOOPED_MINIBATCH_ALGORITHM => {
ceilDivide(tabDMLScript(), "num_iters", Caffe2DML.numImages, Caffe2DML.batchSize)
forBlock("iter", "1", "num_iters") {
getTestBatch(tabDMLScript)