MAHOUT-1570: upgraded to flink 0.9-SNAPSHOT for IO
Project: http://git-wip-us.apache.org/repos/asf/mahout/repo Commit: http://git-wip-us.apache.org/repos/asf/mahout/commit/1806ca80 Tree: http://git-wip-us.apache.org/repos/asf/mahout/tree/1806ca80 Diff: http://git-wip-us.apache.org/repos/asf/mahout/diff/1806ca80 Branch: refs/heads/flink-binding Commit: 1806ca809beb2312c62e83b3dbab314b90172118 Parents: b3bda19 Author: Alexey Grigorev <[email protected]> Authored: Tue Jun 2 14:25:16 2015 +0200 Committer: Alexey Grigorev <[email protected]> Committed: Fri Sep 25 17:41:48 2015 +0200 ---------------------------------------------------------------------- flink/pom.xml | 11 ++++--- flink/src/main/resources/log4j.properties | 8 +++++ .../mahout/flinkbindings/RLikeOpsSuite.scala | 34 ++++++++++---------- .../mahout/flinkbindings/blas/LATestSuit.scala | 12 +++---- pom.xml | 2 +- 5 files changed, 38 insertions(+), 29 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mahout/blob/1806ca80/flink/pom.xml ---------------------------------------------------------------------- diff --git a/flink/pom.xml b/flink/pom.xml index 80ff05d..68ce42d 100644 --- a/flink/pom.xml +++ b/flink/pom.xml @@ -85,6 +85,7 @@ <skipTests>true</skipTests> </configuration> </plugin> + <!-- enable scalatest --> <plugin> <groupId>org.scalatest</groupId> @@ -119,13 +120,10 @@ </execution> </executions> </plugin> - - </plugins> </build> <dependencies> - <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-core</artifactId> @@ -176,11 +174,14 @@ <artifactId>guava</artifactId> </dependency> - <!-- scala stuff --> + <!-- tests --> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + </dependency> <dependency> <groupId>org.scalatest</groupId> <artifactId>scalatest_${scala.compat.version}</artifactId> </dependency> - </dependencies> </project> http://git-wip-us.apache.org/repos/asf/mahout/blob/1806ca80/flink/src/main/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/flink/src/main/resources/log4j.properties b/flink/src/main/resources/log4j.properties new file mode 100644 index 0000000..073a00d --- /dev/null +++ b/flink/src/main/resources/log4j.properties @@ -0,0 +1,8 @@ +# Root logger option +log4j.rootLogger=info, stdout + +# Direct log messages to stdout +log4j.appender.stdout=org.apache.log4j.ConsoleAppender +log4j.appender.stdout.target=System.out +log4j.appender.stdout.layout=org.apache.log4j.PatternLayout +log4j.appender.stdout.layout.conversionPattern=%d{ABSOLUTE} %5p %t %c{1}:%M:%L - %m%n \ No newline at end of file http://git-wip-us.apache.org/repos/asf/mahout/blob/1806ca80/flink/src/test/scala/org/apache/mahout/flinkbindings/RLikeOpsSuite.scala ---------------------------------------------------------------------- diff --git a/flink/src/test/scala/org/apache/mahout/flinkbindings/RLikeOpsSuite.scala b/flink/src/test/scala/org/apache/mahout/flinkbindings/RLikeOpsSuite.scala index a0da308..6f8eccf 100644 --- a/flink/src/test/scala/org/apache/mahout/flinkbindings/RLikeOpsSuite.scala +++ b/flink/src/test/scala/org/apache/mahout/flinkbindings/RLikeOpsSuite.scala @@ -22,7 +22,7 @@ class RLikeOpsSuite extends FunSuite with DistributedFlinkSuit { val LOGGER = LoggerFactory.getLogger(getClass()) - ignore("A %*% x") { + test("A %*% x") { val inCoreA = dense((1, 2, 3), (2, 3, 4), (3, 4, 5)) val A = drmParallelize(m = inCoreA, numPartitions = 2) val x: Vector = (0, 1, 2) @@ -33,7 +33,7 @@ class RLikeOpsSuite extends FunSuite with DistributedFlinkSuit { assert(b == dvec(8, 11, 14)) } - ignore("A.t") { + test("A.t") { val inCoreA = dense((1, 2, 3), (2, 3, 4)) val A = drmParallelize(m = inCoreA, numPartitions = 2) val res = A.t.collect @@ -42,7 +42,7 @@ class RLikeOpsSuite extends FunSuite with DistributedFlinkSuit { assert((res - expected).norm < 1e-6) } - ignore("A.t %*% x") { + test("A.t %*% x") { val inCoreA = dense((1, 2, 3), (2, 3, 4)) val A = drmParallelize(m = inCoreA, numPartitions = 2) val x = dvec(3, 11) @@ -52,7 +52,7 @@ class RLikeOpsSuite extends FunSuite with DistributedFlinkSuit { assert((res - expected).norm(2) < 1e-6) } - ignore("A.t %*% B") { + test("A.t %*% B") { val inCoreA = dense((1, 2), (2, 3), (3, 4)) val inCoreB = dense((1, 2), (3, 4), (11, 4)) @@ -65,7 +65,7 @@ class RLikeOpsSuite extends FunSuite with DistributedFlinkSuit { assert((res.collect - expected).norm < 1e-6) } - ignore("A %*% B.t") { + test("A %*% B.t") { val inCoreA = dense((1, 2), (2, 3), (3, 4)) val inCoreB = dense((1, 2), (3, 4), (11, 4)) @@ -78,7 +78,7 @@ class RLikeOpsSuite extends FunSuite with DistributedFlinkSuit { assert((res.collect - expected).norm < 1e-6) } - ignore("A.t %*% A") { + test("A.t %*% A") { val inCoreA = dense((1, 2), (2, 3), (3, 4)) val A = drmParallelize(m = inCoreA, numPartitions = 2) @@ -88,7 +88,7 @@ class RLikeOpsSuite extends FunSuite with DistributedFlinkSuit { assert((res.collect - expected).norm < 1e-6) } - ignore("A %*% B") { + test("A %*% B") { val inCoreA = dense((1, 2), (2, 3), (3, 4)).t val inCoreB = dense((1, 2), (3, 4), (11, 4)) @@ -101,7 +101,7 @@ class RLikeOpsSuite extends FunSuite with DistributedFlinkSuit { assert((res.collect - expected).norm < 1e-6) } - ignore("A * scalar") { + test("A * scalar") { val inCoreA = dense((1, 2), (2, 3), (3, 4)) val A = drmParallelize(m = inCoreA, numPartitions = 2) @@ -109,7 +109,7 @@ class RLikeOpsSuite extends FunSuite with DistributedFlinkSuit { assert((res.collect - inCoreA * 5).norm < 1e-6) } - ignore("A / scalar") { + test("A / scalar") { val inCoreA = dense((1, 2), (2, 3), (3, 4)).t val A = drmParallelize(m = inCoreA, numPartitions = 2) @@ -117,7 +117,7 @@ class RLikeOpsSuite extends FunSuite with DistributedFlinkSuit { assert((res.collect - (inCoreA / 5)).norm < 1e-6) } - ignore("A + scalar") { + test("A + scalar") { val inCoreA = dense((1, 2), (2, 3), (3, 4)) val A = drmParallelize(m = inCoreA, numPartitions = 2) @@ -125,7 +125,7 @@ class RLikeOpsSuite extends FunSuite with DistributedFlinkSuit { assert((res.collect - (inCoreA + 5)).norm < 1e-6) } - ignore("A - scalar") { + test("A - scalar") { val inCoreA = dense((1, 2), (2, 3), (3, 4)) val A = drmParallelize(m = inCoreA, numPartitions = 2) @@ -133,7 +133,7 @@ class RLikeOpsSuite extends FunSuite with DistributedFlinkSuit { assert((res.collect - (inCoreA - 5)).norm < 1e-6) } - ignore("A * B") { + test("A * B") { val inCoreA = dense((1, 2), (2, 3), (3, 4)) val inCoreB = dense((1, 2), (3, 4), (11, 4)) @@ -145,7 +145,7 @@ class RLikeOpsSuite extends FunSuite with DistributedFlinkSuit { assert((res.collect - expected).norm < 1e-6) } - ignore("A / B") { + test("A / B") { val inCoreA = dense((1, 2), (2, 3), (3, 4)) val inCoreB = dense((1, 2), (3, 4), (11, 4)) @@ -157,7 +157,7 @@ class RLikeOpsSuite extends FunSuite with DistributedFlinkSuit { assert((res.collect - expected).norm < 1e-6) } - ignore("A + B") { + test("A + B") { val inCoreA = dense((1, 2), (2, 3), (3, 4)) val inCoreB = dense((1, 2), (3, 4), (11, 4)) @@ -169,7 +169,7 @@ class RLikeOpsSuite extends FunSuite with DistributedFlinkSuit { assert((res.collect - expected).norm < 1e-6) } - ignore("A - B") { + test("A - B") { val inCoreA = dense((1, 2), (2, 3), (3, 4)) val inCoreB = dense((1, 2), (3, 4), (11, 4)) @@ -181,7 +181,7 @@ class RLikeOpsSuite extends FunSuite with DistributedFlinkSuit { assert((res.collect - expected).norm < 1e-6) } - ignore("A cbind B") { + test("A cbind B") { val inCoreA = dense((1, 2), (2, 3), (3, 4)) val inCoreB = dense((1, 2), (3, 4), (11, 4)) val A = drmParallelize(m = inCoreA, numPartitions = 2) @@ -192,7 +192,7 @@ class RLikeOpsSuite extends FunSuite with DistributedFlinkSuit { assert((res.collect - expected).norm < 1e-6) } - ignore("A rbind B") { + test("A rbind B") { val inCoreA = dense((1, 2), (2, 3), (3, 4)) val inCoreB = dense((1, 2), (3, 4), (11, 4)) val A = drmParallelize(m = inCoreA, numPartitions = 2) http://git-wip-us.apache.org/repos/asf/mahout/blob/1806ca80/flink/src/test/scala/org/apache/mahout/flinkbindings/blas/LATestSuit.scala ---------------------------------------------------------------------- diff --git a/flink/src/test/scala/org/apache/mahout/flinkbindings/blas/LATestSuit.scala b/flink/src/test/scala/org/apache/mahout/flinkbindings/blas/LATestSuit.scala index 6706599..b3d490d 100644 --- a/flink/src/test/scala/org/apache/mahout/flinkbindings/blas/LATestSuit.scala +++ b/flink/src/test/scala/org/apache/mahout/flinkbindings/blas/LATestSuit.scala @@ -16,7 +16,7 @@ import org.apache.mahout.math.drm.logical._ @RunWith(classOf[JUnitRunner]) class LATestSuit extends FunSuite with DistributedFlinkSuit { - ignore("Ax blockified") { + test("Ax blockified") { val inCoreA = dense((1, 2, 3), (2, 3, 4), (3, 4, 5)) val A = drmParallelize(m = inCoreA, numPartitions = 2) val x: Vector = (0, 1, 2) @@ -30,7 +30,7 @@ class LATestSuit extends FunSuite with DistributedFlinkSuit { assert(b == dvec(8, 11, 14)) } - ignore("At sparseTrick") { + test("At sparseTrick") { val inCoreA = dense((1, 2, 3), (2, 3, 4)) val A = drmParallelize(m = inCoreA, numPartitions = 2) @@ -42,7 +42,7 @@ class LATestSuit extends FunSuite with DistributedFlinkSuit { assert((output - inCoreA.t).norm < 1e-6) } - ignore("AtB notZippable") { + test("AtB notZippable") { val inCoreAt = dense((1, 2), (2, 3), (3, 4)) val At = drmParallelize(m = inCoreAt, numPartitions = 2) @@ -60,7 +60,7 @@ class LATestSuit extends FunSuite with DistributedFlinkSuit { assert((output - expected).norm < 1e-6) } - ignore("AewScalar opScalarNoSideEffect") { + test("AewScalar opScalarNoSideEffect") { val inCoreA = dense((1, 2), (2, 3), (3, 4)) val A = drmParallelize(m = inCoreA, numPartitions = 2) val scalar = 5.0 @@ -75,7 +75,7 @@ class LATestSuit extends FunSuite with DistributedFlinkSuit { assert((output - expected).norm < 1e-6) } - ignore("AewB rowWiseJoinNoSideEffect") { + test("AewB rowWiseJoinNoSideEffect") { val inCoreA = dense((1, 2), (2, 3), (3, 4)) val A = drmParallelize(m = inCoreA, numPartitions = 2) @@ -88,7 +88,7 @@ class LATestSuit extends FunSuite with DistributedFlinkSuit { assert((output - (inCoreA * inCoreA)).norm < 1e-6) } - ignore("Cbind") { + test("Cbind") { val inCoreA = dense((1, 2), (2, 3), (3, 4)) val inCoreB = dense((4, 4), (5, 5), (6, 7)) val A = drmParallelize(m = inCoreA, numPartitions = 2) http://git-wip-us.apache.org/repos/asf/mahout/blob/1806ca80/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 35495fb..67d2eee 100644 --- a/pom.xml +++ b/pom.xml @@ -121,7 +121,7 @@ <scala.compat.version>2.10</scala.compat.version> <scala.version>2.10.4</scala.version> <spark.version>1.3.1</spark.version> - <flink.version>0.8.1</flink.version> + <flink.version>0.9.0-milestone-1</flink.version> <h2o.version>0.1.25</h2o.version> </properties> <issueManagement>
