[
https://issues.apache.org/jira/browse/SPARK-22126?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16313958#comment-16313958
]
Bago Amirbekian commented on SPARK-22126:
-----------------------------------------
> Do you think it's possible to put this kind of execution in fitMultiple and
> allow CV to parallelize work for the stages?
Yes, absolutely. The iterator can maintain a queue of tasks. Each call to
`next` picks a task off the queue, optionally adds more tasks to the queue,
and returns a single model instance. Since models can be returned in any order,
the tasks can be organized in whatever way finishes the work most efficiently. If
the queue is empty, `next` can simply wait for a previous task to finish and
put more tasks on the queue. The iterator approach is very flexible.
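A minimal sketch of such a queue-backed iterator. It assumes a hypothetical task type: `TaskResult`, `QueueIterator` and the string "models" are illustrative names, not Spark API. When a task runs, it may emit finished models tagged with their paramMap index and enqueue follow-up tasks. `next` drains the queue until a model is ready. Tasks run on the calling thread here; a parallel variant would hand them to a thread pool instead.

```scala
import scala.collection.mutable

// Hypothetical task result: finished models tagged with their paramMap index,
// plus any follow-up tasks this task wants to enqueue.
case class TaskResult[M](models: Seq[(Int, M)], followUps: Seq[() => TaskResult[M]])

// Iterator backed by a queue of tasks. Tasks run on the calling thread, in
// FIFO order, until at least one model is ready to hand out.
final class QueueIterator[M](initial: Seq[() => TaskResult[M]]) extends Iterator[(Int, M)] {
  private val tasks = mutable.Queue.empty[() => TaskResult[M]] ++= initial
  private val ready = mutable.Queue.empty[(Int, M)]

  private def fill(): Unit =
    while (ready.isEmpty && tasks.nonEmpty) {
      val result = tasks.dequeue()() // take the next task and run it
      tasks ++= result.followUps     // it may schedule more work
      ready ++= result.models        // and may finish zero or more models
    }

  override def hasNext: Boolean = { fill(); ready.nonEmpty }

  override def next(): (Int, M) = {
    if (!hasNext) throw new NoSuchElementException("no more models")
    ready.dequeue()
  }
}

// Usage: task A emits model 0 and enqueues a follow-up (e.g. a warm start)
// that later emits model 1; task B independently emits model 2.
val taskA: () => TaskResult[String] = () =>
  TaskResult(Seq(0 -> "maxIter=5"), Seq(() => TaskResult(Seq(1 -> "maxIter=10"), Nil)))
val taskB: () => TaskResult[String] = () => TaskResult(Seq(2 -> "regParam=0.3"), Nil)
val out = new QueueIterator(Seq(taskA, taskB)).toList
// out == List((0, "maxIter=5"), (2, "regParam=0.3"), (1, "maxIter=10"))
```

The models come back as indices 0, 2, 1. Task A's follow-up for index 1 waits behind the independent task B, which is the reordering freedom described above.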
> With my PR, I would do this by having the Pipeline estimator return all
> params in getOptimizedParams.
The issue here is that when you call `fit(dataset, paramMaps)`, you have already
fixed the order in which the models must be returned. For my purposes I don't see
much of a difference between `Seq[(Int, Model)]` and `Iterator[(Integer, Model)]`.
The key difference for me between `fitMultiple(..., paramMaps): Lazy[(Int, Model)]`
and `fit(..., paramMaps): Lazy[Model]` is the flexibility to produce the models
in arbitrary order.
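The indexed, arbitrary-order form is the more general of the two. The fixed-order result of `fit(dataset, paramMaps)` can always be rebuilt from `(index, model)` pairs, but not the other way around. A small sketch of that conversion, where `inParamMapOrder` is a hypothetical helper and plain strings stand in for models:

```scala
import scala.collection.mutable

// Hypothetical helper: rebuild the fixed-order result of fit(dataset, paramMaps)
// from (paramMap index, model) pairs that may arrive in any order.
def inParamMapOrder[M](numParamMaps: Int, fitted: Iterator[(Int, M)]): Seq[M] = {
  val byIndex = mutable.Map.empty[Int, M]
  fitted.foreach { case (index, model) =>
    require(!byIndex.contains(index), s"duplicate model for paramMap $index")
    byIndex(index) = model
  }
  // Every paramMap index must have produced exactly one model.
  (0 until numParamMaps).map { i =>
    byIndex.getOrElse(i, throw new IllegalStateException(s"no model for paramMap $i"))
  }
}

// Models arrive as 2, 0, 1 but come back in paramMap order.
val ordered = inParamMapOrder(3, Iterator(2 -> "m2", 0 -> "m0", 1 -> "m1"))
```

The reverse conversion is not possible. Once `fit` has committed to returning models in paramMap order, an implementation cannot hand back whichever model finishes first.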
> Fix model-specific optimization support for ML tuning
> -----------------------------------------------------
>
> Key: SPARK-22126
> URL: https://issues.apache.org/jira/browse/SPARK-22126
> Project: Spark
> Issue Type: Improvement
> Components: ML
> Affects Versions: 2.3.0
> Reporter: Weichen Xu
>
> Fix model-specific optimization support for ML tuning. This is discussed in
> SPARK-19357; more discussion is here:
> https://gist.github.com/MrBago/f501b9e7712dc6a67dc9fea24e309bf0
> Anyone who's following might want to scan the design doc (in the links
> above); the latest API proposal is:
> {code}
> def fitMultiple(
>     dataset: Dataset[_],
>     paramMaps: Array[ParamMap]
>   ): java.util.Iterator[scala.Tuple2[java.lang.Integer, Model]]
> {code}
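Here is a minimal sketch of what a default `fitMultiple` could look like for an estimator with no special optimization: one independent fit per paramMap, handed out lazily. `ParamMap` here is a hypothetical stand-in case class, and `fit` is passed in as a function so the sketch runs without Spark; a real version would be a method on `Estimator`. An `AtomicInteger` lets several threads (e.g. the tuning code's thread pool) call `next` concurrently without fitting the same paramMap twice.

```scala
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical stand-in for Spark's ParamMap so the sketch runs on its own.
case class ParamMap(params: Map[String, Double])

// Default behaviour: one independent fit per paramMap, produced lazily.
// Each next() claims a distinct index with getAndIncrement, so concurrent
// callers never fit the same paramMap twice. Under concurrency hasNext is
// only a hint: another thread may claim the last index first, in which case
// next() throws NoSuchElementException.
def fitMultiple[D, M](dataset: D, paramMaps: Array[ParamMap])(
    fit: (D, ParamMap) => M): java.util.Iterator[(Integer, M)] =
  new java.util.Iterator[(Integer, M)] {
    private val counter = new AtomicInteger(0)

    override def hasNext(): Boolean = counter.get() < paramMaps.length

    override def next(): (Integer, M) = {
      val index = counter.getAndIncrement()
      if (index >= paramMaps.length) {
        throw new java.util.NoSuchElementException("all paramMaps have been fitted")
      }
      (Integer.valueOf(index), fit(dataset, paramMaps(index)))
    }
  }

// Usage with strings standing in for the dataset and the models.
val maps = Array(ParamMap(Map("regParam" -> 0.1)), ParamMap(Map("regParam" -> 0.3)))
val fitIter = fitMultiple("training", maps)((_, pm) => s"model(regParam=${pm.params("regParam")})")
val fitted = scala.collection.mutable.ListBuffer.empty[(Integer, String)]
while (fitIter.hasNext()) fitted += fitIter.next()
```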
> Old discussion:
> I copied the discussion from the gist here:
> I propose the following API:
> {code}
> def fitCallables(dataset: Dataset[_], paramMaps: Array[ParamMap]):
> Array[Callable[Map[Int, M]]]
> {code}
> Let me use an example to explain the API:
> {quote}
> It could be possible to still use the current parallelism and still allow
> for model-specific optimizations. For example, suppose we are doing cross
> validation and have a param map with regParam = (0.1, 0.3) and maxIter = (5, 10).
> Let's say that the cross validator could know that maxIter is optimized for the
> model being evaluated (e.g. via a new method in Estimator that returns such
> params). It would then be straightforward for the cross validator to remove
> maxIter from the param map that will be parallelized over and use it to
> create 2 arrays of paramMaps: ((regParam=0.1, maxIter=5), (regParam=0.1,
> maxIter=10)) and ((regParam=0.3, maxIter=5), (regParam=0.3, maxIter=10)).
> {quote}
> In this example, the models for ((regParam=0.1, maxIter=5), (regParam=0.1,
> maxIter=10)) can only be computed in one thread, and the models for
> ((regParam=0.3, maxIter=5), (regParam=0.3, maxIter=10)) in another thread.
> So although there are 4 paramMaps, we can use at most two threads to compute
> their models.
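The grouping in this example can be sketched as follows. This assumes a param map is modelled as a plain `Map[String, Double]`, a hypothetical stand-in for `ParamMap`. It also assumes the estimator has already reported which params it optimizes internally, passed here as the `optimized` set. Param maps that agree on every non-optimized param land in the same group, and each group is one unit of work for one thread.

```scala
// Hypothetical stand-in for ParamMap: param name -> value.
type ParamValues = Map[String, Double]

// Group paramMap indices by the params the estimator does NOT optimize
// internally. Groups are sorted by their first index for a stable order.
def groupForOptimizedParams(
    paramMaps: IndexedSeq[ParamValues],
    optimized: Set[String]): Seq[Seq[Int]] =
  paramMaps.indices
    .groupBy(i => paramMaps(i) -- optimized)
    .values
    .map(_.toSeq)
    .toSeq
    .sortBy(_.head)

// The example from the quote: regParam = (0.1, 0.3), maxIter = (5, 10).
val cvMaps = IndexedSeq(
  Map("regParam" -> 0.1, "maxIter" -> 5.0),
  Map("regParam" -> 0.1, "maxIter" -> 10.0),
  Map("regParam" -> 0.3, "maxIter" -> 5.0),
  Map("regParam" -> 0.3, "maxIter" -> 10.0))
val groups = groupForOptimizedParams(cvMaps, Set("maxIter"))
// groups == Seq(Seq(0, 1), Seq(2, 3)): 4 paramMaps, but only 2 units of work.
```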
> The API above allows "callable.call()" to return multiple models, so its return
> type is {code}Map[Int, M]{code}, where the integer key is the index of the
> paramMap that produced the corresponding model. In the example above there are
> 4 paramMaps, but only 2 callable objects are returned: one for ((regParam=0.1,
> maxIter=5), (regParam=0.1, maxIter=10)) and another for ((regParam=0.3,
> maxIter=5), (regParam=0.3, maxIter=10)).
> The default "fitCallables/fit with paramMaps" can be implemented as
> follows:
> {code}
> import java.util.concurrent.Callable
> // Default: one Callable per paramMap, each returning a single-entry map
> // from that paramMap's index to its fitted model.
> def fitCallables(dataset: Dataset[_], paramMaps: Array[ParamMap]):
>     Array[Callable[Map[Int, M]]] = {
>   paramMaps.zipWithIndex.map { case (paramMap, index) =>
>     new Callable[Map[Int, M]] {
>       override def call(): Map[Int, M] = Map(index -> fit(dataset, paramMap))
>     }
>   }
> }
> // Run every callable, then restore the original paramMap order by index.
> def fit(dataset: Dataset[_], paramMaps: Array[ParamMap]): Seq[M] = {
>   fitCallables(dataset, paramMaps)
>     .flatMap(_.call().toSeq)
>     .sortBy(_._1)
>     .map(_._2)
>     .toSeq
> }
> {code}
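The default above creates one callable per paramMap. For the regParam/maxIter example, an estimator that optimizes maxIter internally could instead return one callable per group. Here is a minimal sketch, assuming the groups are already known (e.g. Seq(Seq(0, 1), Seq(2, 3))). It uses a hypothetical `fitGroup` function that fits every paramMap of a group in one pass. Strings stand in for models, and a plain Java thread pool stands in for the tuning code's execution context.

```scala
import java.util.concurrent.{Callable, Executors}

// One Callable per group of paramMap indices. Each call() returns the models
// for its whole group, keyed by paramMap index. fitGroup is hypothetical: it
// stands for an estimator-specific routine that fits all paramMaps in a group
// in one pass (for example, one run recording models at several maxIter values).
def fitCallables[M](
    groups: Seq[Seq[Int]],
    fitGroup: Seq[Int] => Map[Int, M]): Array[Callable[Map[Int, M]]] =
  groups.map { group =>
    new Callable[Map[Int, M]] {
      override def call(): Map[Int, M] = fitGroup(group)
    }
  }.toArray

// Usage: 4 paramMaps in 2 groups, so only 2 callables (and at most 2 threads).
val exampleGroups = Seq(Seq(0, 1), Seq(2, 3))
val callables = fitCallables[String](exampleGroups, g => g.map(i => i -> s"model$i").toMap)
val pool = Executors.newFixedThreadPool(2)
val merged: Map[Int, String] =
  try callables.map(c => pool.submit(c)).map(_.get()).reduce(_ ++ _)
  finally pool.shutdown()
```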
> If we use the API proposed above, the code in
> [CrossValidator|https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/ml/tuning/CrossValidator.scala#L149-L159]
> can be changed to:
> {code}
> val trainingDataset = sparkSession.createDataFrame(training, schema).cache()
> val validationDataset = sparkSession.createDataFrame(validation, schema).cache()
> // Fit models in a Future for training in parallel
> val modelMapFutures = fitCallables(trainingDataset, paramMaps).map { callable =>
>   Future[Map[Int, Model[_]]] {
>     val modelMap = callable.call()
>     if (collectSubModelsParam) {
>       ...
>     }
>     modelMap
>   } (executionContext)
> }
> // Unpersist training data only when all models have trained
> Future.sequence[Map[Int, Model[_]], Iterable](modelMapFutures)(implicitly, executionContext)
>   .onComplete { _ => trainingDataset.unpersist() } (executionContext)
> // Evaluate models in a Future that will calculate a metric and allow
> // the models to be cleaned up
> val foldMetricMapFutures = modelMapFutures.map { modelMapFuture =>
>   modelMapFuture.map { modelMap =>
>     modelMap.map { case (index, model) =>
>       val metric = eval.evaluate(model.transform(validationDataset, paramMaps(index)))
>       (index, metric)
>     }
>   } (executionContext)
> }
> // Wait for metrics to be calculated before unpersisting the validation dataset
> val foldMetrics = foldMetricMapFutures
>   .flatMap(future => ThreadUtils.awaitResult(future, Duration.Inf).toSeq)
>   .sortBy(_._1)
>   .map(_._2)
> validationDataset.unpersist()
> {code}
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)