[ 
https://issues.apache.org/jira/browse/SPARK-5807?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Peter Rudenko updated SPARK-5807:
---------------------------------
    Description: 
Right now in CrossValidator for each fold combination and ParamGrid 
hyperparameter pair it searches the best parameter sequentially. Assuming 
there's enough workers & memory on a cluster to cache all training/validation 
folds it's possible to parallelize execution. Here's a draft i came with:

{code}
val metrics = val metrics = new ArrayBuffer[Double](numModels) with 
mutable.SynchronizedBuffer[Double]
val splits = MLUtils.kFold(dataset, map(numFolds), 0).zipWithIndex

def processFold(input: ((RDD[sql.Row], RDD[sql.Row]), Int)) = input match {
  case ((training, validation), splitIndex) => {
    val trainingDataset = sqlCtx.applySchema(training, schema).cache()
    val validationDataset = sqlCtx.applySchema(validation, schema).cache()
    // multi-model training
    logDebug(s"Train split $splitIndex with multiple sets of parameters.")
    val models = est.fit(trainingDataset, epm).asInstanceOf[Seq[Model[_]]]
    var i = 0
    trainingDataset.unpersist()
    while (i < numModels) {
      val metric = eval.evaluate(models(i).transform(validationDataset, 
epm(i)), map)
      logDebug(s"Got metric $metric for model trained with ${epm(i)}.")
      metrics(i) += metric
        i += 1
    }
    validationDataset.unpersist()
  }
}

if (parallel) {
splits.par.foreach(processFold)
} else {
splits.foreach(processFold)
}
{code}

Assuming there's 3 folds it would redundantly cache all the combinations 
(pretty much memory), so maybe it's possible to cache each fold separately.

  was:
Right now in CrossValidator for each fold combination and ParamGrid 
hyperparameter pair it searches the best parameter sequentially. Assuming 
there's enough workers & memory on a cluster to cache all training/validation 
folds it's possible to parallelize execution. Here's a draft i came with:

{code}
import scala.collection.immutable.{ Vector => ScalaVec }

....

val metrics = ScalaVec.fill(numModels)(0.0) //Scala vector is thread safe
val splits = MLUtils.kFold(dataset, map(numFolds), 0).zipWithIndex

def processFold(input: ((RDD[sql.Row], RDD[sql.Row]), Int)) = input match {
  case ((training, validation), splitIndex) => {
    val trainingDataset = sqlCtx.applySchema(training, schema).cache()
    val validationDataset = sqlCtx.applySchema(validation, schema).cache()
    // multi-model training
    logDebug(s"Train split $splitIndex with multiple sets of parameters.")
    val models = est.fit(trainingDataset, epm).asInstanceOf[Seq[Model[_]]]
    var i = 0
    trainingDataset.unpersist()
    while (i < numModels) {
      val metric = eval.evaluate(models(i).transform(validationDataset, 
epm(i)), map)
      logDebug(s"Got metric $metric for model trained with ${epm(i)}.")
      metrics(i) += metric
        i += 1
    }
    validationDataset.unpersist()
  }
}

if (parallel) {
splits.par.foreach(processFold)
} else {
splits.foreach(processFold)
}
{code}

Assuming there's 3 folds it would redundantly cache all the combinations 
(pretty much memory), so maybe it's possible to cache each fold separately.


> Parallel grid search 
> ---------------------
>
>                 Key: SPARK-5807
>                 URL: https://issues.apache.org/jira/browse/SPARK-5807
>             Project: Spark
>          Issue Type: New Feature
>          Components: ML
>    Affects Versions: 1.3.0
>            Reporter: Peter Rudenko
>            Priority: Minor
>
> Right now in CrossValidator for each fold combination and ParamGrid 
> hyperparameter pair it searches the best parameter sequentially. Assuming 
> there's enough workers & memory on a cluster to cache all training/validation 
> folds it's possible to parallelize execution. Here's a draft i came with:
> {code}
> val metrics = val metrics = new ArrayBuffer[Double](numModels) with 
> mutable.SynchronizedBuffer[Double]
> val splits = MLUtils.kFold(dataset, map(numFolds), 0).zipWithIndex
> def processFold(input: ((RDD[sql.Row], RDD[sql.Row]), Int)) = input match {
>   case ((training, validation), splitIndex) => {
>     val trainingDataset = sqlCtx.applySchema(training, schema).cache()
>     val validationDataset = sqlCtx.applySchema(validation, schema).cache()
>     // multi-model training
>     logDebug(s"Train split $splitIndex with multiple sets of parameters.")
>     val models = est.fit(trainingDataset, epm).asInstanceOf[Seq[Model[_]]]
>     var i = 0
>     trainingDataset.unpersist()
>     while (i < numModels) {
>       val metric = eval.evaluate(models(i).transform(validationDataset, 
> epm(i)), map)
>       logDebug(s"Got metric $metric for model trained with ${epm(i)}.")
>       metrics(i) += metric
>         i += 1
>     }
>     validationDataset.unpersist()
>   }
> }
> if (parallel) {
> splits.par.foreach(processFold)
> } else {
> splits.foreach(processFold)
> }
> {code}
> Assuming there's 3 folds it would redundantly cache all the combinations 
> (pretty much memory), so maybe it's possible to cache each fold separately.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to