Repository: flink Updated Branches: refs/heads/master c0b297573 -> b8aa49ce7
[FLINK-1794] [test-utils] Adds test base for scala tests and adapts existing flink-ml tests [FLINK-1794] [test-utils] Adds scala docs to FlinkTestBase This closes #540. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b8aa49ce Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b8aa49ce Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b8aa49ce Branch: refs/heads/master Commit: b8aa49ce7d806c52183ab79c17c6024b60d0eaaf Parents: c0b2975 Author: Till Rohrmann <trohrm...@apache.org> Authored: Fri Mar 27 19:40:42 2015 +0100 Committer: Till Rohrmann <trohrm...@apache.org> Committed: Tue Mar 31 13:40:53 2015 +0200 ---------------------------------------------------------------------- .../org/apache/flink/client/program/Client.java | 2 +- flink-staging/flink-ml/pom.xml | 25 ++- .../flink/ml/feature/PolynomialBaseITCase.scala | 132 ---------------- .../ml/feature/PolynomialBaseITSuite.scala | 126 +++++++++++++++ .../flink/ml/recommendation/ALSITCase.scala | 153 ------------------- .../flink/ml/recommendation/ALSITSuite.scala | 149 ++++++++++++++++++ .../MultipleLinearRegressionITCase.scala | 115 -------------- .../MultipleLinearRegressionITSuite.scala | 109 +++++++++++++ flink-test-utils/pom.xml | 6 + .../apache/flink/test/util/FlinkTestBase.scala | 68 +++++++++ 10 files changed, 479 insertions(+), 406 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/b8aa49ce/flink-clients/src/main/java/org/apache/flink/client/program/Client.java ---------------------------------------------------------------------- diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/Client.java b/flink-clients/src/main/java/org/apache/flink/client/program/Client.java index 6dff9e2..dc1a7ab 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/program/Client.java +++ 
b/flink-clients/src/main/java/org/apache/flink/client/program/Client.java @@ -70,7 +70,7 @@ public class Client { private final Optimizer compiler; // the compiler to compile the jobs - private boolean printStatusDuringExecution = false; + private boolean printStatusDuringExecution = true; /** * If != -1, this field specifies the total number of available slots on the cluster http://git-wip-us.apache.org/repos/asf/flink/blob/b8aa49ce/flink-staging/flink-ml/pom.xml ---------------------------------------------------------------------- diff --git a/flink-staging/flink-ml/pom.xml b/flink-staging/flink-ml/pom.xml index 123770f..4f251e5 100644 --- a/flink-staging/flink-ml/pom.xml +++ b/flink-staging/flink-ml/pom.xml @@ -41,11 +41,6 @@ </dependency> <dependency> - <groupId>org.scalatest</groupId> - <artifactId>scalatest_${scala.binary.version}</artifactId> - </dependency> - - <dependency> <groupId>com.github.fommil.netlib</groupId> <artifactId>core</artifactId> <version>1.1.2</version> @@ -65,6 +60,13 @@ <type>test-jar</type> <scope>test</scope> </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-test-utils</artifactId> + <version>${project.version}</version> + <scope>test</scope> + </dependency> </dependencies> <build> @@ -105,6 +107,19 @@ <goals> <goal>test</goal> </goals> + <configuration> + <suffixes>(?<!(IT|Integration))(Test|Suite|Case)</suffixes> + </configuration> + </execution> + <execution> + <id>integration-test</id> + <phase>integration-test</phase> + <goals> + <goal>test</goal> + </goals> + <configuration> + <suffixes>(IT|Integration)(Test|Suite|Case)</suffixes> + </configuration> </execution> </executions> </plugin> http://git-wip-us.apache.org/repos/asf/flink/blob/b8aa49ce/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/feature/PolynomialBaseITCase.scala ---------------------------------------------------------------------- diff --git 
a/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/feature/PolynomialBaseITCase.scala b/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/feature/PolynomialBaseITCase.scala deleted file mode 100644 index 95e2a25..0000000 --- a/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/feature/PolynomialBaseITCase.scala +++ /dev/null @@ -1,132 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.ml.feature - -import org.apache.flink.api.scala.ExecutionEnvironment -import org.apache.flink.client.CliFrontendTestUtils -import org.apache.flink.ml.common.LabeledVector -import org.apache.flink.ml.math.DenseVector -import org.junit.{BeforeClass, Test} -import org.scalatest.ShouldMatchers - -import org.apache.flink.api.scala._ - -class PolynomialBaseITCase extends ShouldMatchers { - - @Test - def testMapElementToPolynomialVectorSpace (): Unit = { - val env = ExecutionEnvironment.getExecutionEnvironment - - env.setParallelism (2) - - val input = Seq ( - LabeledVector (DenseVector (1), 1.0), - LabeledVector (DenseVector (2), 2.0) - ) - - val inputDS = env.fromCollection (input) - - val transformer = PolynomialBase () - .setDegree (3) - - val transformedDS = transformer.transform (inputDS) - - val expectedMap = List ( - (1.0 -> DenseVector (1.0, 1.0, 1.0) ), - (2.0 -> DenseVector (8.0, 4.0, 2.0) ) - ) toMap - - val result = transformedDS.collect - - for (entry <- result) { - expectedMap.contains (entry.label) should be (true) - entry.vector should equal (expectedMap (entry.label) ) - } - } - - @Test - def testMapVectorToPolynomialVectorSpace(): Unit = { - val env = ExecutionEnvironment.getExecutionEnvironment - - env.setParallelism(2) - - val input = Seq( - LabeledVector(DenseVector(2, 3), 1.0), - LabeledVector(DenseVector(2, 3, 4), 2.0) - ) - - val expectedMap = List( - (1.0 -> DenseVector(8.0, 12.0, 18.0, 27.0, 4.0, 6.0, 9.0, 2.0, 3.0)), - (2.0 -> DenseVector(8.0, 12.0, 16.0, 18.0, 24.0, 32.0, 27.0, 36.0, 48.0, 64.0, 4.0, 6.0, 8.0, - 9.0, 12.0, 16.0, 2.0, 3.0, 4.0)) - ) toMap - - val inputDS = env.fromCollection(input) - - val transformer = PolynomialBase() - .setDegree(3) - - val transformedDS = transformer.transform(inputDS) - - val result = transformedDS.collect - - for(entry <- result) { - expectedMap.contains(entry.label) should be(true) - entry.vector should equal(expectedMap(entry.label)) - } - } - - @Test - def 
testReturnEmptyVectorIfDegreeIsZero(): Unit = { - val env = ExecutionEnvironment.getExecutionEnvironment - - env.setParallelism(2) - - val input = Seq( - LabeledVector(DenseVector(2, 3), 1.0), - LabeledVector(DenseVector(2, 3, 4), 2.0) - ) - - val inputDS = env.fromCollection(input) - - val transformer = PolynomialBase() - .setDegree(0) - - val transformedDS = transformer.transform(inputDS) - - val result = transformedDS.collect - - val expectedMap = List( - (1.0 -> DenseVector()), - (2.0 -> DenseVector()) - ) toMap - - for(entry <- result) { - expectedMap.contains(entry.label) should be(true) - entry.vector should equal(expectedMap(entry.label)) - } - } -} - -object PolynomialBaseITCase { - @BeforeClass - def setup(): Unit = { - CliFrontendTestUtils.pipeSystemOutToNull() - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/b8aa49ce/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/feature/PolynomialBaseITSuite.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/feature/PolynomialBaseITSuite.scala b/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/feature/PolynomialBaseITSuite.scala new file mode 100644 index 0000000..5529e17 --- /dev/null +++ b/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/feature/PolynomialBaseITSuite.scala @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.ml.feature + +import org.apache.flink.api.scala.ExecutionEnvironment +import org.apache.flink.ml.common.LabeledVector +import org.apache.flink.ml.math.DenseVector +import org.scalatest.{Matchers, FlatSpec} + +import org.apache.flink.api.scala._ +import org.apache.flink.test.util.FlinkTestBase + +class PolynomialBaseITSuite + extends FlatSpec + with Matchers + with FlinkTestBase { + + behavior of "The polynomial base implementation" + + it should "map single element vectors to the polynomial vector space" in { + val env = ExecutionEnvironment.getExecutionEnvironment + + env.setParallelism (2) + + val input = Seq ( + LabeledVector (DenseVector (1), 1.0), + LabeledVector (DenseVector (2), 2.0) + ) + + val inputDS = env.fromCollection (input) + + val transformer = PolynomialBase () + .setDegree (3) + + val transformedDS = transformer.transform (inputDS) + + val expectedMap = List ( + (1.0 -> DenseVector (1.0, 1.0, 1.0) ), + (2.0 -> DenseVector (8.0, 4.0, 2.0) ) + ) toMap + + val result = transformedDS.collect + + for (entry <- result) { + expectedMap.contains (entry.label) should be (true) + entry.vector should equal (expectedMap (entry.label) ) + } + } + + it should "map vectors to the polynomial vector space" in { + val env = ExecutionEnvironment.getExecutionEnvironment + + env.setParallelism(2) + + val input = Seq( + LabeledVector(DenseVector(2, 3), 1.0), + LabeledVector(DenseVector(2, 3, 4), 2.0) + ) + + val expectedMap = List( + (1.0 -> DenseVector(8.0, 12.0, 18.0, 27.0, 4.0, 6.0, 9.0, 2.0, 3.0)), + (2.0 -> 
DenseVector(8.0, 12.0, 16.0, 18.0, 24.0, 32.0, 27.0, 36.0, 48.0, 64.0, 4.0, 6.0, 8.0, + 9.0, 12.0, 16.0, 2.0, 3.0, 4.0)) + ) toMap + + val inputDS = env.fromCollection(input) + + val transformer = PolynomialBase() + .setDegree(3) + + val transformedDS = transformer.transform(inputDS) + + val result = transformedDS.collect + + for(entry <- result) { + expectedMap.contains(entry.label) should be(true) + entry.vector should equal(expectedMap(entry.label)) + } + } + + it should "return an empty vector if the max degree is zero" in { + val env = ExecutionEnvironment.getExecutionEnvironment + + env.setParallelism(2) + + val input = Seq( + LabeledVector(DenseVector(2, 3), 1.0), + LabeledVector(DenseVector(2, 3, 4), 2.0) + ) + + val inputDS = env.fromCollection(input) + + val transformer = PolynomialBase() + .setDegree(0) + + val transformedDS = transformer.transform(inputDS) + + val result = transformedDS.collect + + val expectedMap = List( + (1.0 -> DenseVector()), + (2.0 -> DenseVector()) + ) toMap + + for(entry <- result) { + expectedMap.contains(entry.label) should be(true) + entry.vector should equal(expectedMap(entry.label)) + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/b8aa49ce/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/recommendation/ALSITCase.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/recommendation/ALSITCase.scala b/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/recommendation/ALSITCase.scala deleted file mode 100644 index d783ecb..0000000 --- a/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/recommendation/ALSITCase.scala +++ /dev/null @@ -1,153 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.ml.recommendation - -import org.apache.flink.api.common.ExecutionMode -import org.apache.flink.api.scala.ExecutionEnvironment -import org.apache.flink.client.CliFrontendTestUtils -import org.junit.{BeforeClass, Test} -import org.scalatest.ShouldMatchers - -import org.apache.flink.api.scala._ - -class ALSITCase extends ShouldMatchers { - - @Test - def testMatrixFactorization(): Unit = { - import ALSData._ - - val env = ExecutionEnvironment.getExecutionEnvironment - - env.setParallelism(2) - - val als = ALS() - .setIterations(iterations) - .setLambda(lambda) - .setBlocks(4) - .setNumFactors(numFactors) - - val inputDS = env.fromCollection(data) - - val model = als.fit(inputDS) - - val testData = env.fromCollection(expectedResult.map{ - case (userID, itemID, rating) => (userID, itemID) - }) - - val predictions = model.transform(testData).collect - - predictions.length should equal(expectedResult.length) - - val resultMap = expectedResult map { - case (uID, iID, value) => (uID, iID) -> value - } toMap - - predictions foreach { - case (uID, iID, value) => { - resultMap.isDefinedAt(((uID, iID))) should be(true) - - value should be(resultMap((uID, iID)) +- 0.1) - } - } - - val risk = model.empiricalRisk(inputDS).collect(0) - - risk should be(expectedEmpiricalRisk +- 1) - } -} - -object ALSITCase { - - @BeforeClass - def setup(): Unit = { - 
CliFrontendTestUtils.pipeSystemOutToNull() - } -} - -object ALSData { - - val iterations = 9 - val lambda = 1.0 - val numFactors = 5 - - val data: Seq[(Int, Int, Double)] = { - Seq( - (2,13,534.3937734561154), - (6,14,509.63176469621936), - (4,14,515.8246770897443), - (7,3,495.05234565105), - (2,3,532.3281786219485), - (5,3,497.1906356844367), - (3,3,512.0640508585093), - (10,3,500.2906742233019), - (1,4,521.9189079662882), - (2,4,515.0734651491396), - (1,7,522.7532725967008), - (8,4,492.65683825096403), - (4,8,492.65683825096403), - (10,8,507.03319667905413), - (7,1,522.7532725967008), - (1,1,572.2230209271174), - (2,1,563.5849190220224), - (6,1,518.4844061038742), - (9,1,529.2443732217674), - (8,1,543.3202505434103), - (7,2,516.0188923307859), - (1,2,563.5849190220224), - (1,11,515.1023793011227), - (8,2,536.8571133978352), - (2,11,507.90776961762225), - (3,2,532.3281786219485), - (5,11,476.24185144363304), - (4,2,515.0734651491396), - (4,11,469.92049343738233), - (3,12,509.4713776280098), - (4,12,494.6533165132021), - (7,5,482.2907867916308), - (6,5,477.5940040923741), - (4,5,480.9040684364228), - (1,6,518.4844061038742), - (6,6,470.6605085832807), - (8,6,489.6360564705307), - (4,6,472.74052954447046), - (7,9,482.5837650471611), - (5,9,487.00175463269863), - (9,9,500.69514584780944), - (4,9,477.71644808419325), - (7,10,485.3852917539852), - (8,10,507.03319667905413), - (3,10,500.2906742233019), - (5,15,488.08215944254437), - (6,15,480.16929757607346) - ) - } - - val expectedResult: Seq[(Int, Int, Double)] = { - Seq( - (2, 2, 526.1037), - (5, 9, 468.5680), - (10, 3, 484.8975), - (5, 13, 451.6228), - (1, 15, 493.4956), - (4, 11, 456.3862) - ) - } - - val expectedEmpiricalRisk = 505374.1877 -} http://git-wip-us.apache.org/repos/asf/flink/blob/b8aa49ce/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/recommendation/ALSITSuite.scala ---------------------------------------------------------------------- diff --git 
a/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/recommendation/ALSITSuite.scala b/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/recommendation/ALSITSuite.scala new file mode 100644 index 0000000..aadcd2d --- /dev/null +++ b/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/recommendation/ALSITSuite.scala @@ -0,0 +1,149 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.ml.recommendation + +import scala.language.postfixOps + +import org.scalatest._ + +import org.apache.flink.api.scala.ExecutionEnvironment +import org.apache.flink.api.scala._ +import org.apache.flink.test.util.FlinkTestBase + +class ALSITSuite + extends FlatSpec + with Matchers + with FlinkTestBase { + + override val parallelism = 2 + + behavior of "The alternating least squares (ALS) implementation" + + it should "properly factorize a matrix" in { + import ALSData._ + + val env = ExecutionEnvironment.getExecutionEnvironment + + val als = ALS() + .setIterations(iterations) + .setLambda(lambda) + .setBlocks(4) + .setNumFactors(numFactors) + + val inputDS = env.fromCollection(data) + + val model = als.fit(inputDS) + + val testData = env.fromCollection(expectedResult.map{ + case (userID, itemID, rating) => (userID, itemID) + }) + + val predictions = model.transform(testData).collect + + predictions.length should equal(expectedResult.length) + + val resultMap = expectedResult map { + case (uID, iID, value) => (uID, iID) -> value + } toMap + + predictions foreach { + case (uID, iID, value) => { + resultMap.isDefinedAt(((uID, iID))) should be(true) + + value should be(resultMap((uID, iID)) +- 0.1) + } + } + + val risk = model.empiricalRisk(inputDS).collect(0) + + risk should be(expectedEmpiricalRisk +- 1) + } +} + +object ALSData { + + val iterations = 9 + val lambda = 1.0 + val numFactors = 5 + + val data: Seq[(Int, Int, Double)] = { + Seq( + (2,13,534.3937734561154), + (6,14,509.63176469621936), + (4,14,515.8246770897443), + (7,3,495.05234565105), + (2,3,532.3281786219485), + (5,3,497.1906356844367), + (3,3,512.0640508585093), + (10,3,500.2906742233019), + (1,4,521.9189079662882), + (2,4,515.0734651491396), + (1,7,522.7532725967008), + (8,4,492.65683825096403), + (4,8,492.65683825096403), + (10,8,507.03319667905413), + (7,1,522.7532725967008), + (1,1,572.2230209271174), + (2,1,563.5849190220224), + (6,1,518.4844061038742), + 
(9,1,529.2443732217674), + (8,1,543.3202505434103), + (7,2,516.0188923307859), + (1,2,563.5849190220224), + (1,11,515.1023793011227), + (8,2,536.8571133978352), + (2,11,507.90776961762225), + (3,2,532.3281786219485), + (5,11,476.24185144363304), + (4,2,515.0734651491396), + (4,11,469.92049343738233), + (3,12,509.4713776280098), + (4,12,494.6533165132021), + (7,5,482.2907867916308), + (6,5,477.5940040923741), + (4,5,480.9040684364228), + (1,6,518.4844061038742), + (6,6,470.6605085832807), + (8,6,489.6360564705307), + (4,6,472.74052954447046), + (7,9,482.5837650471611), + (5,9,487.00175463269863), + (9,9,500.69514584780944), + (4,9,477.71644808419325), + (7,10,485.3852917539852), + (8,10,507.03319667905413), + (3,10,500.2906742233019), + (5,15,488.08215944254437), + (6,15,480.16929757607346) + ) + } + + val expectedResult: Seq[(Int, Int, Double)] = { + Seq( + (2, 2, 526.1037), + (5, 9, 468.5680), + (10, 3, 484.8975), + (5, 13, 451.6228), + (1, 15, 493.4956), + (4, 11, 456.3862) + ) + } + + val expectedEmpiricalRisk = 505374.1877 +} http://git-wip-us.apache.org/repos/asf/flink/blob/b8aa49ce/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/regression/MultipleLinearRegressionITCase.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/regression/MultipleLinearRegressionITCase.scala b/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/regression/MultipleLinearRegressionITCase.scala deleted file mode 100644 index 15292b7..0000000 --- a/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/regression/MultipleLinearRegressionITCase.scala +++ /dev/null @@ -1,115 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.ml.regression - -import org.apache.flink.api.scala.ExecutionEnvironment -import org.apache.flink.client.CliFrontendTestUtils -import org.apache.flink.ml.common.ParameterMap -import org.apache.flink.ml.feature.PolynomialBase -import org.junit.{BeforeClass, Test} -import org.scalatest.ShouldMatchers - -import org.apache.flink.api.scala._ - -class MultipleLinearRegressionITCase extends ShouldMatchers { - - @Test - def testEstimationOfLinearFunction(): Unit = { - val env = ExecutionEnvironment.getExecutionEnvironment - - env.setParallelism(2) - - val learner = MultipleLinearRegression() - - import RegressionData._ - - val parameters = ParameterMap() - - parameters.add(MultipleLinearRegression.Stepsize, 1.0) - parameters.add(MultipleLinearRegression.Iterations, 10) - parameters.add(MultipleLinearRegression.ConvergenceThreshold, 0.001) - - val inputDS = env.fromCollection(data) - val model = learner.fit(inputDS, parameters) - - val weightList = model.weights.collect - - weightList.size should equal(1) - - val (weights, weight0) = weightList(0) - - expectedWeights zip weights foreach { - case (expectedWeight, weight) => - weight should be (expectedWeight +- 1) - } - weight0 should be (expectedWeight0 +- 0.4) - - val srs = model.squaredResidualSum(inputDS).collect(0) - - srs should be (expectedSquaredResidualSum +- 2) - } - - @Test - def 
testEstimationOfCubicFunction(): Unit = { - val env = ExecutionEnvironment.getExecutionEnvironment - - env.setParallelism(2) - - val polynomialBase = PolynomialBase() - val learner = MultipleLinearRegression() - - val pipeline = polynomialBase.chain(learner) - - val inputDS = env.fromCollection(RegressionData.polynomialData) - - val parameters = ParameterMap() - .add(PolynomialBase.Degree, 3) - .add(MultipleLinearRegression.Stepsize, 0.002) - .add(MultipleLinearRegression.Iterations, 100) - - val model = pipeline.fit(inputDS, parameters) - - val weightList = model.weights.collect - - weightList.size should equal(1) - - val (weights, weight0) = weightList(0) - - RegressionData.expectedPolynomialWeights.zip(weights) foreach { - case (expectedWeight, weight) => - weight should be(expectedWeight +- 0.1) - } - - weight0 should be(RegressionData.expectedPolynomialWeight0 +- 0.1) - - val transformedInput = polynomialBase.transform(inputDS, parameters) - - val srs = model.squaredResidualSum(transformedInput).collect(0) - - srs should be(RegressionData.expectedPolynomialSquaredResidualSum +- 5) - } -} - -object MultipleLinearRegressionITCase{ - - @BeforeClass - def setup(): Unit = { - CliFrontendTestUtils.pipeSystemOutToNull() - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/b8aa49ce/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/regression/MultipleLinearRegressionITSuite.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/regression/MultipleLinearRegressionITSuite.scala b/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/regression/MultipleLinearRegressionITSuite.scala new file mode 100644 index 0000000..9389ae7 --- /dev/null +++ b/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/regression/MultipleLinearRegressionITSuite.scala @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license 
agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.ml.regression + +import org.apache.flink.api.scala.ExecutionEnvironment +import org.apache.flink.ml.common.ParameterMap +import org.apache.flink.ml.feature.PolynomialBase +import org.scalatest.{Matchers, FlatSpec} + +import org.apache.flink.api.scala._ +import org.apache.flink.test.util.FlinkTestBase + +class MultipleLinearRegressionITSuite + extends FlatSpec + with Matchers + with FlinkTestBase { + + behavior of "The multiple linear regression implementation" + + it should "estimate a linear function" in { + val env = ExecutionEnvironment.getExecutionEnvironment + + env.setParallelism(2) + + val learner = MultipleLinearRegression() + + import RegressionData._ + + val parameters = ParameterMap() + + parameters.add(MultipleLinearRegression.Stepsize, 1.0) + parameters.add(MultipleLinearRegression.Iterations, 10) + parameters.add(MultipleLinearRegression.ConvergenceThreshold, 0.001) + + val inputDS = env.fromCollection(data) + val model = learner.fit(inputDS, parameters) + + val weightList = model.weights.collect + + weightList.size should equal(1) + + val (weights, weight0) = weightList(0) + + expectedWeights zip weights foreach { + case (expectedWeight, weight) => + weight should be (expectedWeight +- 1) + } + weight0 should be 
(expectedWeight0 +- 0.4) + + val srs = model.squaredResidualSum(inputDS).collect(0) + + srs should be (expectedSquaredResidualSum +- 2) + } + + it should "estimate a cubic function" in { + val env = ExecutionEnvironment.getExecutionEnvironment + + env.setParallelism(2) + + val polynomialBase = PolynomialBase() + val learner = MultipleLinearRegression() + + val pipeline = polynomialBase.chain(learner) + + val inputDS = env.fromCollection(RegressionData.polynomialData) + + val parameters = ParameterMap() + .add(PolynomialBase.Degree, 3) + .add(MultipleLinearRegression.Stepsize, 0.002) + .add(MultipleLinearRegression.Iterations, 100) + + val model = pipeline.fit(inputDS, parameters) + + val weightList = model.weights.collect + + weightList.size should equal(1) + + val (weights, weight0) = weightList(0) + + RegressionData.expectedPolynomialWeights.zip(weights) foreach { + case (expectedWeight, weight) => + weight should be(expectedWeight +- 0.1) + } + + weight0 should be(RegressionData.expectedPolynomialWeight0 +- 0.1) + + val transformedInput = polynomialBase.transform(inputDS, parameters) + + val srs = model.squaredResidualSum(transformedInput).collect(0) + + srs should be(RegressionData.expectedPolynomialSquaredResidualSum +- 5) + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/b8aa49ce/flink-test-utils/pom.xml ---------------------------------------------------------------------- diff --git a/flink-test-utils/pom.xml b/flink-test-utils/pom.xml index 467fb44..bda1b38 100644 --- a/flink-test-utils/pom.xml +++ b/flink-test-utils/pom.xml @@ -77,6 +77,12 @@ under the License. 
<artifactId>guava</artifactId> <version>${guava.version}</version> </dependency> + + <dependency> + <groupId>org.scalatest</groupId> + <artifactId>scalatest_${scala.binary.version}</artifactId> + <scope>compile</scope> + </dependency> </dependencies> <build> http://git-wip-us.apache.org/repos/asf/flink/blob/b8aa49ce/flink-test-utils/src/main/scala/org/apache/flink/test/util/FlinkTestBase.scala ---------------------------------------------------------------------- diff --git a/flink-test-utils/src/main/scala/org/apache/flink/test/util/FlinkTestBase.scala b/flink-test-utils/src/main/scala/org/apache/flink/test/util/FlinkTestBase.scala new file mode 100644 index 0000000..2e664c1 --- /dev/null +++ b/flink-test-utils/src/main/scala/org/apache/flink/test/util/FlinkTestBase.scala @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.test.util + +import org.scalatest.{Suite, BeforeAndAfter} + +/** Mixin to start and stop a ForkableFlinkMiniCluster automatically for Scala based tests. + * Additionally a TestEnvironment with the started cluster is created and set as the default + * [[org.apache.flink.api.java.ExecutionEnvironment]]. 
+ * + * This mixin starts a ForkableFlinkMiniCluster with one TaskManager and a number of slots given + * by parallelism. This value can be overridden in a sub class in order to start the cluster + * with a different number of slots. + * + * The cluster is started once before starting the tests and is re-used for the individual tests. + * After all tests have been executed, the cluster is shutdown. + * + * The cluster is used by obtaining the default [[org.apache.flink.api.java.ExecutionEnvironment]]. + * + * @example + * {{{ + * def testSomething: Unit = { + * // Obtain TestEnvironment with started ForkableFlinkMiniCluster + * val env = ExecutionEnvironment.getExecutionEnvironment + * + * env.fromCollection(...) + * + * env.execute + * } + * }}} + * + */ +trait FlinkTestBase + extends BeforeAndAfter { + that: Suite => + + var cluster: Option[ForkableFlinkMiniCluster] = None + val parallelism = 4 + + before { + val cl = TestBaseUtils.startCluster(1, parallelism, false) + val clusterEnvironment = new TestEnvironment(cl, parallelism) + clusterEnvironment.setAsContext + + cluster = Some(cl) + } + + after { + cluster.map(c => TestBaseUtils.stopCluster(c, TestBaseUtils.DEFAULT_TIMEOUT)) + } + +}