[
https://issues.apache.org/jira/browse/FLINK-1807?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14510960#comment-14510960
]
ASF GitHub Bot commented on FLINK-1807:
---------------------------------------
Github user thvasilo commented on a diff in the pull request:
https://github.com/apache/flink/pull/613#discussion_r29044450
--- Diff:
flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/optimization/GradientDescentITSuite.scala
---
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.optimization
+
+import org.apache.flink.ml.common.{LabeledVector, WeightVector,
ParameterMap}
+import org.apache.flink.ml.math.DenseVector
+import org.apache.flink.ml.regression.RegressionData._
+import org.scalatest.{Matchers, FlatSpec}
+
+import org.apache.flink.api.scala._
+import org.apache.flink.test.util.FlinkTestBase
+
+
+class GradientDescentITSuite extends FlatSpec with Matchers with
FlinkTestBase {
+
+ behavior of "The Stochastic Gradient Descent implementation"
+
+ it should "correctly solve an L1 regularized regression problem" in {
+ val env = ExecutionEnvironment.getExecutionEnvironment
+
+ env.setParallelism(2)
+
+ val parameters = ParameterMap()
+
+ parameters.add(IterativeSolver.Stepsize, 0.01)
+ parameters.add(IterativeSolver.Iterations, 2000)
+ parameters.add(Solver.LossFunction, new SquaredLoss)
+ parameters.add(Solver.RegularizationType, new L1Regularization)
+ parameters.add(Solver.RegularizationParameter, 0.3)
+
+ val sgd = GradientDescent(parameters)
+
+ val inputDS: DataSet[LabeledVector] =
env.fromCollection(regularizationData)
+
+ val weightDS = sgd.optimize(inputDS, None)
+
+ val weightList: Seq[WeightVector] = weightDS.collect()
+
+ val weightVector: WeightVector = weightList.head
+
+ val intercept = weightVector.intercept
+ val weights = weightVector.weights.asInstanceOf[DenseVector].data
+
+ expectedRegWeights zip weights foreach {
+ case (expectedWeight, weight) =>
+ weight should be (expectedWeight +- 0.01)
+ }
+
+ intercept should be (expectedRegWeight0 +- 0.1)
+ }
+
+ it should "correctly perform one step with L2 regularization" in {
+ val env = ExecutionEnvironment.getExecutionEnvironment
+
+ env.setParallelism(2)
+
+ val parameters = ParameterMap()
+
+ parameters.add(IterativeSolver.Stepsize, 0.1)
+ parameters.add(IterativeSolver.Iterations, 1)
+ parameters.add(Solver.LossFunction, new SquaredLoss)
+ parameters.add(Solver.RegularizationType, new L2Regularization)
+ parameters.add(Solver.RegularizationParameter, 1.0)
+
+ val sgd = GradientDescent(parameters)
+
+ val inputDS: DataSet[LabeledVector] =
env.fromElements(LabeledVector(1.0, DenseVector(2.0)))
+ val currentWeights = new WeightVector(DenseVector(1.0), 1.0)
+ val currentWeightsDS = env.fromElements(currentWeights)
+
+ val weightDS = sgd.optimize(inputDS, Some(currentWeightsDS))
+
+ val weightList: Seq[WeightVector] = weightDS.collect()
+
+ weightList.size should equal(1)
+
+ val weightVector: WeightVector = weightList.head
+
+ val updatedIntercept = weightVector.intercept
+ val updatedWeight = weightVector.weights(0)
+
+ updatedWeight should be (0.54 +- 0.001)
+ updatedIntercept should be (0.8 +- 0.01)
+ }
+
+ it should "estimate a linear function" in {
+ val env = ExecutionEnvironment.getExecutionEnvironment
+
+ env.setParallelism(2)
+
+ val parameters = ParameterMap()
+
+ parameters.add(IterativeSolver.Stepsize, 1.0)
+ parameters.add(IterativeSolver.Iterations, 100)
+ parameters.add(Solver.LossFunction, new SquaredLoss)
+ parameters.add(Solver.RegularizationType, new NoRegularization)
+ parameters.add(Solver.RegularizationParameter, 0.0)
+
+ val sgd = GradientDescent(parameters)
+
+ val inputDS = env.fromCollection(data)
+ val weightDS = sgd.optimize(inputDS, None)
+
+ val weightList: Seq[WeightVector] = weightDS.collect()
+
+ weightList.size should equal(1)
+
+ val weightVector: WeightVector = weightList.head
+
+ // TODO(tvas): Can we do without the explicit conversion?
+ val weights = weightVector.weights.asInstanceOf[DenseVector].data
+ val weight0 = weightVector.intercept
+
+
+ expectedWeights zip weights foreach {
+ case (expectedWeight, weight) =>
+ weight should be (expectedWeight +- 1)
+ }
+ weight0 should be (expectedWeight0 +- 0.4)
+ }
+
+ it should "estimate a linear function without an intercept" in {
+ val env = ExecutionEnvironment.getExecutionEnvironment
+
+ env.setParallelism(2)
+
+ val parameters = ParameterMap()
+
+ parameters.add(IterativeSolver.Stepsize, 0.0001)
+ parameters.add(IterativeSolver.Iterations, 100)
+ parameters.add(Solver.LossFunction, new SquaredLoss)
+ parameters.add(Solver.RegularizationType, new NoRegularization)
+ parameters.add(Solver.RegularizationParameter, 0.0)
+
+ val sgd = GradientDescent(parameters)
+
+ val inputDS = env.fromCollection(noInterceptData)
+ val weightDS = sgd.optimize(inputDS, None)
+
+ val weightList: Seq[WeightVector] = weightDS.collect()
+
+ weightList.size should equal(1)
+
+ val weightVector: WeightVector = weightList.head
+
+ // TODO(tvas): Can we do without the explicit conversion?
+ val weights = weightVector.weights.asInstanceOf[DenseVector].data
+ val weight0 = weightVector.intercept
+
+ expectedNoInterceptWeights zip weights foreach {
+ case (expectedWeight, weight) =>
+ weight should be (expectedWeight +- 1)
+ }
+ weight0 should be (expectedNoInterceptWeight0 +- 0.4)
+ }
+
+ it should "correctly perform one step of the algorithm with initial
weights provided" in {
+ val env = ExecutionEnvironment.getExecutionEnvironment
+
+ env.setParallelism(2)
+
+ val parameters = ParameterMap()
+
+ parameters.add(IterativeSolver.Stepsize, 0.1)
+ parameters.add(IterativeSolver.Iterations, 1)
+ parameters.add(Solver.LossFunction, new SquaredLoss)
+ parameters.add(Solver.RegularizationType, new NoRegularization)
+ parameters.add(Solver.RegularizationParameter, 0.0)
+
+ val sgd = GradientDescent(parameters)
+
+ val inputDS: DataSet[LabeledVector] =
env.fromElements(LabeledVector(1.0, DenseVector(2.0)))
+ val currentWeights = new WeightVector(DenseVector(1.0), 1.0)
+ val currentWeightsDS = env.fromElements(currentWeights)
+
+ val weightDS = sgd.optimize(inputDS, Some(currentWeightsDS))
+
+ val weightList: Seq[WeightVector] = weightDS.collect()
+
+ weightList.size should equal(1)
+
+ val weightVector: WeightVector = weightList.head
+
+ val updatedIntercept = weightVector.intercept
+ val updatedWeight = weightVector.weights(0)
+
+ updatedWeight should be (0.6 +- 0.01)
+ updatedIntercept should be (0.8 +- 0.01)
+
+ }
+
+ // TODO: Add L1 test
--- End diff --
Remove this TODO; the L1 regularization test has already been added.
> Stochastic gradient descent optimizer for ML library
> ----------------------------------------------------
>
> Key: FLINK-1807
> URL: https://issues.apache.org/jira/browse/FLINK-1807
> Project: Flink
> Issue Type: Improvement
> Components: Machine Learning Library
> Reporter: Till Rohrmann
> Assignee: Theodore Vasiloudis
> Labels: ML
>
> Stochastic gradient descent (SGD) is a widely used optimization technique in
> many different ML algorithms. It would therefore be helpful to provide a
> generalized SGD implementation that can be instantiated with the respective
> gradient computation. Such a building block would make the development of
> future algorithms easier.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)