Repository: flink
Updated Branches:
  refs/heads/master 7abed950d -> 0ae0cba31


[scala-api] Corrects type cast in Scala API's DataSet.collect method and adds 
corresponding test cases


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0ae0cba3
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0ae0cba3
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0ae0cba3

Branch: refs/heads/master
Commit: 0ae0cba319827e4e706d162a159dc4cc5bc2791e
Parents: 7abed95
Author: Till Rohrmann <trohrm...@apache.org>
Authored: Wed Mar 4 12:36:28 2015 +0100
Committer: Till Rohrmann <trohrm...@apache.org>
Committed: Wed Mar 4 13:37:09 2015 +0100

----------------------------------------------------------------------
 .../org/apache/flink/api/scala/DataSet.scala    |  6 +-
 .../flink/test/actions/CountCollectITCase.java  | 90 +++++++++++++++++++
 .../test/convenience/CountCollectITCase.java    | 92 --------------------
 .../api/scala/actions/CountCollectITCase.scala  | 73 ++++++++++++++++
 4 files changed, 167 insertions(+), 94 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/0ae0cba3/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala
----------------------------------------------------------------------
diff --git 
a/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala 
b/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala
index b93b3f7..61bfbfa 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala
@@ -40,6 +40,7 @@ import org.apache.flink.core.fs.{FileSystem, Path}
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.util.{AbstractID, Collector}
 import scala.collection.JavaConverters._
+import scala.collection.mutable
 import scala.reflect.ClassTag
 
 
@@ -535,11 +536,12 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) {
    * @see org.apache.flink.api.java.Utils.CollectHelper
    */
   @throws(classOf[Exception])
-  def collect: List[T] = {
+  def collect: mutable.Buffer[T] = {
     val id = new AbstractID().toString
     javaSet.flatMap(new Utils.CollectHelper[T](id)).output(new 
DiscardingOutputFormat[T])
     val res = getExecutionEnvironment.execute()
-    res.getAccumulatorResult(id).asInstanceOf[List[T]]
+
+    res.getAccumulatorResult(id).asInstanceOf[java.util.List[T]].asScala
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/flink/blob/0ae0cba3/flink-tests/src/test/java/org/apache/flink/test/actions/CountCollectITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/actions/CountCollectITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/actions/CountCollectITCase.java
new file mode 100644
index 0000000..0369a56
--- /dev/null
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/actions/CountCollectITCase.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.actions;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+
+import static org.junit.Assert.*;
+
+import org.junit.Test;
+
+
+public class CountCollectITCase {
+
+    @Test
+    public void testCountCollectOnSimpleJob() throws Exception {
+        ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+        env.setDegreeOfParallelism(5);
+        
+        Integer[] input = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10};
+        
+        DataSet<Integer> data = env.fromElements(input);
+
+        // count
+        long numEntries = data.count();
+        assertEquals(10, numEntries);
+
+        // collect
+        ArrayList<Integer> list = (ArrayList<Integer>) data.collect();
+        assertArrayEquals(input, list.toArray());
+
+    }
+    
+    @Test
+    public void testCountCollectOnAdvancedJob() throws Exception {
+        ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+        env.setDegreeOfParallelism(5);
+        env.getConfig().disableObjectReuse();
+
+
+        DataSet<Integer> data = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8, 9, 
10);
+        DataSet<Integer> data2 = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8, 9, 
10);
+
+        DataSet<Tuple2<Integer, Integer>> data3 = data.cross(data2);
+
+        // count
+        long numEntries = data3.count();
+        assertEquals(100, numEntries);
+
+        // collect
+        ArrayList<Tuple2<Integer, Integer>> list = (ArrayList<Tuple2<Integer, 
Integer>>) data3.collect();
+        System.out.println(list);
+
+        // set expected entries in a hash map to true
+        HashMap<Tuple2<Integer, Integer>, Boolean> expected = new 
HashMap<Tuple2<Integer, Integer>, Boolean>();
+        for (int i = 1; i <= 10; i++) {
+            for (int j = 1; j <= 10; j++) {
+                expected.put(new Tuple2<Integer, Integer>(i, j), true);
+            }
+        }
+
+        // check if all entries are contained in the hash map
+        for (int i = 0; i < 100; i++) {
+            Tuple2<Integer, Integer> element = list.get(i);
+            assertEquals(expected.get(element), true);
+            expected.remove(element);
+        }
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0ae0cba3/flink-tests/src/test/java/org/apache/flink/test/convenience/CountCollectITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/convenience/CountCollectITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/convenience/CountCollectITCase.java
deleted file mode 100644
index a306c9a..0000000
--- 
a/flink-tests/src/test/java/org/apache/flink/test/convenience/CountCollectITCase.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.convenience;
-
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.tuple.Tuple2;
-
-import java.util.ArrayList;
-import java.util.Comparator;
-import java.util.HashMap;
-
-import static org.junit.Assert.*;
-
-import org.apache.flink.test.iterative.nephele.danglingpagerank.BooleanValue;
-import org.junit.Test;
-
-
-public class CountCollectITCase {
-
-    @Test
-    public void testSimple() throws Exception {
-        ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-        env.setDegreeOfParallelism(5);
-        
-        Integer[] input = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10};
-        
-        DataSet<Integer> data = env.fromElements(input);
-
-        // count
-        long numEntries = data.count();
-        assertEquals(10, numEntries);
-
-        // collect
-        ArrayList<Integer> list = (ArrayList<Integer>) data.collect();
-        assertArrayEquals(input, list.toArray());
-
-    }
-    
-    @Test
-    public void testAdvanced() throws Exception {
-        ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-        env.setDegreeOfParallelism(5);
-        env.getConfig().disableObjectReuse();
-
-
-        DataSet<Integer> data = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8, 9, 
10);
-        DataSet<Integer> data2 = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8, 9, 
10);
-
-        DataSet<Tuple2<Integer, Integer>> data3 = data.cross(data2);
-
-        // count
-        long numEntries = data3.count();
-        assertEquals(100, numEntries);
-
-        // collect
-        ArrayList<Tuple2<Integer, Integer>> list = (ArrayList<Tuple2<Integer, 
Integer>>) data3.collect();
-        System.out.println(list);
-
-        // set expected entries in a hash map to true
-        HashMap<Tuple2<Integer, Integer>, Boolean> expected = new 
HashMap<Tuple2<Integer, Integer>, Boolean>();
-        for (int i = 1; i <= 10; i++) {
-            for (int j = 1; j <= 10; j++) {
-                expected.put(new Tuple2<Integer, Integer>(i, j), true);
-            }
-        }
-
-        // check if all entries are contained in the hash map
-        for (int i = 0; i < 100; i++) {
-            Tuple2<Integer, Integer> element = list.get(i);
-            assertEquals(expected.get(element), true);
-            expected.remove(element);
-        }
-
-    }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/0ae0cba3/flink-tests/src/test/scala/org/apache/flink/api/scala/actions/CountCollectITCase.scala
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/scala/org/apache/flink/api/scala/actions/CountCollectITCase.scala
 
b/flink-tests/src/test/scala/org/apache/flink/api/scala/actions/CountCollectITCase.scala
new file mode 100644
index 0000000..de20bc9
--- /dev/null
+++ 
b/flink-tests/src/test/scala/org/apache/flink/api/scala/actions/CountCollectITCase.scala
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.actions
+
+import org.apache.flink.api.scala._
+
+import org.junit.Test
+import org.junit.Assert._
+
+class CountCollectITCase {
+
+  @Test
+  def testCountCollectOnSimpleJob: Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    env.setDegreeOfParallelism(5)
+
+    val input = 1 to 10
+
+    val inputDS = env.fromElements(input: _*)
+
+    // count
+    val numEntries = inputDS.count
+    assertEquals(input.length, numEntries)
+
+    // collect
+    val list = inputDS.collect
+    assertArrayEquals(input.toArray, list.toArray)
+  }
+
+  @Test
+  def testCountCollectOnAdvancedJob: Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    env.setDegreeOfParallelism(5)
+    env.getConfig.disableObjectReuse()
+
+    val input1 = 1 to 10
+    val input2 = 1 to 10
+
+    val inputDS1 = env.fromElements(input1:_*)
+    val inputDS2 = env.fromElements(input2:_*)
+
+    val result = inputDS1 cross inputDS2
+
+    val numEntries = result.count
+    assertEquals(input1.length * input2.length, numEntries)
+
+    val list = result.collect
+
+    val marker = Array.fill(input1.length, input2.length)(false)
+
+    for((x,y) <- list) {
+      assertFalse(s"Element ($x,$y) seen twice.", marker(x-1)(y-1))
+      marker(x-1)(y-1) = true
+    }
+  }
+
+}

Reply via email to