[ https://issues.apache.org/jira/browse/SPARK-5807?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Peter Rudenko updated SPARK-5807:
---------------------------------
Description:
Right now, CrossValidator evaluates every combination of fold and ParamGrid
hyperparameter set sequentially when searching for the best parameters.
Assuming there are enough workers and memory on the cluster to cache all
training/validation folds, the execution can be parallelized. Here's a draft
I came up with:
{code}
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer

// ArrayBuffer's constructor argument is only an initial capacity, so pre-fill with zeros.
val metrics = new ArrayBuffer[Double](numModels) with mutable.SynchronizedBuffer[Double]
metrics ++= Seq.fill(numModels)(0.0)
val splits = MLUtils.kFold(dataset, map(numFolds), 0).zipWithIndex

def processFold(input: ((RDD[sql.Row], RDD[sql.Row]), Int)) = input match {
  case ((training, validation), splitIndex) => {
    val trainingDataset = sqlCtx.applySchema(training, schema).cache()
    val validationDataset = sqlCtx.applySchema(validation, schema).cache()
    // multi-model training
    logDebug(s"Train split $splitIndex with multiple sets of parameters.")
    val models = est.fit(trainingDataset, epm).asInstanceOf[Seq[Model[_]]]
    var i = 0
    trainingDataset.unpersist()
    while (i < numModels) {
      val metric = eval.evaluate(models(i).transform(validationDataset, epm(i)), map)
      logDebug(s"Got metric $metric for model trained with ${epm(i)}.")
      // The read-modify-write is not atomic, so hold the buffer's lock for the update.
      metrics.synchronized { metrics(i) += metric }
      i += 1
    }
    validationDataset.unpersist()
  }
}

if (parallel) {
  splits.par.foreach(processFold)
} else {
  splits.foreach(processFold)
}
{code}
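As an alternative to a shared synchronized buffer, each fold could return its own metrics and the results could be summed afterwards. The following is only a minimal sketch of that idea in plain Scala, without Spark: `ParallelFoldMetrics`, `evaluateFold` and `sumMetrics` are hypothetical names, and `evaluateFold` is a stand-in that returns made-up numbers instead of fitting and evaluating real models.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object ParallelFoldMetrics {
  // Hypothetical stand-in for "fit numModels models on one fold and evaluate each".
  // The metric here is just a deterministic function of fold and model index.
  def evaluateFold(splitIndex: Int, numModels: Int): Array[Double] =
    Array.tabulate(numModels)(i => splitIndex * 10.0 + i)

  // Run all folds concurrently. Each task returns its own array, so no
  // mutable state is shared between threads.
  def sumMetrics(numFolds: Int, numModels: Int): Array[Double] = {
    val perFold: Seq[Future[Array[Double]]] =
      (0 until numFolds).map(f => Future(evaluateFold(f, numModels)))
    val results = Await.result(Future.sequence(perFold), 1.minute)
    // Element-wise sum over folds, done on the calling thread.
    results.foldLeft(Array.fill(numModels)(0.0)) { (acc, m) =>
      acc.zip(m).map { case (a, b) => a + b }
    }
  }
}
```

Because the per-fold arrays are combined only after all futures complete, no locking is needed in this variant.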
With 3 folds, the draft would redundantly cache every training/validation
combination (quite a lot of memory), so it may be possible to cache each fold
separately instead.
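To illustrate what caching each fold separately might look like, here is a small sketch on plain Scala collections rather than RDDs. `FoldReuse`, `disjointFolds` and `trainValidation` are hypothetical helpers, and the round-robin assignment is only a stand-in for a random split. The point is that the k disjoint folds are materialized once and each training set is derived from them as the union of the other folds.

```scala
object FoldReuse {
  // Assign every record to exactly one of k disjoint folds
  // (round-robin here, as a stand-in for a random split).
  def disjointFolds[T](data: Seq[T], k: Int): Vector[Seq[T]] =
    Vector.tabulate(k) { f =>
      data.zipWithIndex.collect { case (x, i) if i % k == f => x }
    }

  // For fold f, validation is fold f itself and training is the
  // union of all remaining folds.
  def trainValidation[T](folds: Vector[Seq[T]], f: Int): (Seq[T], Seq[T]) = {
    val training = folds.indices.filter(_ != f).flatMap(i => folds(i))
    (training, folds(f))
  }
}
```

With RDDs, the analogous approach would presumably be to cache the k fold RDDs once and build each training set with `union`, so that each record is cached once rather than once per fold combination.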
was:
Right now, CrossValidator evaluates every combination of fold and ParamGrid
hyperparameter set sequentially when searching for the best parameters.
Assuming there are enough workers and memory on the cluster to cache all
training/validation folds, the execution can be parallelized. Here's a draft
I came up with:
{code}
import scala.collection.immutable.{ Vector => ScalaVec }
....
val metrics = ScalaVec.fill(numModels)(0.0) //Scala vector is thread safe
val splits = MLUtils.kFold(dataset, map(numFolds), 0).zipWithIndex
def processFold(input: ((RDD[sql.Row], RDD[sql.Row]), Int)) = input match {
  case ((training, validation), splitIndex) => {
    val trainingDataset = sqlCtx.applySchema(training, schema).cache()
    val validationDataset = sqlCtx.applySchema(validation, schema).cache()
    // multi-model training
    logDebug(s"Train split $splitIndex with multiple sets of parameters.")
    val models = est.fit(trainingDataset, epm).asInstanceOf[Seq[Model[_]]]
    var i = 0
    trainingDataset.unpersist()
    while (i < numModels) {
      val metric = eval.evaluate(models(i).transform(validationDataset, epm(i)), map)
      logDebug(s"Got metric $metric for model trained with ${epm(i)}.")
      metrics(i) += metric
      i += 1
    }
    validationDataset.unpersist()
  }
}
if (parallel) {
  splits.par.foreach(processFold)
} else {
  splits.foreach(processFold)
}
{code}
With 3 folds, the draft would redundantly cache every training/validation
combination (quite a lot of memory), so it may be possible to cache each fold
separately instead.
> Parallel grid search
> ---------------------
>
> Key: SPARK-5807
> URL: https://issues.apache.org/jira/browse/SPARK-5807
> Project: Spark
> Issue Type: New Feature
> Components: ML
> Affects Versions: 1.3.0
> Reporter: Peter Rudenko
> Priority: Minor
>