Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/692#discussion_r30780614
  
    --- Diff: flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/optimization/GradientDescent.scala ---
    @@ -76,86 +77,163 @@ class GradientDescent(runParameters: ParameterMap) extends IterativeSolver {
         }.withBroadcastSet(currentWeights, WEIGHTVECTOR_BROADCAST)
       }
     
    +
    +
       /** Provides a solution for the given optimization problem
         *
         * @param data A Dataset of LabeledVector (label, features) pairs
    -    * @param initWeights The initial weights that will be optimized
    +    * @param initialWeights The initial weights that will be optimized
         * @return The weights, optimized for the provided data.
         */
       override def optimize(
         data: DataSet[LabeledVector],
    -    initWeights: Option[DataSet[WeightVector]]): DataSet[WeightVector] = {
    -    // TODO: Faster way to do this?
    -    val dimensionsDS = data.map(_.vector.size).reduce((a, b) => b)
    -
    -    val numberOfIterations: Int = parameterMap(Iterations)
    +    initialWeights: Option[DataSet[WeightVector]]): DataSet[WeightVector] 
= {
    +    val numberOfIterations: Int = parameters(Iterations)
    +    // TODO(tvas): This looks out of place, why don't we get back an 
Option from
    +    // parameters(ConvergenceThreshold)?
    +    val convergenceThresholdOption = parameters.get(ConvergenceThreshold)
     
         // Initialize weights
    -    val initialWeightsDS: DataSet[WeightVector] = initWeights match {
    -      // Ensure provided weight vector is a DenseVector
    -      case Some(wvDS) => {
    -        wvDS.map{wv => {
    -          val denseWeights = wv.weights match {
    -            case dv: DenseVector => dv
    -            case sv: SparseVector => sv.toDenseVector
    +    val initialWeightsDS: DataSet[WeightVector] = 
createInitialWeightsDS(initialWeights, data)
    +
    +    // Perform the iterations
    +    val optimizedWeights = convergenceThresholdOption match {
    +      // No convergence criterion
    +      case None =>
    +        initialWeightsDS.iterate(numberOfIterations) {
    +          weightVectorDS => {
    +            SGDStep(data, weightVectorDS)
               }
    -          WeightVector(denseWeights, wv.intercept)
             }
    -
    +      case Some(convergence) =>
    +        /** Calculates the regularized loss, from the data and given 
weights **/
    +        def lossCalculation(data: DataSet[LabeledVector], weightDS: 
DataSet[WeightVector]):
    +        DataSet[Double] = {
    +          data.map {
    +            new LossCalculation
    +          }.withBroadcastSet(weightDS, WEIGHTVECTOR_BROADCAST)
    +            .reduce {
    +            (left, right) =>
    +              val (leftLoss, leftCount) = left
    +              val (rightLoss, rightCount) = right
    +              (leftLoss + rightLoss, rightCount + leftCount)
    +          }
    +            .map{new RegularizedLossCalculation}
    +            .withBroadcastSet(weightDS, WEIGHTVECTOR_BROADCAST)
             }
    -      }
    -      case None => createInitialWeightVector(dimensionsDS)
    -    }
    -
    -    // Perform the iterations
    -    // TODO: Enable convergence stopping criterion, as in Multiple Linear 
regression
    -    initialWeightsDS.iterate(numberOfIterations) {
    -      weightVector => {
    -        SGDStep(data, weightVector)
    -      }
    +        // We have to calculate for each weight vector the sum of squared 
residuals,
    +        // and then sum them and apply regularization
    +        val initialLossSumDS = lossCalculation(data, initialWeightsDS)
    +
    +        // Combine weight vector with the current loss
    +        val initialWeightsWithLossSum = initialWeightsDS.
    +          crossWithTiny(initialLossSumDS).setParallelism(1)
    +
    +        val resultWithLoss = initialWeightsWithLossSum.
    +          iterateWithTermination(numberOfIterations) {
    +          weightsWithLossSum =>
    +
    +            // Extract weight vector and loss
    +            val previousWeightsDS = weightsWithLossSum.map{_._1}
    +            val previousLossSumDS = weightsWithLossSum.map{_._2}
    +
    +            val currentWeightsDS = SGDStep(data, previousWeightsDS)
    +
    +            val currentLossSumDS = lossCalculation(data, currentWeightsDS)
    +
    +            // Check if the relative change in the loss is smaller than the
    +            // convergence threshold. If yes, then terminate i.e. return 
empty termination data set
    +            val termination = 
previousLossSumDS.crossWithTiny(currentLossSumDS).setParallelism(1).
    +              filter{
    +              pair => {
    +                val (previousLoss, currentLoss) = pair
    +
    +                if (previousLoss <= 0) {
    +                  false
    +                } else {
    +                  math.abs((previousLoss - currentLoss)/previousLoss) >= 
convergence
    +                }
    +              }
    +            }
    +
    +            // Result for new iteration
    +            (currentWeightsDS cross currentLossSumDS, termination)
    +        }
    +        // Return just the weights
    +        resultWithLoss.map{_._1}
         }
    +    optimizedWeights
       }
     
    -  /** Mapping function that calculates the weight gradients from the data.
    +  /** Calculates the loss value, given a labeled vector and the current 
weight vector
         *
    +    * The weight vector is received as a broadcast variable.
         */
    -  private class GradientCalculation extends
    -    RichMapFunction[LabeledVector, (WeightVector, Double, Int)] {
    +  private class LossCalculation extends RichMapFunction[LabeledVector, 
(Double, Int)] {
     
         var weightVector: WeightVector = null
     
    +
         @throws(classOf[Exception])
         override def open(configuration: Configuration): Unit = {
           val list = this.getRuntimeContext.
             getBroadcastVariable[WeightVector](WEIGHTVECTOR_BROADCAST)
     
           weightVector = list.get(0)
    -    }
     
    -    override def map(example: LabeledVector): (WeightVector, Double, Int) 
= {
    +    }
     
    -      val lossFunction = parameterMap(LossFunction)
    -      val regType = parameterMap(RegularizationType)
    -      val regParameter = parameterMap(RegularizationParameter)
    -      val predictionFunction = parameterMap(PredictionFunctionParameter)
    +    override def map(example: LabeledVector): (Double, Int) = {
    +      val lossFunction = parameters(LossFunctionParameter)
    +      val predictionFunction = parameters(PredictionFunctionParameter)
           val dimensions = example.vector.size
    -      // TODO(tvas): Any point in carrying the weightGradient vector for 
in-place replacement?
    -      // The idea in spark is to avoid object creation, but here we have 
to do it anyway
    +      // TODO(tvas): Avoid needless creation of WeightGradient object
    +      // Create a lossValue function in LossFunction?
           val weightGradient = new DenseVector(new Array[Double](dimensions))
     
    -      // TODO(tvas): Indentation here?
    -      val (loss, lossDeriv) = lossFunction.lossAndGradient(
    -                                example,
    -                                weightVector,
    -                                weightGradient,
    -                                regType,
    -                                regParameter,
    -                                predictionFunction)
    +      val (loss, _) = lossFunction.lossAndGradient(
    +        example,
    +        weightVector,
    +        weightGradient,
    +        predictionFunction)
     
    -      (new WeightVector(weightGradient, lossDeriv), loss, 1)
    +      (loss, 1)
         }
       }
     
    +/** Calculates the regularized loss value, given the loss and the current 
weight vector
    +  *
    +  * The weight vector is received as a broadcast variable.
    +  */
    +private class RegularizedLossCalculation extends RichMapFunction[(Double, 
Int), Double] {
    +
    +  var weightVector: WeightVector = null
    +
    +
    --- End diff --
    
    Are the two consecutive blank lines here intended?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and would like it, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

Reply via email to