[ 
https://issues.apache.org/jira/browse/SPARK-5844?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14323253#comment-14323253
 ] 

Peter Rudenko commented on SPARK-5844:
--------------------------------------

Here's a solution I came up with. Maybe it would be useful:

{code:title=CustomPipeline.scala}
class CustomPipeline(parallel: Boolean = false) extends Pipeline {

  override def fit(dataset: SchemaRDD, paramMaps: Array[ParamMap]): Seq[PipelineModel] = {
    if (parallel) {
      // Run the first paramMap sequentially to cache the data, then the rest in parallel
      Seq(fit(dataset, paramMaps.head)) ++ paramMaps.tail.par.map(fit(dataset, _)).toVector
    } else {
      paramMaps.map(fit(dataset, _))
    }
  }
}
{code}
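The head-sequential/tail-parallel trick above can be sketched without Spark. This is a toy stand-in (all names here are hypothetical, not Spark API): an expensive "instances" load is cached by the first fit, and the remaining fits run concurrently via Futures and share the cache:

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import java.util.concurrent.atomic.AtomicInteger

val cacheBuilds = new AtomicInteger(0)        // counts how often the expensive load runs
@volatile var cache: Option[Vector[Double]] = None

// Toy "fit": builds the instances once, then every call reuses the cache.
def fit(regParam: Double): Double = {
  val instances = cache.getOrElse {
    cacheBuilds.incrementAndGet()             // expensive load happens only here
    val built = Vector(1.0, 2.0, 3.0)
    cache = Some(built)
    built
  }
  instances.sum * regParam                    // dummy "model"
}

val paramGrid = Array(0.1, 0.05, 0.01)
// Run the first fit sequentially so the cache is warm...
val headModel = fit(paramGrid.head)
// ...then the rest in parallel, all sharing the cached instances.
val tailModels = Await.result(
  Future.sequence(paramGrid.tail.toVector.map(p => Future(fit(p)))),
  10.seconds)
val models = headModel +: tailModels
```

The point of fitting the head first is that the parallel tail never races to build the cache: by the time the Futures start, the expensive load has already happened exactly once.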

{code:title=CustomLogisticRegression.scala}
class CustomLogisticRegression extends LogisticRegression {
  var oldInstances: RDD[LabeledPoint] = null

  override def fit(dataset: SchemaRDD, paramMap: ParamMap): LogisticRegressionModel = {
    transformSchema(dataset.schema, paramMap, logging = true)
    import dataset.sqlContext._
    val map = this.paramMap ++ paramMap
    val instances = dataset.select(map(labelCol).attr, map(featuresCol).attr)
      .map {
        case Row(label: Double, features: Vector) =>
          LabeledPoint(label, features)
      }

    // Persist the instances RDD once; unpersist the previous one if the input changed
    this.synchronized {
      if (oldInstances == null || oldInstances.id != instances.id) {
        if (oldInstances != null) {
          oldInstances.unpersist()
        }
        oldInstances = instances
        instances.setName(s"Instances for LR with ParamMap $paramMap and RDD ${dataset.id}")
        instances.persist(StorageLevel.MEMORY_AND_DISK)
      }
    }

    val lr = (new LogisticRegressionWithLBFGS)
      .setValidateData(false)
    ....
{code}

The idea is that it's possible to run grid search over the same parameter in parallel. E.g., for LogisticRegression.regParam with Array(0.1, 0.05, 0.01), I run fit sequentially for the first value to cache the instances (an RDD of LabeledPoints), and the remaining fits can then run in parallel, assuming there are enough worker resources.



> Optimize Pipeline.fit for ParamGrid
> -----------------------------------
>
>                 Key: SPARK-5844
>                 URL: https://issues.apache.org/jira/browse/SPARK-5844
>             Project: Spark
>          Issue Type: Improvement
>          Components: ML
>    Affects Versions: 1.3.0
>            Reporter: Joseph K. Bradley
>
> This issue was brought up by [~prudenko] in [this JIRA|https://issues.apache.org/jira/browse/SPARK-4766].
> **Proposal**:
> When Pipeline.fit is given an array of ParamMaps, it should operate 
> incrementally:
> * For each set of parameters applicable to the first PipelineStage,
> ** Fit/transform that stage using that set of parameters.
> ** For each set of parameters applicable to the second PipelineStage,
> *** etc.
> This is essentially a depth-first search on the parameters, where each 
> node/level in the search tree is a PipelineStage and each node's child nodes 
> correspond to the set of ParamMaps for that PipelineStage.
> This will avoid recomputing intermediate RDDs during model search.
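The proposed depth-first expansion can be sketched with a toy example (plain Scala, no Spark; all names hypothetical). Each "stage" is a list of candidate parameter values; a shared-prefix traversal fits each stage once per distinct choice and reuses that result for all deeper combinations, rather than refitting every stage for every full ParamMap:

```scala
var fitCalls = 0  // counts per-stage fit/transform invocations

// stages: one list of candidate parameter values per pipeline stage.
// Returns all full parameter combinations, depth-first.
def dfs(stages: List[List[Int]], prefix: List[Int] = Nil): List[List[Int]] =
  stages match {
    case Nil => List(prefix)
    case params :: rest =>
      params.flatMap { p =>
        fitCalls += 1               // fit/transform this stage once per value...
        dfs(rest, prefix :+ p)      // ...and reuse its output for all deeper choices
      }
  }

val grid = List(List(1, 2), List(10, 20, 30))   // 2 x 3 = 6 combinations
val combos = dfs(grid)
// A naive search refits both stages for each of the 6 combinations: 12 fits.
// The depth-first version fits stage 1 twice and stage 2 six times: 8 fits.
```

The saving grows with pipeline depth: every shared prefix of parameter choices is computed once instead of once per leaf combination.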



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
