Repository: flink
Updated Branches:
  refs/heads/master c0b297573 -> b8aa49ce7


[FLINK-1794] [test-utils] Adds test base for scala tests and adapts existing 
flink-ml tests

[FLINK-1794] [test-utils] Adds scala docs to FlinkTestBase

This closes #540.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b8aa49ce
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b8aa49ce
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b8aa49ce

Branch: refs/heads/master
Commit: b8aa49ce7d806c52183ab79c17c6024b60d0eaaf
Parents: c0b2975
Author: Till Rohrmann <trohrm...@apache.org>
Authored: Fri Mar 27 19:40:42 2015 +0100
Committer: Till Rohrmann <trohrm...@apache.org>
Committed: Tue Mar 31 13:40:53 2015 +0200

----------------------------------------------------------------------
 .../org/apache/flink/client/program/Client.java |   2 +-
 flink-staging/flink-ml/pom.xml                  |  25 ++-
 .../flink/ml/feature/PolynomialBaseITCase.scala | 132 ----------------
 .../ml/feature/PolynomialBaseITSuite.scala      | 126 +++++++++++++++
 .../flink/ml/recommendation/ALSITCase.scala     | 153 -------------------
 .../flink/ml/recommendation/ALSITSuite.scala    | 149 ++++++++++++++++++
 .../MultipleLinearRegressionITCase.scala        | 115 --------------
 .../MultipleLinearRegressionITSuite.scala       | 109 +++++++++++++
 flink-test-utils/pom.xml                        |   6 +
 .../apache/flink/test/util/FlinkTestBase.scala  |  68 +++++++++
 10 files changed, 479 insertions(+), 406 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/b8aa49ce/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
----------------------------------------------------------------------
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/program/Client.java 
b/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
index 6dff9e2..dc1a7ab 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
@@ -70,7 +70,7 @@ public class Client {
        
        private final Optimizer compiler;               // the compiler to 
compile the jobs
        
-       private boolean printStatusDuringExecution = false;
+       private boolean printStatusDuringExecution = true;
 
        /**
         * If != -1, this field specifies the total number of available slots 
on the cluster

http://git-wip-us.apache.org/repos/asf/flink/blob/b8aa49ce/flink-staging/flink-ml/pom.xml
----------------------------------------------------------------------
diff --git a/flink-staging/flink-ml/pom.xml b/flink-staging/flink-ml/pom.xml
index 123770f..4f251e5 100644
--- a/flink-staging/flink-ml/pom.xml
+++ b/flink-staging/flink-ml/pom.xml
@@ -41,11 +41,6 @@
                </dependency>
 
                <dependency>
-                       <groupId>org.scalatest</groupId>
-                       
<artifactId>scalatest_${scala.binary.version}</artifactId>
-               </dependency>
-
-               <dependency>
                        <groupId>com.github.fommil.netlib</groupId>
                        <artifactId>core</artifactId>
                        <version>1.1.2</version>
@@ -65,6 +60,13 @@
                        <type>test-jar</type>
                        <scope>test</scope>
                </dependency>
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-test-utils</artifactId>
+                       <version>${project.version}</version>
+                       <scope>test</scope>
+               </dependency>
        </dependencies>
 
        <build>
@@ -105,6 +107,19 @@
                                                <goals>
                                                        <goal>test</goal>
                                                </goals>
+                                               <configuration>
+                                                       
<suffixes>(?&lt;!(IT|Integration))(Test|Suite|Case)</suffixes>
+                                               </configuration>
+                                       </execution>
+                                       <execution>
+                                               <id>integration-test</id>
+                                               <phase>integration-test</phase>
+                                               <goals>
+                                                       <goal>test</goal>
+                                               </goals>
+                                               <configuration>
+                                                       
<suffixes>(IT|Integration)(Test|Suite|Case)</suffixes>
+                                               </configuration>
                                        </execution>
                                </executions>
                        </plugin>

http://git-wip-us.apache.org/repos/asf/flink/blob/b8aa49ce/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/feature/PolynomialBaseITCase.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/feature/PolynomialBaseITCase.scala
 
b/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/feature/PolynomialBaseITCase.scala
deleted file mode 100644
index 95e2a25..0000000
--- 
a/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/feature/PolynomialBaseITCase.scala
+++ /dev/null
@@ -1,132 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.ml.feature
-
-import org.apache.flink.api.scala.ExecutionEnvironment
-import org.apache.flink.client.CliFrontendTestUtils
-import org.apache.flink.ml.common.LabeledVector
-import org.apache.flink.ml.math.DenseVector
-import org.junit.{BeforeClass, Test}
-import org.scalatest.ShouldMatchers
-
-import org.apache.flink.api.scala._
-
-class PolynomialBaseITCase extends ShouldMatchers {
-
-  @Test
-  def testMapElementToPolynomialVectorSpace (): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-
-    env.setParallelism (2)
-
-    val input = Seq (
-    LabeledVector (DenseVector (1), 1.0),
-    LabeledVector (DenseVector (2), 2.0)
-    )
-
-    val inputDS = env.fromCollection (input)
-
-    val transformer = PolynomialBase ()
-    .setDegree (3)
-
-    val transformedDS = transformer.transform (inputDS)
-
-    val expectedMap = List (
-    (1.0 -> DenseVector (1.0, 1.0, 1.0) ),
-    (2.0 -> DenseVector (8.0, 4.0, 2.0) )
-    ) toMap
-
-    val result = transformedDS.collect
-
-    for (entry <- result) {
-    expectedMap.contains (entry.label) should be (true)
-    entry.vector should equal (expectedMap (entry.label) )
-    }
-  }
-
-  @Test
-  def testMapVectorToPolynomialVectorSpace(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-
-    env.setParallelism(2)
-
-    val input = Seq(
-      LabeledVector(DenseVector(2, 3), 1.0),
-      LabeledVector(DenseVector(2, 3, 4), 2.0)
-    )
-
-    val expectedMap = List(
-      (1.0 -> DenseVector(8.0, 12.0, 18.0, 27.0, 4.0, 6.0, 9.0, 2.0, 3.0)),
-      (2.0 -> DenseVector(8.0, 12.0, 16.0, 18.0, 24.0, 32.0, 27.0, 36.0, 48.0, 
64.0, 4.0, 6.0, 8.0,
-        9.0, 12.0, 16.0, 2.0, 3.0, 4.0))
-    ) toMap
-
-    val inputDS = env.fromCollection(input)
-
-    val transformer = PolynomialBase()
-      .setDegree(3)
-
-    val transformedDS = transformer.transform(inputDS)
-
-    val result = transformedDS.collect
-
-    for(entry <- result) {
-      expectedMap.contains(entry.label) should be(true)
-      entry.vector should equal(expectedMap(entry.label))
-    }
-  }
-
-  @Test
-  def testReturnEmptyVectorIfDegreeIsZero(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-
-    env.setParallelism(2)
-
-    val input = Seq(
-      LabeledVector(DenseVector(2, 3), 1.0),
-      LabeledVector(DenseVector(2, 3, 4), 2.0)
-    )
-
-    val inputDS = env.fromCollection(input)
-
-    val transformer = PolynomialBase()
-      .setDegree(0)
-
-    val transformedDS = transformer.transform(inputDS)
-
-    val result = transformedDS.collect
-
-    val expectedMap = List(
-      (1.0 -> DenseVector()),
-      (2.0 -> DenseVector())
-    ) toMap
-
-    for(entry <- result) {
-      expectedMap.contains(entry.label) should be(true)
-      entry.vector should equal(expectedMap(entry.label))
-    }
-  }
-}
-
-object PolynomialBaseITCase {
-  @BeforeClass
-  def setup(): Unit = {
-    CliFrontendTestUtils.pipeSystemOutToNull()
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/b8aa49ce/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/feature/PolynomialBaseITSuite.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/feature/PolynomialBaseITSuite.scala
 
b/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/feature/PolynomialBaseITSuite.scala
new file mode 100644
index 0000000..5529e17
--- /dev/null
+++ 
b/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/feature/PolynomialBaseITSuite.scala
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.feature
+
+import org.apache.flink.api.scala.ExecutionEnvironment
+import org.apache.flink.ml.common.LabeledVector
+import org.apache.flink.ml.math.DenseVector
+import org.scalatest.{Matchers, FlatSpec}
+
+import org.apache.flink.api.scala._
+import org.apache.flink.test.util.FlinkTestBase
+
+class PolynomialBaseITSuite
+  extends FlatSpec
+  with Matchers
+  with FlinkTestBase {
+
+  behavior of "The polynomial base implementation"
+
+  it should "map single element vectors to the polynomial vector space" in {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+
+    env.setParallelism (2)
+
+    val input = Seq (
+    LabeledVector (DenseVector (1), 1.0),
+    LabeledVector (DenseVector (2), 2.0)
+    )
+
+    val inputDS = env.fromCollection (input)
+
+    val transformer = PolynomialBase ()
+    .setDegree (3)
+
+    val transformedDS = transformer.transform (inputDS)
+
+    val expectedMap = List (
+    (1.0 -> DenseVector (1.0, 1.0, 1.0) ),
+    (2.0 -> DenseVector (8.0, 4.0, 2.0) )
+    ) toMap
+
+    val result = transformedDS.collect
+
+    for (entry <- result) {
+    expectedMap.contains (entry.label) should be (true)
+    entry.vector should equal (expectedMap (entry.label) )
+    }
+  }
+
+  it should "map vectors to the polynomial vector space" in {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+
+    env.setParallelism(2)
+
+    val input = Seq(
+      LabeledVector(DenseVector(2, 3), 1.0),
+      LabeledVector(DenseVector(2, 3, 4), 2.0)
+    )
+
+    val expectedMap = List(
+      (1.0 -> DenseVector(8.0, 12.0, 18.0, 27.0, 4.0, 6.0, 9.0, 2.0, 3.0)),
+      (2.0 -> DenseVector(8.0, 12.0, 16.0, 18.0, 24.0, 32.0, 27.0, 36.0, 48.0, 
64.0, 4.0, 6.0, 8.0,
+        9.0, 12.0, 16.0, 2.0, 3.0, 4.0))
+    ) toMap
+
+    val inputDS = env.fromCollection(input)
+
+    val transformer = PolynomialBase()
+      .setDegree(3)
+
+    val transformedDS = transformer.transform(inputDS)
+
+    val result = transformedDS.collect
+
+    for(entry <- result) {
+      expectedMap.contains(entry.label) should be(true)
+      entry.vector should equal(expectedMap(entry.label))
+    }
+  }
+
+  it should "return an empty vector if the max degree is zero" in {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+
+    env.setParallelism(2)
+
+    val input = Seq(
+      LabeledVector(DenseVector(2, 3), 1.0),
+      LabeledVector(DenseVector(2, 3, 4), 2.0)
+    )
+
+    val inputDS = env.fromCollection(input)
+
+    val transformer = PolynomialBase()
+      .setDegree(0)
+
+    val transformedDS = transformer.transform(inputDS)
+
+    val result = transformedDS.collect
+
+    val expectedMap = List(
+      (1.0 -> DenseVector()),
+      (2.0 -> DenseVector())
+    ) toMap
+
+    for(entry <- result) {
+      expectedMap.contains(entry.label) should be(true)
+      entry.vector should equal(expectedMap(entry.label))
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b8aa49ce/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/recommendation/ALSITCase.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/recommendation/ALSITCase.scala
 
b/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/recommendation/ALSITCase.scala
deleted file mode 100644
index d783ecb..0000000
--- 
a/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/recommendation/ALSITCase.scala
+++ /dev/null
@@ -1,153 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.ml.recommendation
-
-import org.apache.flink.api.common.ExecutionMode
-import org.apache.flink.api.scala.ExecutionEnvironment
-import org.apache.flink.client.CliFrontendTestUtils
-import org.junit.{BeforeClass, Test}
-import org.scalatest.ShouldMatchers
-
-import org.apache.flink.api.scala._
-
-class ALSITCase extends ShouldMatchers {
-
-  @Test
-  def testMatrixFactorization(): Unit = {
-    import ALSData._
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-
-    env.setParallelism(2)
-
-    val als = ALS()
-      .setIterations(iterations)
-      .setLambda(lambda)
-      .setBlocks(4)
-      .setNumFactors(numFactors)
-
-    val inputDS = env.fromCollection(data)
-
-    val model = als.fit(inputDS)
-
-    val testData = env.fromCollection(expectedResult.map{
-      case (userID, itemID, rating) => (userID, itemID)
-    })
-
-    val predictions = model.transform(testData).collect
-
-    predictions.length should equal(expectedResult.length)
-
-    val resultMap = expectedResult map {
-      case (uID, iID, value) => (uID, iID) -> value
-    } toMap
-
-    predictions foreach {
-      case (uID, iID, value) => {
-        resultMap.isDefinedAt(((uID, iID))) should be(true)
-
-        value should be(resultMap((uID, iID)) +- 0.1)
-      }
-    }
-
-    val risk = model.empiricalRisk(inputDS).collect(0)
-
-    risk should be(expectedEmpiricalRisk +- 1)
-  }
-}
-
-object ALSITCase {
-
-  @BeforeClass
-  def setup(): Unit = {
-    CliFrontendTestUtils.pipeSystemOutToNull()
-  }
-}
-
-object ALSData {
-
-  val iterations = 9
-  val lambda = 1.0
-  val numFactors = 5
-
-  val data: Seq[(Int, Int, Double)] = {
-    Seq(
-      (2,13,534.3937734561154),
-      (6,14,509.63176469621936),
-      (4,14,515.8246770897443),
-      (7,3,495.05234565105),
-      (2,3,532.3281786219485),
-      (5,3,497.1906356844367),
-      (3,3,512.0640508585093),
-      (10,3,500.2906742233019),
-      (1,4,521.9189079662882),
-      (2,4,515.0734651491396),
-      (1,7,522.7532725967008),
-      (8,4,492.65683825096403),
-      (4,8,492.65683825096403),
-      (10,8,507.03319667905413),
-      (7,1,522.7532725967008),
-      (1,1,572.2230209271174),
-      (2,1,563.5849190220224),
-      (6,1,518.4844061038742),
-      (9,1,529.2443732217674),
-      (8,1,543.3202505434103),
-      (7,2,516.0188923307859),
-      (1,2,563.5849190220224),
-      (1,11,515.1023793011227),
-      (8,2,536.8571133978352),
-      (2,11,507.90776961762225),
-      (3,2,532.3281786219485),
-      (5,11,476.24185144363304),
-      (4,2,515.0734651491396),
-      (4,11,469.92049343738233),
-      (3,12,509.4713776280098),
-      (4,12,494.6533165132021),
-      (7,5,482.2907867916308),
-      (6,5,477.5940040923741),
-      (4,5,480.9040684364228),
-      (1,6,518.4844061038742),
-      (6,6,470.6605085832807),
-      (8,6,489.6360564705307),
-      (4,6,472.74052954447046),
-      (7,9,482.5837650471611),
-      (5,9,487.00175463269863),
-      (9,9,500.69514584780944),
-      (4,9,477.71644808419325),
-      (7,10,485.3852917539852),
-      (8,10,507.03319667905413),
-      (3,10,500.2906742233019),
-      (5,15,488.08215944254437),
-      (6,15,480.16929757607346)
-    )
-  }
-
-  val expectedResult: Seq[(Int, Int, Double)] = {
-    Seq(
-      (2, 2, 526.1037),
-      (5, 9, 468.5680),
-      (10, 3, 484.8975),
-      (5, 13, 451.6228),
-      (1, 15, 493.4956),
-      (4, 11, 456.3862)
-    )
-  }
-
-  val expectedEmpiricalRisk = 505374.1877
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/b8aa49ce/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/recommendation/ALSITSuite.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/recommendation/ALSITSuite.scala
 
b/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/recommendation/ALSITSuite.scala
new file mode 100644
index 0000000..aadcd2d
--- /dev/null
+++ 
b/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/recommendation/ALSITSuite.scala
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.recommendation
+
+import scala.language.postfixOps
+
+import org.scalatest._
+
+import org.apache.flink.api.scala.ExecutionEnvironment
+import org.apache.flink.api.scala._
+import org.apache.flink.test.util.FlinkTestBase
+
+class ALSITSuite
+  extends FlatSpec
+  with Matchers
+  with FlinkTestBase {
+
+  override val parallelism = 2
+
+  behavior of "The alternating least squares (ALS) implementation"
+
+  it should "properly factorize a matrix" in {
+    import ALSData._
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+
+    val als = ALS()
+      .setIterations(iterations)
+      .setLambda(lambda)
+      .setBlocks(4)
+      .setNumFactors(numFactors)
+
+    val inputDS = env.fromCollection(data)
+
+    val model = als.fit(inputDS)
+
+    val testData = env.fromCollection(expectedResult.map{
+      case (userID, itemID, rating) => (userID, itemID)
+    })
+
+    val predictions = model.transform(testData).collect
+
+    predictions.length should equal(expectedResult.length)
+
+    val resultMap = expectedResult map {
+      case (uID, iID, value) => (uID, iID) -> value
+    } toMap
+
+    predictions foreach {
+      case (uID, iID, value) => {
+        resultMap.isDefinedAt(((uID, iID))) should be(true)
+
+        value should be(resultMap((uID, iID)) +- 0.1)
+      }
+    }
+
+    val risk = model.empiricalRisk(inputDS).collect(0)
+
+    risk should be(expectedEmpiricalRisk +- 1)
+  }
+}
+
+object ALSData {
+
+  val iterations = 9
+  val lambda = 1.0
+  val numFactors = 5
+
+  val data: Seq[(Int, Int, Double)] = {
+    Seq(
+      (2,13,534.3937734561154),
+      (6,14,509.63176469621936),
+      (4,14,515.8246770897443),
+      (7,3,495.05234565105),
+      (2,3,532.3281786219485),
+      (5,3,497.1906356844367),
+      (3,3,512.0640508585093),
+      (10,3,500.2906742233019),
+      (1,4,521.9189079662882),
+      (2,4,515.0734651491396),
+      (1,7,522.7532725967008),
+      (8,4,492.65683825096403),
+      (4,8,492.65683825096403),
+      (10,8,507.03319667905413),
+      (7,1,522.7532725967008),
+      (1,1,572.2230209271174),
+      (2,1,563.5849190220224),
+      (6,1,518.4844061038742),
+      (9,1,529.2443732217674),
+      (8,1,543.3202505434103),
+      (7,2,516.0188923307859),
+      (1,2,563.5849190220224),
+      (1,11,515.1023793011227),
+      (8,2,536.8571133978352),
+      (2,11,507.90776961762225),
+      (3,2,532.3281786219485),
+      (5,11,476.24185144363304),
+      (4,2,515.0734651491396),
+      (4,11,469.92049343738233),
+      (3,12,509.4713776280098),
+      (4,12,494.6533165132021),
+      (7,5,482.2907867916308),
+      (6,5,477.5940040923741),
+      (4,5,480.9040684364228),
+      (1,6,518.4844061038742),
+      (6,6,470.6605085832807),
+      (8,6,489.6360564705307),
+      (4,6,472.74052954447046),
+      (7,9,482.5837650471611),
+      (5,9,487.00175463269863),
+      (9,9,500.69514584780944),
+      (4,9,477.71644808419325),
+      (7,10,485.3852917539852),
+      (8,10,507.03319667905413),
+      (3,10,500.2906742233019),
+      (5,15,488.08215944254437),
+      (6,15,480.16929757607346)
+    )
+  }
+
+  val expectedResult: Seq[(Int, Int, Double)] = {
+    Seq(
+      (2, 2, 526.1037),
+      (5, 9, 468.5680),
+      (10, 3, 484.8975),
+      (5, 13, 451.6228),
+      (1, 15, 493.4956),
+      (4, 11, 456.3862)
+    )
+  }
+
+  val expectedEmpiricalRisk = 505374.1877
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b8aa49ce/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/regression/MultipleLinearRegressionITCase.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/regression/MultipleLinearRegressionITCase.scala
 
b/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/regression/MultipleLinearRegressionITCase.scala
deleted file mode 100644
index 15292b7..0000000
--- 
a/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/regression/MultipleLinearRegressionITCase.scala
+++ /dev/null
@@ -1,115 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.ml.regression
-
-import org.apache.flink.api.scala.ExecutionEnvironment
-import org.apache.flink.client.CliFrontendTestUtils
-import org.apache.flink.ml.common.ParameterMap
-import org.apache.flink.ml.feature.PolynomialBase
-import org.junit.{BeforeClass, Test}
-import org.scalatest.ShouldMatchers
-
-import org.apache.flink.api.scala._
-
-class MultipleLinearRegressionITCase extends ShouldMatchers {
-
-  @Test
-  def testEstimationOfLinearFunction(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-
-    env.setParallelism(2)
-
-    val learner = MultipleLinearRegression()
-
-    import RegressionData._
-
-    val parameters = ParameterMap()
-
-    parameters.add(MultipleLinearRegression.Stepsize, 1.0)
-    parameters.add(MultipleLinearRegression.Iterations, 10)
-    parameters.add(MultipleLinearRegression.ConvergenceThreshold, 0.001)
-
-    val inputDS = env.fromCollection(data)
-    val model = learner.fit(inputDS, parameters)
-
-    val weightList = model.weights.collect
-
-    weightList.size should equal(1)
-
-    val (weights, weight0) = weightList(0)
-
-    expectedWeights zip weights foreach {
-      case (expectedWeight, weight) =>
-        weight should be (expectedWeight +- 1)
-    }
-    weight0 should be (expectedWeight0 +- 0.4)
-
-    val srs = model.squaredResidualSum(inputDS).collect(0)
-
-    srs should be (expectedSquaredResidualSum +- 2)
-  }
-
-  @Test
-  def testEstimationOfCubicFunction(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-
-    env.setParallelism(2)
-
-    val polynomialBase = PolynomialBase()
-    val learner = MultipleLinearRegression()
-
-    val pipeline = polynomialBase.chain(learner)
-
-    val inputDS = env.fromCollection(RegressionData.polynomialData)
-
-    val parameters = ParameterMap()
-      .add(PolynomialBase.Degree, 3)
-      .add(MultipleLinearRegression.Stepsize, 0.002)
-      .add(MultipleLinearRegression.Iterations, 100)
-
-    val model = pipeline.fit(inputDS, parameters)
-
-    val weightList = model.weights.collect
-
-    weightList.size should equal(1)
-
-    val (weights, weight0) = weightList(0)
-
-    RegressionData.expectedPolynomialWeights.zip(weights) foreach {
-      case (expectedWeight, weight) =>
-        weight should be(expectedWeight +- 0.1)
-    }
-
-    weight0 should be(RegressionData.expectedPolynomialWeight0 +- 0.1)
-
-    val transformedInput = polynomialBase.transform(inputDS, parameters)
-
-    val srs = model.squaredResidualSum(transformedInput).collect(0)
-
-    srs should be(RegressionData.expectedPolynomialSquaredResidualSum +- 5)
-  }
-}
-
-object MultipleLinearRegressionITCase{
-
-  @BeforeClass
-  def setup(): Unit = {
-    CliFrontendTestUtils.pipeSystemOutToNull()
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/b8aa49ce/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/regression/MultipleLinearRegressionITSuite.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/regression/MultipleLinearRegressionITSuite.scala
 
b/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/regression/MultipleLinearRegressionITSuite.scala
new file mode 100644
index 0000000..9389ae7
--- /dev/null
+++ 
b/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/regression/MultipleLinearRegressionITSuite.scala
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.regression
+
+import org.apache.flink.api.scala.ExecutionEnvironment
+import org.apache.flink.ml.common.ParameterMap
+import org.apache.flink.ml.feature.PolynomialBase
+import org.scalatest.{Matchers, FlatSpec}
+
+import org.apache.flink.api.scala._
+import org.apache.flink.test.util.FlinkTestBase
+
+class MultipleLinearRegressionITSuite
+  extends FlatSpec
+  with Matchers
+  with FlinkTestBase {
+
+  behavior of "The multiple linear regression implementation"
+
+  it should "estimate a linear function" in {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+
+    env.setParallelism(2)
+
+    val learner = MultipleLinearRegression()
+
+    import RegressionData._
+
+    val parameters = ParameterMap()
+
+    parameters.add(MultipleLinearRegression.Stepsize, 1.0)
+    parameters.add(MultipleLinearRegression.Iterations, 10)
+    parameters.add(MultipleLinearRegression.ConvergenceThreshold, 0.001)
+
+    val inputDS = env.fromCollection(data)
+    val model = learner.fit(inputDS, parameters)
+
+    val weightList = model.weights.collect
+
+    weightList.size should equal(1)
+
+    val (weights, weight0) = weightList(0)
+
+    expectedWeights zip weights foreach {
+      case (expectedWeight, weight) =>
+        weight should be (expectedWeight +- 1)
+    }
+    weight0 should be (expectedWeight0 +- 0.4)
+
+    val srs = model.squaredResidualSum(inputDS).collect(0)
+
+    srs should be (expectedSquaredResidualSum +- 2)
+  }
+
+  it should "estimate a cubic function" in {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+
+    env.setParallelism(2)
+
+    val polynomialBase = PolynomialBase()
+    val learner = MultipleLinearRegression()
+
+    val pipeline = polynomialBase.chain(learner)
+
+    val inputDS = env.fromCollection(RegressionData.polynomialData)
+
+    val parameters = ParameterMap()
+      .add(PolynomialBase.Degree, 3)
+      .add(MultipleLinearRegression.Stepsize, 0.002)
+      .add(MultipleLinearRegression.Iterations, 100)
+
+    val model = pipeline.fit(inputDS, parameters)
+
+    val weightList = model.weights.collect
+
+    weightList.size should equal(1)
+
+    val (weights, weight0) = weightList(0)
+
+    RegressionData.expectedPolynomialWeights.zip(weights) foreach {
+      case (expectedWeight, weight) =>
+        weight should be(expectedWeight +- 0.1)
+    }
+
+    weight0 should be(RegressionData.expectedPolynomialWeight0 +- 0.1)
+
+    val transformedInput = polynomialBase.transform(inputDS, parameters)
+
+    val srs = model.squaredResidualSum(transformedInput).collect(0)
+
+    srs should be(RegressionData.expectedPolynomialSquaredResidualSum +- 5)
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b8aa49ce/flink-test-utils/pom.xml
----------------------------------------------------------------------
diff --git a/flink-test-utils/pom.xml b/flink-test-utils/pom.xml
index 467fb44..bda1b38 100644
--- a/flink-test-utils/pom.xml
+++ b/flink-test-utils/pom.xml
@@ -77,6 +77,12 @@ under the License.
                        <artifactId>guava</artifactId>
                        <version>${guava.version}</version>
                </dependency>
+
+               <dependency>
+                       <groupId>org.scalatest</groupId>
+                       
<artifactId>scalatest_${scala.binary.version}</artifactId>
+                       <scope>compile</scope>
+               </dependency>
        </dependencies>
 
        <build>

http://git-wip-us.apache.org/repos/asf/flink/blob/b8aa49ce/flink-test-utils/src/main/scala/org/apache/flink/test/util/FlinkTestBase.scala
----------------------------------------------------------------------
diff --git 
a/flink-test-utils/src/main/scala/org/apache/flink/test/util/FlinkTestBase.scala
 
b/flink-test-utils/src/main/scala/org/apache/flink/test/util/FlinkTestBase.scala
new file mode 100644
index 0000000..2e664c1
--- /dev/null
+++ 
b/flink-test-utils/src/main/scala/org/apache/flink/test/util/FlinkTestBase.scala
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.util
+
+import org.scalatest.{Suite, BeforeAndAfter}
+
+/** Mixin to start and stop a ForkableFlinkMiniCluster automatically for Scala 
based tests.
+  * Additionally a TestEnvironment with the started cluster is created and set 
as the default
+  * [[org.apache.flink.api.java.ExecutionEnvironment]].
+  *
+  * This mixin starts a ForkableFlinkMiniCluster with one TaskManager and a 
number of slots given
+  * by parallelism. This value can be overridden in a sub class in order to 
start the cluster
+  * with a different number of slots.
+  *
+  * The cluster is started once before starting the tests and is re-used for 
the individual tests.
+  * After all tests have been executed, the cluster is shutdown.
+  *
+  * The cluster is used by obtaining the default 
[[org.apache.flink.api.java.ExecutionEnvironment]].
+  *
+  * @example
+  *          {{{
+  *            def testSomething: Unit = {
+  *             // Obtain TestEnvironment with started ForkableFlinkMiniCluster
+  *             val env = ExecutionEnvironment.getExecutionEnvironment
+  *
+  *             env.fromCollection(...)
+  *
+  *             env.execute
+  *            }
+  *          }}}
+  *
+  */
+trait FlinkTestBase
+  extends BeforeAndAfter {
+  that: Suite =>
+
+  var cluster: Option[ForkableFlinkMiniCluster] = None
+  val parallelism = 4
+
+  before {
+    val cl = TestBaseUtils.startCluster(1, parallelism, false)
+    val clusterEnvironment = new TestEnvironment(cl, parallelism)
+    clusterEnvironment.setAsContext
+
+    cluster = Some(cl)
+  }
+
+  after {
+    cluster.map(c => TestBaseUtils.stopCluster(c, 
TestBaseUtils.DEFAULT_TIMEOUT))
+  }
+
+}

Reply via email to