Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/692#discussion_r30780392
  
    --- Diff: flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/optimization/GradientDescent.scala ---
    @@ -76,86 +77,163 @@ class GradientDescent(runParameters: ParameterMap) extends IterativeSolver {
         }.withBroadcastSet(currentWeights, WEIGHTVECTOR_BROADCAST)
       }
     
    +
    +
       /** Provides a solution for the given optimization problem
         *
         * @param data A Dataset of LabeledVector (label, features) pairs
    -    * @param initWeights The initial weights that will be optimized
    +    * @param initialWeights The initial weights that will be optimized
         * @return The weights, optimized for the provided data.
         */
       override def optimize(
         data: DataSet[LabeledVector],
    -    initWeights: Option[DataSet[WeightVector]]): DataSet[WeightVector] = {
    -    // TODO: Faster way to do this?
    -    val dimensionsDS = data.map(_.vector.size).reduce((a, b) => b)
    -
    -    val numberOfIterations: Int = parameterMap(Iterations)
    +    initialWeights: Option[DataSet[WeightVector]]): DataSet[WeightVector] = {
    +    val numberOfIterations: Int = parameters(Iterations)
    +    // TODO(tvas): This looks out of place, why don't we get back an Option from
    +    // parameters(ConvergenceThreshold)?
    +    val convergenceThresholdOption = parameters.get(ConvergenceThreshold)
     
         // Initialize weights
    -    val initialWeightsDS: DataSet[WeightVector] = initWeights match {
    -      // Ensure provided weight vector is a DenseVector
    -      case Some(wvDS) => {
    -        wvDS.map{wv => {
    -          val denseWeights = wv.weights match {
    -            case dv: DenseVector => dv
    -            case sv: SparseVector => sv.toDenseVector
    +    val initialWeightsDS: DataSet[WeightVector] = createInitialWeightsDS(initialWeights, data)
    +
    +    // Perform the iterations
    +    val optimizedWeights = convergenceThresholdOption match {
    +      // No convergence criterion
    +      case None =>
    +        initialWeightsDS.iterate(numberOfIterations) {
    +          weightVectorDS => {
    +            SGDStep(data, weightVectorDS)
               }
    -          WeightVector(denseWeights, wv.intercept)
             }
    -
    +      case Some(convergence) =>
    +        /** Calculates the regularized loss, from the data and given weights **/
    +        def lossCalculation(data: DataSet[LabeledVector], weightDS: DataSet[WeightVector]):
    +        DataSet[Double] = {
    +          data.map {
    +            new LossCalculation
    +          }.withBroadcastSet(weightDS, WEIGHTVECTOR_BROADCAST)
    +            .reduce {
    +            (left, right) =>
    +              val (leftLoss, leftCount) = left
    +              val (rightLoss, rightCount) = right
    +              (leftLoss + rightLoss, rightCount + leftCount)
    +          }
    +            .map{new RegularizedLossCalculation}
    +            .withBroadcastSet(weightDS, WEIGHTVECTOR_BROADCAST)
             }
    -      }
    -      case None => createInitialWeightVector(dimensionsDS)
    -    }
    -
    -    // Perform the iterations
    -    // TODO: Enable convergence stopping criterion, as in Multiple Linear regression
    -    initialWeightsDS.iterate(numberOfIterations) {
    -      weightVector => {
    -        SGDStep(data, weightVector)
    -      }
    +        // We have to calculate for each weight vector the sum of squared residuals,
    +        // and then sum them and apply regularization
    +        val initialLossSumDS = lossCalculation(data, initialWeightsDS)
    +
    +        // Combine weight vector with the current loss
    +        val initialWeightsWithLossSum = initialWeightsDS.
    +          crossWithTiny(initialLossSumDS).setParallelism(1)
    +
    +        val resultWithLoss = initialWeightsWithLossSum.
    +          iterateWithTermination(numberOfIterations) {
    +          weightsWithLossSum =>
    +
    +            // Extract weight vector and loss
    +            val previousWeightsDS = weightsWithLossSum.map{_._1}
    +            val previousLossSumDS = weightsWithLossSum.map{_._2}
    +
    +            val currentWeightsDS = SGDStep(data, previousWeightsDS)
    +
    +            val currentLossSumDS = lossCalculation(data, currentWeightsDS)
    +
    +            // Check if the relative change in the loss is smaller than the
    +            // convergence threshold. If yes, then terminate i.e. return empty termination data set
    +            val termination = previousLossSumDS.crossWithTiny(currentLossSumDS).setParallelism(1).
    +              filter{
    +              pair => {
    +                val (previousLoss, currentLoss) = pair
    +
    +                if (previousLoss <= 0) {
    +                  false
    +                } else {
    +                  math.abs((previousLoss - currentLoss)/previousLoss) >= convergence
    --- End diff ---
    
    Can we also support different convergence criteria? For example, the absolute loss?
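    
    A rough sketch of what a pluggable criterion could look like (just an idea for
    discussion; the ConvergenceCriterion trait and the object names below are
    hypothetical, nothing here is part of this PR):
    
        // Hypothetical: a pluggable convergence check, selectable via a parameter
        trait ConvergenceCriterion extends Serializable {
          /** Returns true if the optimization should terminate. */
          def isConverged(previousLoss: Double, currentLoss: Double, threshold: Double): Boolean
        }
    
        /** Relative change in the loss, i.e. what this PR currently hard-codes. */
        object RelativeLossChange extends ConvergenceCriterion {
          override def isConverged(
              previousLoss: Double,
              currentLoss: Double,
              threshold: Double): Boolean = {
            // A non-positive previous loss terminates, mirroring the guard above
            previousLoss <= 0 ||
              math.abs((previousLoss - currentLoss) / previousLoss) < threshold
          }
        }
    
        /** Absolute change in the loss, as suggested above. */
        object AbsoluteLossChange extends ConvergenceCriterion {
          override def isConverged(
              previousLoss: Double,
              currentLoss: Double,
              threshold: Double): Boolean = {
            math.abs(previousLoss - currentLoss) < threshold
          }
        }
    
    The termination filter would then keep the element (i.e. continue iterating)
    whenever !criterion.isConverged(previousLoss, currentLoss, convergence) holds,
    instead of inlining the relative-change formula.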

