Repository: flink Updated Branches: refs/heads/master 91102695b -> e45f13f53
[tests] SumMinMaxITCase to use collect() rather than flakey temp files Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1cff478d Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1cff478d Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1cff478d Branch: refs/heads/master Commit: 1cff478d3712e3e9de59f6cf296b0669572eff0b Parents: 9110269 Author: Stephan Ewen <[email protected]> Authored: Wed Apr 8 15:39:01 2015 +0200 Committer: Stephan Ewen <[email protected]> Committed: Thu Apr 9 21:00:37 2015 +0200 ---------------------------------------------------------------------- .../api/scala/operators/SumMinMaxITCase.scala | 51 ++++++-------------- 1 file changed, 16 insertions(+), 35 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/1cff478d/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/SumMinMaxITCase.scala ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/SumMinMaxITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/SumMinMaxITCase.scala index d57c6e2d7f..180486b 100644 --- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/SumMinMaxITCase.scala +++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/SumMinMaxITCase.scala @@ -20,35 +20,18 @@ package org.apache.flink.api.scala.operators import org.apache.flink.api.scala.ExecutionEnvironment import org.apache.flink.api.scala.util.CollectionDataSets -import org.apache.flink.core.fs.FileSystem.WriteMode import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode -import org.apache.flink.test.util.{MultipleProgramsTestBase} -import org.junit.{Test, After, Before, Rule} -import org.junit.rules.TemporaryFolder +import org.apache.flink.test.util.MultipleProgramsTestBase + +import org.junit.Test import org.junit.runner.RunWith import org.junit.runners.Parameterized +import org.junit.Assert._ import org.apache.flink.api.scala._ - @RunWith(classOf[Parameterized]) class SumMinMaxITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) { - private var resultPath: String = null - private var expected: String = null - private val _tempFolder = new TemporaryFolder() - - @Rule - def tempFolder = _tempFolder - - @Before - def before(): Unit = { - resultPath = tempFolder.newFile().toURI.toString - } - - @After - def after(): Unit = { - compareResultsByLinesInMemory(expected, resultPath) - } @Test def testFullAggregate(): Unit = { @@ -56,18 +39,18 @@ class SumMinMaxITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase( val env = ExecutionEnvironment.getExecutionEnvironment val ds = CollectionDataSets.get3TupleDataSet(env) - val aggregateDs = ds + val aggregateDs : DataSet[(Int, Long)] = ds .sum(0) .andMax(1) // Ensure aggregate operator correctly copies other fields .filter(_._3 != null) .map{ t => (t._1, t._2) } - aggregateDs.writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE) - env.execute() + val result: Seq[(Int, Long)] = aggregateDs.collect - expected = "231,6\n" + assertEquals(1, result.size) + assertEquals((231, 6L), result.head) } @Test @@ -83,11 +66,10 @@ class SumMinMaxITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase( .filter(_._3 != null) .map { t => (t._2, t._1) } - aggregateDs.writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE) - - env.execute() + val result : Seq[(Long, Int)] = aggregateDs.collect.sortWith((a, b) => a._1 < b._1) - expected = "1,1\n" + "2,5\n" + "3,15\n" + "4,34\n" + "5,65\n" + "6,111\n" + val expected : Seq[(Long, Int)] = Seq((1L, 1), (2L, 5), (3L, 15), (4L, 34), (5L, 65), (6L, 111)) + assertEquals(expected, result) } @Test @@ -96,18 +78,17 @@ class SumMinMaxITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase( val env = ExecutionEnvironment.getExecutionEnvironment val ds = CollectionDataSets.get3TupleDataSet(env) - val aggregateDs = ds + val aggregateDs: DataSet[Int] = ds .groupBy(1) .min(0) .min(0) // Ensure aggregate operator correctly copies other fields .filter(_._3 != null) - .map { t => new Tuple1(t._1) } - - aggregateDs.writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE) + .map { t => t._1 } - env.execute() + val result: Seq[Int] = aggregateDs.collect - expected = "1\n" + assertEquals(1, result.size) + assertEquals(Seq(1), result) } }
