[ https://issues.apache.org/jira/browse/SPARK-22126?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16282699#comment-16282699 ]
Joseph K. Bradley commented on SPARK-22126:
-------------------------------------------
Continuing the discussion from the gist: Perhaps we're overthinking things when
worrying about handling multiple interdependent Callables. I figure there are
2 main use cases for optimizing model fitting:
* Parallel Spark jobs for fitting multiple models at once
** This is what has been introduced into CrossValidator, TrainValidationSplit
and OneVsRest for Spark 2.3 already.
** This use case is primarily for _small_ Spark jobs. E.g., fitting a bunch of
small models requires a bunch of small jobs. Each job needs to be
lightweight/fast in order to get much benefit from running parallel jobs.
* Single Spark jobs for fitting multiple models in a clever, model-specific way
** This is what is used by Deep Learning Pipelines and what we'd like to do
more of in the future.
** This use case is primarily for _large_ Spark jobs. E.g., for DLP, the Spark
job includes a bunch of tasks, and each task is sizable since it fits a model
in Keras.
Assuming we can reasonably divide the use cases of this fitMultiple() API into
these 2 types, we don't need to worry about dependencies between Callables. We
only need to worry about dependencies when users set parallelism > 1 with the
2nd type of use case, which we can advise against in the documentation for the
parallelism Param.
What do you think?
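For concreteness, here is a minimal sketch of the shape a fitMultiple() API
could take (the trait name, the tuple-iterator return type, and the default
body are assumptions for illustration, not a committed design):
{code}
import org.apache.spark.ml.param.ParamMap
import org.apache.spark.sql.Dataset

// Hypothetical sketch: the estimator returns an iterator of
// (paramMapIndex, model) pairs, and the caller decides how many elements to
// consume concurrently. A model-specific implementation can compute several
// models per step internally while still yielding one pair at a time.
trait FitMultiple[M] {
  def fit(dataset: Dataset[_], paramMap: ParamMap): M

  // Default: one independent fit per ParamMap, in index order.
  def fitMultiple(dataset: Dataset[_], paramMaps: Array[ParamMap]): Iterator[(Int, M)] =
    paramMaps.iterator.zipWithIndex.map { case (paramMap, index) =>
      (index, fit(dataset, paramMap))
    }
}
{code}
With this shape, the parallelism caveat above reduces to documentation: for the
2nd use case, consuming more than one iterator element at a time may not help,
since each element already spans a large Spark job.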
> Fix model-specific optimization support for ML tuning
> -----------------------------------------------------
>
> Key: SPARK-22126
> URL: https://issues.apache.org/jira/browse/SPARK-22126
> Project: Spark
> Issue Type: Improvement
> Components: ML
> Affects Versions: 2.3.0
> Reporter: Weichen Xu
>
> Fix model-specific optimization support for ML tuning. This was discussed in
> SPARK-19357; more discussion is in this gist:
> https://gist.github.com/MrBago/f501b9e7712dc6a67dc9fea24e309bf0
> I'll copy the discussion from the gist here.
> I propose to design the API as:
> {code}
> def fitCallables(dataset: Dataset[_], paramMaps: Array[ParamMap]): Array[Callable[Map[Int, M]]]
> {code}
> Let me use an example to explain the API:
> {quote}
> It could be possible to still use the current parallelism and still allow
> for model-specific optimizations. For example, suppose we are doing cross
> validation with a param grid of regParam = (0.1, 0.3) and maxIter = (5, 10).
> Let's say the cross validator could know that maxIter is optimized for the
> model being evaluated (e.g., via a new method in Estimator that returns such
> params). It would then be straightforward for the cross validator to remove
> maxIter from the param map that will be parallelized over and use it to
> create 2 arrays of paramMaps: ((regParam=0.1, maxIter=5), (regParam=0.1,
> maxIter=10)) and ((regParam=0.3, maxIter=5), (regParam=0.3, maxIter=10)).
> {quote}
> In this example, the models for ((regParam=0.1, maxIter=5), (regParam=0.1,
> maxIter=10)) must be computed sequentially in one thread, and the models for
> ((regParam=0.3, maxIter=5), (regParam=0.3, maxIter=10)) in another. So
> although there are 4 paramMaps, we can generate at most 2 threads to compute
> the models for them.
> The API above allows callable.call() to return multiple models: the return
> type is {{Map[Int, M]}}, where the integer key marks the paramMap index of
> the corresponding model. Using the example above, the 4 paramMaps yield only
> 2 callable objects: one for ((regParam=0.1, maxIter=5), (regParam=0.1,
> maxIter=10)) and another for ((regParam=0.3, maxIter=5), (regParam=0.3,
> maxIter=10)).
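> For concreteness, a sketch of how a model-specific implementation could
> group the paramMaps this way (the grouping logic and the fitIncrementally
> helper are hypothetical, for illustration only):
> {code}
> // Hypothetical sketch: group paramMaps that differ only in maxIter, so each
> // group can be fit inside a single Callable, e.g. by saving intermediate
> // iterates. fitIncrementally is an assumed helper, not an existing API.
> override def fitCallables(dataset: Dataset[_], paramMaps: Array[ParamMap]):
>     Array[Callable[Map[Int, M]]] = {
>   paramMaps.zipWithIndex
>     .groupBy { case (paramMap, _) => paramMap(regParam) }
>     .values.map { group =>
>       new Callable[Map[Int, M]] {
>         override def call(): Map[Int, M] = {
>           group.map { case (paramMap, index) =>
>             index -> fitIncrementally(dataset, paramMap)
>           }.toMap
>         }
>       }
>     }.toArray
> }
> {code}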
> The default implementations of fitCallables, and of fit with paramMaps, can
> then be:
> {code}
> def fitCallables(dataset: Dataset[_], paramMaps: Array[ParamMap]):
>     Array[Callable[Map[Int, M]]] = {
>   paramMaps.zipWithIndex.map { case (paramMap, index) =>
>     new Callable[Map[Int, M]] {
>       override def call(): Map[Int, M] = Map(index -> fit(dataset, paramMap))
>     }
>   }
> }
>
> def fit(dataset: Dataset[_], paramMaps: Array[ParamMap]): Seq[M] = {
>   // Flatten each callable's (index -> model) map, then restore the original
>   // paramMap order.
>   fitCallables(dataset, paramMaps)
>     .flatMap(_.call().toSeq)
>     .sortBy(_._1)
>     .map(_._2)
> }
> {code}
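> A hypothetical usage example (the estimator lr and the param values are
> illustrative only):
> {code}
> // With the default fitCallables this runs 4 independent fits; a
> // model-specific override could serve the same request with 2 callables.
> val paramMaps = Array(
>   ParamMap(lr.regParam -> 0.1, lr.maxIter -> 5),
>   ParamMap(lr.regParam -> 0.1, lr.maxIter -> 10),
>   ParamMap(lr.regParam -> 0.3, lr.maxIter -> 5),
>   ParamMap(lr.regParam -> 0.3, lr.maxIter -> 10))
> val models = lr.fit(trainingDataset, paramMaps)
> {code}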
> If we use the API proposed above, the code in
> [CrossValidator|https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/ml/tuning/CrossValidator.scala#L149-L159]
> can be changed to:
> {code}
> val trainingDataset = sparkSession.createDataFrame(training, schema).cache()
> val validationDataset = sparkSession.createDataFrame(validation, schema).cache()
>
> // Fit models in Futures for training in parallel
> val modelMapFutures = fitCallables(trainingDataset, paramMaps).map { callable =>
>   Future[Map[Int, Model[_]]] {
>     val modelMap = callable.call()
>     if (collectSubModelsParam) {
>       ...
>     }
>     modelMap
>   } (executionContext)
> }
>
> // Unpersist the training data only when all models have finished training
> Future.sequence[Map[Int, Model[_]], Iterable](modelMapFutures)(implicitly, executionContext)
>   .onComplete { _ => trainingDataset.unpersist() } (executionContext)
>
> // Evaluate models in Futures that will calculate a metric and allow each
> // model to be cleaned up
> val foldMetricMapFutures = modelMapFutures.map { modelMapFuture =>
>   modelMapFuture.map { modelMap =>
>     modelMap.map { case (index, model) =>
>       val metric = eval.evaluate(model.transform(validationDataset, paramMaps(index)))
>       (index, metric)
>     }
>   } (executionContext)
> }
>
> // Wait for the metrics to be calculated before unpersisting the validation
> // dataset
> val foldMetrics = foldMetricMapFutures
>   .map(ThreadUtils.awaitResult(_, Duration.Inf))
>   .flatMap(_.toSeq)
>   .sortBy(_._1)
>   .map(_._2)
> {code}
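> For context, a sketch of how the executionContext used above could be
> derived from the parallelism Param (this mirrors the thread-pool setup Spark
> uses elsewhere; the exact wiring here is illustrative):
> {code}
> import scala.concurrent.ExecutionContext
> import org.apache.spark.util.ThreadUtils
>
> // Bound the number of concurrently running callables by the parallelism
> // Param; parallelism = 1 runs everything on the calling thread.
> val executionContext = $(parallelism) match {
>   case 1 => ThreadUtils.sameThread
>   case n =>
>     ExecutionContext.fromExecutorService(
>       ThreadUtils.newDaemonCachedThreadPool("cross-validation-pool", n))
> }
> {code}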