[ 
https://issues.apache.org/jira/browse/SPARK-22126?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Weichen Xu updated SPARK-22126:
-------------------------------
    Description: 
Fix model-specific optimization support for ML tuning. This is discussed in SPARK-19357; further discussion is at https://gist.github.com/MrBago/f501b9e7712dc6a67dc9fea24e309bf0

Anyone who's following might want to scan the design doc (linked above). The latest API proposal is:
{code}
def fitMultiple(
    dataset: Dataset[_],
    paramMaps: Array[ParamMap]
  ): java.util.Iterator[scala.Tuple2[java.lang.Integer, Model]]
{code}
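
For illustration only (not part of the proposal): a caller could drain the iterator like this, collecting each fitted model at the index of its paramMap. The estimator {{est}} and the surrounding variables are assumed to be in scope.
{code}
// Illustrative caller-side sketch; `est.fitMultiple` is assumed to follow the
// signature above, and `est`, `dataset` and `paramMaps` are assumed to be in scope.
import org.apache.spark.ml.Model

val models = new Array[Model[_]](paramMaps.length)
val modelIter = est.fitMultiple(dataset, paramMaps)
while (modelIter.hasNext) {
  val indexedModel = modelIter.next()   // (paramMap index, fitted model)
  models(indexedModel._1.intValue) = indexedModel._2
}
{code}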

Old discussion:

I've copied the discussion from the gist here:

I propose the following API:
{code}
def fitCallables(dataset: Dataset[_], paramMaps: Array[ParamMap]):
    Array[Callable[Map[Int, M]]]
{code}

Let me use an example to explain the API:
{quote}
It could be possible to still use the current parallelism and still allow for model-specific optimizations. For example, suppose we are doing cross validation and have a param grid with regParam = (0.1, 0.3) and maxIter = (5, 10). Let's say that the cross validator could know that maxIter is optimized for the model being evaluated (e.g. via a new method in Estimator that returns such params). It would then be straightforward for the cross validator to remove maxIter from the param map that will be parallelized over and use it to create 2 arrays of paramMaps: ((regParam=0.1, maxIter=5), (regParam=0.1, maxIter=10)) and ((regParam=0.3, maxIter=5), (regParam=0.3, maxIter=10)).
{quote}
In this example, the models for ((regParam=0.1, maxIter=5), (regParam=0.1, maxIter=10)) can be computed in one thread, and the models for ((regParam=0.3, maxIter=5), (regParam=0.3, maxIter=10)) in another. There are 4 paramMaps, but at most 2 threads are needed to compute the models for them.

The API above allows callable.call() to return multiple models. The return type is {{Map[Int, M]}}, where the integer key marks the index of the paramMap that produced the corresponding model. In the example above there are 4 paramMaps, but only 2 callable objects are returned: one for ((regParam=0.1, maxIter=5), (regParam=0.1, maxIter=10)) and another for ((regParam=0.3, maxIter=5), (regParam=0.3, maxIter=10)).
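
For illustration (the estimator {{lr}} below is an assumption, not part of the proposal), the 4 paramMaps from the example could be grouped by their regParam value, with each group becoming one callable:
{code}
// Illustrative only: group the example's 4 paramMaps by regParam so that each
// group (one regParam value, both maxIter values) maps to a single callable.
// `lr` is an assumed LogisticRegression-style estimator exposing `regParam`.
import org.apache.spark.ml.param.ParamMap

val grouped: Map[Option[Double], Array[(ParamMap, Int)]] =
  paramMaps.zipWithIndex.groupBy { case (pm, _) => pm.get(lr.regParam) }
// grouped.size == 2 here, so 2 callables cover all 4 paramMaps.
{code}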

The default implementations of fitCallables, and of fit with multiple paramMaps, can then be written as follows:
{code}
import java.util.concurrent.Callable

// Default implementation: one Callable per ParamMap, each fitting a single model
// and returning it keyed by the index of its ParamMap.
def fitCallables(dataset: Dataset[_], paramMaps: Array[ParamMap]):
    Array[Callable[Map[Int, M]]] = {
  paramMaps.zipWithIndex.map { case (paramMap: ParamMap, index: Int) =>
    new Callable[Map[Int, M]] {
      override def call(): Map[Int, M] = Map(index -> fit(dataset, paramMap))
    }
  }
}

// fit with multiple paramMaps, expressed in terms of fitCallables: run every
// callable, merge the (index -> model) pairs, and return models in paramMap order.
def fit(dataset: Dataset[_], paramMaps: Array[ParamMap]): Seq[M] = {
  fitCallables(dataset, paramMaps).toSeq
    .flatMap(_.call().toSeq)
    .sortBy(_._1)
    .map(_._2)
}
{code}
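
As a sketch of the intended benefit (hypothetical: {{groupByNonOptimizedParams}} and {{fitGroup}} are illustrative helpers, not existing Spark APIs), an estimator that can optimize across maxIter values could override fitCallables to return one callable per group of paramMaps that differ only in maxIter:
{code}
// Hypothetical override sketch. `groupByNonOptimizedParams` is assumed to split the
// paramMaps into groups that differ only in model-optimizable params (e.g. maxIter),
// and `fitGroup` is assumed to fit all models of a group in one pass, keyed by the
// original paramMap index.
override def fitCallables(dataset: Dataset[_], paramMaps: Array[ParamMap]):
    Array[Callable[Map[Int, M]]] = {
  groupByNonOptimizedParams(paramMaps).map { group =>
    new Callable[Map[Int, M]] {
      override def call(): Map[Int, M] = fitGroup(dataset, group)
    }
  }
}
{code}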
With the API proposed above, the code in [CrossValidator|https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/ml/tuning/CrossValidator.scala#L149-L159] can be changed to:
{code}
val trainingDataset = sparkSession.createDataFrame(training, schema).cache()
val validationDataset = sparkSession.createDataFrame(validation, schema).cache()

// Fit models in Futures so that training runs in parallel
val modelMapFutures = fitCallables(trainingDataset, paramMaps).map { callable =>
  Future[Map[Int, Model[_]]] {
    val modelMap = callable.call()
    if (collectSubModelsParam) {
      ...
    }
    modelMap
  } (executionContext)
}

// Unpersist the training data only when all models have been trained
Future.sequence[Map[Int, Model[_]], Iterable](modelMapFutures)(implicitly, executionContext)
  .onComplete { _ => trainingDataset.unpersist() } (executionContext)

// Evaluate each model in a Future that calculates its metric and allows the
// model to be cleaned up afterwards
val foldMetricMapFutures = modelMapFutures.map { modelMapFuture =>
  modelMapFuture.map { modelMap =>
    modelMap.map { case (index: Int, model: Model[_]) =>
      val metric = eval.evaluate(model.transform(validationDataset, paramMaps(index)))
      (index, metric)
    }
  } (executionContext)
}

// Wait for the metrics to be calculated before unpersisting the validation dataset,
// then order them by paramMap index
val foldMetrics = foldMetricMapFutures.map(ThreadUtils.awaitResult(_, Duration.Inf))
  .flatMap(_.toSeq).sortBy(_._1).map(_._2)
{code}


> Fix model-specific optimization support for ML tuning
> -----------------------------------------------------
>
>                 Key: SPARK-22126
>                 URL: https://issues.apache.org/jira/browse/SPARK-22126
>             Project: Spark
>          Issue Type: Improvement
>          Components: ML
>    Affects Versions: 2.3.0
>            Reporter: Weichen Xu
>
