Repository: flink
Updated Branches:
  refs/heads/master 6f0faf9bb -> 5c83e787c


[FLINK-4804] [py] Fix first() failing when applied to groupings


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5c83e787
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5c83e787
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5c83e787

Branch: refs/heads/master
Commit: 5c83e787c7a4edafa3db34e5a58548728cc27b6c
Parents: 41d5167
Author: zentol <[email protected]>
Authored: Wed Oct 12 12:19:04 2016 +0200
Committer: zentol <[email protected]>
Committed: Fri Oct 21 11:03:03 2016 +0200

----------------------------------------------------------------------
 .../apache/flink/python/api/PythonPlanBinder.java  | 17 +++++++++++++++--
 .../org/apache/flink/python/api/test_main2.py      |  4 ++++
 2 files changed, 19 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/5c83e787/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java
index d55b9d4..cc63ef4 100644
--- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java
+++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java
@@ -459,9 +459,22 @@ public class PythonPlanBinder {
                                .map(new KeyDiscarder()).setParallelism(getParallelism(info)).name("DistinctPostStep"));
        }
 
+       @SuppressWarnings("unchecked")
        private void createFirstOperation(PythonOperationInfo info) throws IOException {
-               DataSet op = (DataSet) sets.get(info.parentID);
-               sets.put(info.setID, op.first(info.count).setParallelism(getParallelism(info)).name("First"));
+               Object op = sets.get(info.parentID);
+               if (op instanceof DataSet) {
+                       sets.put(info.setID, ((DataSet) op).first(info.count).setParallelism(getParallelism(info)).name("First"));
+                       return;
+               }
+               if (op instanceof UnsortedGrouping) {
+                       sets.put(info.setID, ((UnsortedGrouping) op).first(info.count).setParallelism(getParallelism(info)).name("First")
+                               .map(new KeyDiscarder()).setParallelism(getParallelism(info)).name("FirstPostStep"));
+                       return;
+               }
+               if (op instanceof SortedGrouping) {
+                       sets.put(info.setID, ((SortedGrouping) op).first(info.count).setParallelism(getParallelism(info)).name("First")
+                               .map(new KeyDiscarder()).setParallelism(getParallelism(info)).name("FirstPostStep"));
+               }
        }
 
        private void createGroupOperation(PythonOperationInfo info) throws IOException {

http://git-wip-us.apache.org/repos/asf/flink/blob/5c83e787/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_main2.py
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_main2.py b/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_main2.py
index f1d40e1..25b9d29 100644
--- a/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_main2.py
+++ b/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_main2.py
@@ -126,6 +126,10 @@ if __name__ == "__main__":
     d1 \
         .first(1) \
         .map_partition(Verify([1], "First")).output()
+    d4 \
+        .group_by(0) \
+        .first(1) \
+        .map_partition(Verify([(1, 0.5, "hello", True), (2, 0.4, "world", False)], "Grouped First")).output()
     d1 \
         .rebalance()
     d6 \

Reply via email to