[FLINK-1847] [Scala API] Changes the return type of the Scala API's collect method to Seq[T] and adds parentheses to the collect and count methods.
- Updates existing test cases using Scala API's collect/count method. This closes #583 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e45f13f5 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e45f13f5 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e45f13f5 Branch: refs/heads/master Commit: e45f13f5351c1550fc4413c534effe213ddeb059 Parents: 8740d1e Author: Till Rohrmann <[email protected]> Authored: Thu Apr 9 12:26:53 2015 +0200 Committer: Stephan Ewen <[email protected]> Committed: Thu Apr 9 21:02:13 2015 +0200 ---------------------------------------------------------------------- .../src/main/scala/org/apache/flink/api/scala/DataSet.scala | 6 +++--- .../src/test/scala/org/apache/flink/ml/MLUtilsSuite.scala | 2 +- .../org/apache/flink/ml/classification/CoCoASuite.scala | 2 +- .../org/apache/flink/ml/feature/PolynomialBaseITSuite.scala | 6 +++--- .../org/apache/flink/ml/recommendation/ALSITSuite.scala | 4 ++-- .../ml/regression/MultipleLinearRegressionITSuite.scala | 8 ++++---- .../apache/flink/api/scala/actions/CountCollectITCase.scala | 4 ++-- 7 files changed, 16 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/e45f13f5/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala ---------------------------------------------------------------------- diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala index de07a57..00b019b 100644 --- a/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala +++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala @@ -520,7 +520,7 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) { * @see org.apache.flink.api.java.Utils.CountHelper */ @throws(classOf[Exception]) - def count: Long = { + def count(): Long = { val 
id = new AbstractID().toString javaSet.flatMap(new CountHelper[T](id)).output(new DiscardingOutputFormat[lang.Long]) val res = getExecutionEnvironment.execute() @@ -531,12 +531,12 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) { * Convenience method to get the elements of a DataSet as a List * As DataSet can contain a lot of data, this method should be used with caution. * - * @return A List containing the elements of the DataSet + * @return A Seq containing the elements of the DataSet * * @see org.apache.flink.api.java.Utils.CollectHelper */ @throws(classOf[Exception]) - def collect: mutable.Buffer[T] = { + def collect(): Seq[T] = { val id = new AbstractID().toString javaSet.flatMap(new Utils.CollectHelper[T](id)).output(new DiscardingOutputFormat[T]) val res = getExecutionEnvironment.execute() http://git-wip-us.apache.org/repos/asf/flink/blob/e45f13f5/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/MLUtilsSuite.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/MLUtilsSuite.scala b/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/MLUtilsSuite.scala index 47682ce..1464d07 100644 --- a/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/MLUtilsSuite.scala +++ b/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/MLUtilsSuite.scala @@ -57,7 +57,7 @@ class MLUtilsSuite extends FlatSpec with Matchers with FlinkTestBase { val svmInput = env.readLibSVM(inputFilePath) - val labeledVectors = svmInput.collect + val labeledVectors = svmInput.collect() labeledVectors.size should be(expectedLabeledVectors.size) http://git-wip-us.apache.org/repos/asf/flink/blob/e45f13f5/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/classification/CoCoASuite.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/classification/CoCoASuite.scala 
b/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/classification/CoCoASuite.scala index 35a69f1..0f000a3 100644 --- a/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/classification/CoCoASuite.scala +++ b/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/classification/CoCoASuite.scala @@ -42,7 +42,7 @@ class CoCoAITSuite extends FlatSpec with Matchers with FlinkTestBase { val model = learner.fit(trainingDS) - val weightVector = model.weights.collect(0) + val weightVector = model.weights.collect().apply(0) weightVector.valuesIterator.zip(Classification.expectedWeightVector.valueIterator).foreach { case (weight, expectedWeight) => http://git-wip-us.apache.org/repos/asf/flink/blob/e45f13f5/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/feature/PolynomialBaseITSuite.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/feature/PolynomialBaseITSuite.scala b/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/feature/PolynomialBaseITSuite.scala index 0b321ff..b930ceb 100644 --- a/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/feature/PolynomialBaseITSuite.scala +++ b/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/feature/PolynomialBaseITSuite.scala @@ -55,7 +55,7 @@ class PolynomialBaseITSuite (2.0 -> DenseVector (8.0, 4.0, 2.0) ) ) toMap - val result = transformedDS.collect + val result = transformedDS.collect() for (entry <- result) { expectedMap.contains (entry.label) should be (true) @@ -86,7 +86,7 @@ class PolynomialBaseITSuite val transformedDS = transformer.transform(inputDS) - val result = transformedDS.collect + val result = transformedDS.collect() for(entry <- result) { expectedMap.contains(entry.label) should be(true) @@ -111,7 +111,7 @@ class PolynomialBaseITSuite val transformedDS = transformer.transform(inputDS) - val result = transformedDS.collect + val result = transformedDS.collect() val 
expectedMap = List( (1.0 -> DenseVector()), http://git-wip-us.apache.org/repos/asf/flink/blob/e45f13f5/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/recommendation/ALSITSuite.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/recommendation/ALSITSuite.scala b/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/recommendation/ALSITSuite.scala index db5ad6e..245d7a8 100644 --- a/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/recommendation/ALSITSuite.scala +++ b/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/recommendation/ALSITSuite.scala @@ -54,7 +54,7 @@ class ALSITSuite case (userID, itemID, rating) => (userID, itemID) }) - val predictions = model.transform(testData).collect + val predictions = model.transform(testData).collect() predictions.length should equal(expectedResult.length) @@ -70,7 +70,7 @@ class ALSITSuite } } - val risk = model.empiricalRisk(inputDS).collect(0) + val risk = model.empiricalRisk(inputDS).collect().apply(0) risk should be(expectedEmpiricalRisk +- 1) } http://git-wip-us.apache.org/repos/asf/flink/blob/e45f13f5/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/regression/MultipleLinearRegressionITSuite.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/regression/MultipleLinearRegressionITSuite.scala b/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/regression/MultipleLinearRegressionITSuite.scala index 9389ae7..2d3f770 100644 --- a/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/regression/MultipleLinearRegressionITSuite.scala +++ b/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/regression/MultipleLinearRegressionITSuite.scala @@ -51,7 +51,7 @@ class MultipleLinearRegressionITSuite val inputDS = env.fromCollection(data) val model = learner.fit(inputDS, parameters) - val 
weightList = model.weights.collect + val weightList = model.weights.collect() weightList.size should equal(1) @@ -63,7 +63,7 @@ class MultipleLinearRegressionITSuite } weight0 should be (expectedWeight0 +- 0.4) - val srs = model.squaredResidualSum(inputDS).collect(0) + val srs = model.squaredResidualSum(inputDS).collect().apply(0) srs should be (expectedSquaredResidualSum +- 2) } @@ -87,7 +87,7 @@ class MultipleLinearRegressionITSuite val model = pipeline.fit(inputDS, parameters) - val weightList = model.weights.collect + val weightList = model.weights.collect() weightList.size should equal(1) @@ -102,7 +102,7 @@ class MultipleLinearRegressionITSuite val transformedInput = polynomialBase.transform(inputDS, parameters) - val srs = model.squaredResidualSum(transformedInput).collect(0) + val srs = model.squaredResidualSum(transformedInput).collect().apply(0) srs should be(RegressionData.expectedPolynomialSquaredResidualSum +- 5) } http://git-wip-us.apache.org/repos/asf/flink/blob/e45f13f5/flink-tests/src/test/scala/org/apache/flink/api/scala/actions/CountCollectITCase.scala ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/actions/CountCollectITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/actions/CountCollectITCase.scala index 6e95b7b..f9c744d 100644 --- a/flink-tests/src/test/scala/org/apache/flink/api/scala/actions/CountCollectITCase.scala +++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/actions/CountCollectITCase.scala @@ -43,7 +43,7 @@ class CountCollectITCase(mode: TestExecutionMode) extends MultipleProgramsTestBa assertEquals(input.length, numEntries) // collect - val list = inputDS.collect + val list = inputDS.collect() assertArrayEquals(input.toArray, list.toArray) } @@ -63,7 +63,7 @@ class CountCollectITCase(mode: TestExecutionMode) extends MultipleProgramsTestBa val numEntries = result.count assertEquals(input1.length * input2.length, 
numEntries) - val list = result.collect + val list = result.collect() val marker = Array.fill(input1.length, input2.length)(false)
