[ https://issues.apache.org/jira/browse/SPARK-5844?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14323253#comment-14323253 ]
Peter Rudenko commented on SPARK-5844:
--------------------------------------

Here's a solution I came up with. Maybe it will be useful:

{code:title=CustomPipeline.scala}
class CustomPipeline(parallel: Boolean = false) extends Pipeline {

  override def fit(dataset: SchemaRDD, paramMaps: Array[ParamMap]): Seq[PipelineModel] = {
    if (parallel) {
      // Run the first paramMap sequentially to cache the data, then the others in parallel
      Seq(fit(dataset, paramMaps.head)) ++ paramMaps.tail.par.map(fit(dataset, _)).toVector
    } else {
      paramMaps.map(fit(dataset, _))
    }
  }
}
{code}

{code:title=CustomLogisticRegression.scala}
class CustomLogisticRegression extends LogisticRegression {
  var oldInstances: RDD[LabeledPoint] = null

  override def fit(dataset: SchemaRDD, paramMap: ParamMap): LogisticRegressionModel = {
    transformSchema(dataset.schema, paramMap, logging = true)
    import dataset.sqlContext._
    val map = this.paramMap ++ paramMap
    val instances = dataset.select(map(labelCol).attr, map(featuresCol).attr)
      .map {
        case Row(label: Double, features: Vector) =>
          LabeledPoint(label, features)
      }

    // Persist the instances, unpersisting the previously cached RDD if its id differs
    this.synchronized({
      if (oldInstances == null || oldInstances.id != instances.id) {
        if (oldInstances != null) {
          oldInstances.unpersist()
        }
        oldInstances = instances
        instances.setName(s"Instances for LR with ParamMap $paramMap and RDD ${dataset.id}")
        instances.persist(StorageLevel.MEMORY_AND_DISK)
      }
    })

    val lr = (new LogisticRegressionWithLBFGS)
      .setValidateData(false)
    ....
{code}

The idea is that a grid search over several values of the same parameter can run in parallel. E.g. for LogisticRegression.regParam with Array(0.1, 0.05, 0.01), I run fit for the first value sequentially to cache the instances (an RDD of LabeledPoint), and the remaining values can then be fit in parallel, assuming there are enough worker resources.
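To show the "first fit sequentially, the rest in parallel" pattern on its own, here is a minimal sketch without any Spark dependency. The object WarmThenParallel and its method fitAll are hypothetical names made up for illustration, and it uses standard-library Futures instead of the .par collections from the snippet above, so it is a sketch of the idea under those assumptions rather than the code from the comment.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration

object WarmThenParallel {
  // Hypothetical helper (not a Spark API): applies `fit` to the first
  // parameter set on the calling thread, so that anything `fit` caches
  // is populated once, then applies it to the remaining sets concurrently.
  // Results come back in the same order as `params`.
  def fitAll[P, M](params: Seq[P])(fit: P => M): Seq[M] = params match {
    case Seq() => Seq.empty
    case head +: tail =>
      val first = fit(head)                              // sequential warm-up
      val rest  = Future.sequence(tail.map(p => Future(fit(p))))
      first +: Await.result(rest, Duration.Inf)          // wait for the parallel fits
  }
}
```

For example, WarmThenParallel.fitAll(Seq(0.1, 0.05, 0.01))(reg => trainWith(reg)) would call a user-supplied trainWith (also hypothetical) once on the calling thread and twice on the global execution context.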
> Optimize Pipeline.fit for ParamGrid
> -----------------------------------
>
>                 Key: SPARK-5844
>                 URL: https://issues.apache.org/jira/browse/SPARK-5844
>             Project: Spark
>          Issue Type: Improvement
>          Components: ML
>    Affects Versions: 1.3.0
>            Reporter: Joseph K. Bradley
>
> This issue was brought up by [~prudenko] in [this JIRA | https://issues.apache.org/jira/browse/SPARK-4766].
> **Proposal**:
> When Pipeline.fit is given an array of ParamMaps, it should operate incrementally:
> * For each set of parameters applicable to the first PipelineStage,
> ** Fit/transform that stage using that set of parameters.
> ** For each set of parameters applicable to the second PipelineStage,
> *** etc.
> This is essentially a depth-first search on the parameters, where each node/level in the search tree is a PipelineStage and each node's child nodes correspond to the set of ParamMaps for that PipelineStage.
> This will avoid recomputing intermediate RDDs during model search.
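
The depth-first proposal quoted above can be sketched in plain Scala. Everything here is an assumption made for illustration: IncrementalGridSearch, Stage, and search are hypothetical names, a "stage" is reduced to a function over a Vector[Double], and no Spark classes are involved. The point is only that each stage's output for a given parameter is computed once and then shared by all parameter combinations of the later stages.

```scala
object IncrementalGridSearch {
  // Hypothetical stand-in for a PipelineStage: a name, its candidate
  // parameter values, and a function that fits/transforms the input
  // using one of those values.
  final case class Stage(
      name: String,
      params: Seq[Double],
      run: (Vector[Double], Double) => Vector[Double])

  // Depth-first walk over the stages. For each parameter of the current
  // stage, the stage is run once and its output is reused for every
  // parameter combination of the remaining stages.
  def search(
      stages: List[Stage],
      data: Vector[Double],
      chosen: Vector[(String, Double)] = Vector.empty
  ): Seq[(Vector[(String, Double)], Vector[Double])] =
    stages match {
      case Nil => Seq(chosen -> data)
      case stage :: rest =>
        stage.params.flatMap { p =>
          val out = stage.run(data, p) // computed once per parameter at this level
          search(rest, out, chosen :+ (stage.name -> p))
        }
    }
}
```

With two stages that have 2 and 3 candidate values respectively, this walk runs the first stage 2 times instead of once for each of the 6 combinations, which is the saving the proposal describes for intermediate RDDs.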