Repository: flink
Updated Branches:
  refs/heads/master 7abed950d -> 0ae0cba31
[scala-api] Corrects type cast in Scala API's DataSet.collect method and adds test cases for that

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0ae0cba3
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0ae0cba3
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0ae0cba3

Branch: refs/heads/master
Commit: 0ae0cba319827e4e706d162a159dc4cc5bc2791e
Parents: 7abed95
Author: Till Rohrmann <trohrm...@apache.org>
Authored: Wed Mar 4 12:36:28 2015 +0100
Committer: Till Rohrmann <trohrm...@apache.org>
Committed: Wed Mar 4 13:37:09 2015 +0100

----------------------------------------------------------------------
 .../org/apache/flink/api/scala/DataSet.scala    |  6 +-
 .../flink/test/actions/CountCollectITCase.java  | 90 +++++++++++++++++++
 .../test/convenience/CountCollectITCase.java    | 92 --------------------
 .../api/scala/actions/CountCollectITCase.scala  | 73 ++++++++++++++++
 4 files changed, 167 insertions(+), 94 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/0ae0cba3/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala
index b93b3f7..61bfbfa 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala
@@ -40,6 +40,7 @@ import org.apache.flink.core.fs.{FileSystem, Path}
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.util.{AbstractID, Collector}
 import scala.collection.JavaConverters._
+import scala.collection.mutable
 import scala.reflect.ClassTag
 
 
@@ -535,11 +536,12 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) {
    * @see
org.apache.flink.api.java.Utils.CollectHelper
    */
   @throws(classOf[Exception])
-  def collect: List[T] = {
+  def collect: mutable.Buffer[T] = {
     val id = new AbstractID().toString
     javaSet.flatMap(new Utils.CollectHelper[T](id)).output(new DiscardingOutputFormat[T])
     val res = getExecutionEnvironment.execute()
-    res.getAccumulatorResult(id).asInstanceOf[List[T]]
+
+    res.getAccumulatorResult(id).asInstanceOf[java.util.List[T]].asScala
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/flink/blob/0ae0cba3/flink-tests/src/test/java/org/apache/flink/test/actions/CountCollectITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/actions/CountCollectITCase.java b/flink-tests/src/test/java/org/apache/flink/test/actions/CountCollectITCase.java
new file mode 100644
index 0000000..0369a56
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/actions/CountCollectITCase.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.actions;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+
+import static org.junit.Assert.*;
+
+import org.junit.Test;
+
+
+public class CountCollectITCase {
+
+	@Test
+	public void testCountCollectOnSimpleJob() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		env.setDegreeOfParallelism(5);
+
+		Integer[] input = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10};
+
+		DataSet<Integer> data = env.fromElements(input);
+
+		// count
+		long numEntries = data.count();
+		assertEquals(10, numEntries);
+
+		// collect
+		ArrayList<Integer> list = (ArrayList<Integer>) data.collect();
+		assertArrayEquals(input, list.toArray());
+
+	}
+
+	@Test
+	public void testCountCollectOnAdvancedJob() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		env.setDegreeOfParallelism(5);
+		env.getConfig().disableObjectReuse();
+
+
+		DataSet<Integer> data = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
+		DataSet<Integer> data2 = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
+
+		DataSet<Tuple2<Integer, Integer>> data3 = data.cross(data2);
+
+		// count
+		long numEntries = data3.count();
+		assertEquals(100, numEntries);
+
+		// collect
+		ArrayList<Tuple2<Integer, Integer>> list = (ArrayList<Tuple2<Integer, Integer>>) data3.collect();
+		System.out.println(list);
+
+		// set expected entries in a hash map to true
+		HashMap<Tuple2<Integer, Integer>, Boolean> expected = new HashMap<Tuple2<Integer, Integer>, Boolean>();
+		for (int i = 1; i <= 10; i++) {
+			for (int j = 1; j <= 10; j++) {
+				expected.put(new Tuple2<Integer, Integer>(i, j), true);
+			}
+		}
+
+		// check if all entries are contained in the hash map
+		for (int i = 0; i < 100; i++) {
+			Tuple2<Integer, Integer> element = list.get(i);
+			assertEquals(expected.get(element),
true);
+			expected.remove(element);
+		}
+
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0ae0cba3/flink-tests/src/test/java/org/apache/flink/test/convenience/CountCollectITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/convenience/CountCollectITCase.java b/flink-tests/src/test/java/org/apache/flink/test/convenience/CountCollectITCase.java
deleted file mode 100644
index a306c9a..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/convenience/CountCollectITCase.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.convenience;
-
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.tuple.Tuple2;
-
-import java.util.ArrayList;
-import java.util.Comparator;
-import java.util.HashMap;
-
-import static org.junit.Assert.*;
-
-import org.apache.flink.test.iterative.nephele.danglingpagerank.BooleanValue;
-import org.junit.Test;
-
-
-public class CountCollectITCase {
-
-	@Test
-	public void testSimple() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		env.setDegreeOfParallelism(5);
-
-		Integer[] input = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10};
-
-		DataSet<Integer> data = env.fromElements(input);
-
-		// count
-		long numEntries = data.count();
-		assertEquals(10, numEntries);
-
-		// collect
-		ArrayList<Integer> list = (ArrayList<Integer>) data.collect();
-		assertArrayEquals(input, list.toArray());
-
-	}
-
-	@Test
-	public void testAdvanced() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		env.setDegreeOfParallelism(5);
-		env.getConfig().disableObjectReuse();
-
-
-		DataSet<Integer> data = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
-		DataSet<Integer> data2 = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
-
-		DataSet<Tuple2<Integer, Integer>> data3 = data.cross(data2);
-
-		// count
-		long numEntries = data3.count();
-		assertEquals(100, numEntries);
-
-		// collect
-		ArrayList<Tuple2<Integer, Integer>> list = (ArrayList<Tuple2<Integer, Integer>>) data3.collect();
-		System.out.println(list);
-
-		// set expected entries in a hash map to true
-		HashMap<Tuple2<Integer, Integer>, Boolean> expected = new HashMap<Tuple2<Integer, Integer>, Boolean>();
-		for (int i = 1; i <= 10; i++) {
-			for (int j = 1; j <= 10; j++) {
-				expected.put(new Tuple2<Integer, Integer>(i, j), true);
-			}
-		}
-
-		// check if all entries are contained in the hash map
-		for (int i = 0; i < 100; i++) {
-
Tuple2<Integer, Integer> element = list.get(i);
-			assertEquals(expected.get(element), true);
-			expected.remove(element);
-		}
-
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/0ae0cba3/flink-tests/src/test/scala/org/apache/flink/api/scala/actions/CountCollectITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/actions/CountCollectITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/actions/CountCollectITCase.scala
new file mode 100644
index 0000000..de20bc9
--- /dev/null
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/actions/CountCollectITCase.scala
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.actions
+
+import org.apache.flink.api.scala._
+
+import org.junit.Test
+import org.junit.Assert._
+
+class CountCollectITCase {
+
+  @Test
+  def testCountCollectOnSimpleJob: Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    env.setDegreeOfParallelism(5)
+
+    val input = 1 to 10
+
+    val inputDS = env.fromElements(input: _*)
+
+    // count
+    val numEntries = inputDS.count
+    assertEquals(input.length, numEntries)
+
+    // collect
+    val list = inputDS.collect
+    assertArrayEquals(input.toArray, list.toArray)
+  }
+
+  @Test
+  def testCountCollectOnAdvancedJob: Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    env.setDegreeOfParallelism(5)
+    env.getConfig.disableObjectReuse()
+
+    val input1 = 1 to 10
+    val input2 = 1 to 10
+
+    val inputDS1 = env.fromElements(input1:_*)
+    val inputDS2 = env.fromElements(input2:_*)
+
+    val result = inputDS1 cross inputDS2
+
+    val numEntries = result.count
+    assertEquals(input1.length * input2.length, numEntries)
+
+    val list = result.collect
+
+    val marker = Array.fill(input1.length, input2.length)(false)
+
+    for((x,y) <- list) {
+      assertFalse(s"Element ($x,$y) seen twice.", marker(x-1)(y-1))
+      marker(x-1)(y-1) = true
+    }
+  }
+
+}