[ https://issues.apache.org/jira/browse/FLINK-1992?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14553971#comment-14553971 ]
ASF GitHub Bot commented on FLINK-1992:
---------------------------------------
Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/692#discussion_r30786437
--- Diff: flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/optimization/GradientDescent.scala ---
@@ -76,86 +77,163 @@ class GradientDescent(runParameters: ParameterMap) extends IterativeSolver {
}.withBroadcastSet(currentWeights, WEIGHTVECTOR_BROADCAST)
}
+
+
/** Provides a solution for the given optimization problem
*
* @param data A Dataset of LabeledVector (label, features) pairs
- * @param initWeights The initial weights that will be optimized
+ * @param initialWeights The initial weights that will be optimized
* @return The weights, optimized for the provided data.
*/
override def optimize(
data: DataSet[LabeledVector],
- initWeights: Option[DataSet[WeightVector]]): DataSet[WeightVector] = {
- // TODO: Faster way to do this?
- val dimensionsDS = data.map(_.vector.size).reduce((a, b) => b)
-
- val numberOfIterations: Int = parameterMap(Iterations)
+      initialWeights: Option[DataSet[WeightVector]]): DataSet[WeightVector] = {
+ val numberOfIterations: Int = parameters(Iterations)
+    // TODO(tvas): This looks out of place, why don't we get back an Option from
+    // parameters(ConvergenceThreshold)?
+ val convergenceThresholdOption = parameters.get(ConvergenceThreshold)
// Initialize weights
- val initialWeightsDS: DataSet[WeightVector] = initWeights match {
- // Ensure provided weight vector is a DenseVector
- case Some(wvDS) => {
- wvDS.map{wv => {
- val denseWeights = wv.weights match {
- case dv: DenseVector => dv
- case sv: SparseVector => sv.toDenseVector
+    val initialWeightsDS: DataSet[WeightVector] = createInitialWeightsDS(initialWeights, data)
+
+ // Perform the iterations
+ val optimizedWeights = convergenceThresholdOption match {
+ // No convergence criterion
+ case None =>
+ initialWeightsDS.iterate(numberOfIterations) {
+ weightVectorDS => {
+ SGDStep(data, weightVectorDS)
}
- WeightVector(denseWeights, wv.intercept)
}
-
+ case Some(convergence) =>
+          /** Calculates the regularized loss, from the data and given weights **/
--- End diff ---
What you've written here is ScalaDoc. Usually it is only used for
classes/objects/packages/methods, but not for comments inside a method body.
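
As a small illustration of the convention being pointed out (the names below
are invented for the example, not taken from the PR): ScalaDoc blocks
(/** ... */) document classes, objects, packages, and methods, while plain //
line comments are used inside method bodies.

    /** Illustrates the usual Scala comment conventions. (Object-level ScalaDoc.) */
    object CommentStyleExample {

      /** Computes the sum of squares of the given values. (Method-level ScalaDoc.)
        *
        * @param xs the values to square and sum
        * @return the sum of the squared values
        */
      def sumOfSquares(xs: Seq[Double]): Double = {
        // Inside a method body, a plain line comment is the usual style,
        // not a ScalaDoc block.
        xs.map(x => x * x).sum
      }
    }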
> Add convergence criterion to SGD optimizer
> ------------------------------------------
>
> Key: FLINK-1992
> URL: https://issues.apache.org/jira/browse/FLINK-1992
> Project: Flink
> Issue Type: Improvement
> Components: Machine Learning Library
> Reporter: Till Rohrmann
> Assignee: Theodore Vasiloudis
> Priority: Minor
> Labels: ML
> Fix For: 0.9
>
>
> Currently, Flink's SGD optimizer runs for a fixed number of iterations. It
> would be good to support a dynamic convergence criterion, too.
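
For context, a minimal sketch of how such a dynamic criterion can be expressed
with the DataSet API's iterateWithTermination, which stops as soon as the
termination DataSet produced by the step function is empty. The toy state (a
single Double that halves each round) and the threshold are made up for the
example and are not the PR's actual code:

    import org.apache.flink.api.scala._

    object ConvergenceSketch {
      def main(args: Array[String]): Unit = {
        val env = ExecutionEnvironment.getExecutionEnvironment

        // Toy "model state": a single value that halves on every iteration.
        val initial: DataSet[Double] = env.fromElements(1.0)
        val threshold = 0.01

        // iterateWithTermination runs at most 100 iterations, but stops early
        // once the termination DataSet (second tuple element) becomes empty.
        val result = initial.iterateWithTermination(100) { current =>
          val next = current.map(_ / 2.0)
          // Non-empty while the value is still above the threshold,
          // i.e. while we have not yet converged.
          val termination = next.filter(_ > threshold)
          (next, termination)
        }

        result.print()
      }
    }

In the optimizer itself, the termination set would be derived from the change
in regularized loss between iterations rather than from the raw state, as the
diff above suggests.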
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)