[ 
https://issues.apache.org/jira/browse/FLINK-1992?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14553820#comment-14553820
 ] 

ASF GitHub Bot commented on FLINK-1992:
---------------------------------------

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/692#discussion_r30780043
  
    --- Diff: 
flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/optimization/GradientDescent.scala
 ---
    @@ -76,86 +77,163 @@ class GradientDescent(runParameters: ParameterMap) 
extends IterativeSolver {
         }.withBroadcastSet(currentWeights, WEIGHTVECTOR_BROADCAST)
       }
     
    +
    +
       /** Provides a solution for the given optimization problem
         *
         * @param data A Dataset of LabeledVector (label, features) pairs
    -    * @param initWeights The initial weights that will be optimized
    +    * @param initialWeights The initial weights that will be optimized
         * @return The weights, optimized for the provided data.
         */
       override def optimize(
         data: DataSet[LabeledVector],
    -    initWeights: Option[DataSet[WeightVector]]): DataSet[WeightVector] = {
    -    // TODO: Faster way to do this?
    -    val dimensionsDS = data.map(_.vector.size).reduce((a, b) => b)
    -
    -    val numberOfIterations: Int = parameterMap(Iterations)
    +    initialWeights: Option[DataSet[WeightVector]]): DataSet[WeightVector] 
= {
    +    val numberOfIterations: Int = parameters(Iterations)
    +    // TODO(tvas): This looks out of place, why don't we get back an 
Option from
    +    // parameters(ConvergenceThreshold)?
    +    val convergenceThresholdOption = parameters.get(ConvergenceThreshold)
     
         // Initialize weights
    -    val initialWeightsDS: DataSet[WeightVector] = initWeights match {
    -      // Ensure provided weight vector is a DenseVector
    -      case Some(wvDS) => {
    -        wvDS.map{wv => {
    -          val denseWeights = wv.weights match {
    -            case dv: DenseVector => dv
    -            case sv: SparseVector => sv.toDenseVector
    +    val initialWeightsDS: DataSet[WeightVector] = 
createInitialWeightsDS(initialWeights, data)
    +
    +    // Perform the iterations
    +    val optimizedWeights = convergenceThresholdOption match {
    +      // No convergence criterion
    +      case None =>
    +        initialWeightsDS.iterate(numberOfIterations) {
    +          weightVectorDS => {
    +            SGDStep(data, weightVectorDS)
               }
    -          WeightVector(denseWeights, wv.intercept)
             }
    -
    +      case Some(convergence) =>
    +        /** Calculates the regularized loss, from the data and given 
weights **/
    --- End diff --
    
    One line comments should use `//`


> Add convergence criterion to SGD optimizer
> ------------------------------------------
>
>                 Key: FLINK-1992
>                 URL: https://issues.apache.org/jira/browse/FLINK-1992
>             Project: Flink
>          Issue Type: Improvement
>          Components: Machine Learning Library
>            Reporter: Till Rohrmann
>            Assignee: Theodore Vasiloudis
>            Priority: Minor
>              Labels: ML
>             Fix For: 0.9
>
>
> Currently, Flink's SGD optimizer runs for a fixed number of iterations. It 
> would be good to support a dynamic convergence criterion, too.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to