Repository: flink
Updated Branches:
  refs/heads/master 6f0faf9bb -> 5c83e787c
[FLINK-4804] [py] Fix first() failing when applied to groupings

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5c83e787
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5c83e787
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5c83e787

Branch: refs/heads/master
Commit: 5c83e787c7a4edafa3db34e5a58548728cc27b6c
Parents: 41d5167
Author: zentol <[email protected]>
Authored: Wed Oct 12 12:19:04 2016 +0200
Committer: zentol <[email protected]>
Committed: Fri Oct 21 11:03:03 2016 +0200

----------------------------------------------------------------------
 .../apache/flink/python/api/PythonPlanBinder.java | 17 +++++++++++++++--
 .../org/apache/flink/python/api/test_main2.py | 4 ++++
 2 files changed, 19 insertions(+), 2 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/5c83e787/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java
index d55b9d4..cc63ef4 100644
--- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java
+++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java
@@ -459,9 +459,22 @@ public class PythonPlanBinder {
 				.map(new KeyDiscarder()).setParallelism(getParallelism(info)).name("DistinctPostStep"));
 	}
 
+	@SuppressWarnings("unchecked")
 	private void createFirstOperation(PythonOperationInfo info) throws IOException {
-		DataSet op = (DataSet) sets.get(info.parentID);
-		sets.put(info.setID, op.first(info.count).setParallelism(getParallelism(info)).name("First"));
+		Object op = sets.get(info.parentID);
+		if (op instanceof DataSet) {
+			sets.put(info.setID, ((DataSet) op).first(info.count).setParallelism(getParallelism(info)).name("First"));
+			return;
+		}
+		if (op instanceof UnsortedGrouping) {
+			sets.put(info.setID, ((UnsortedGrouping) op).first(info.count).setParallelism(getParallelism(info)).name("First")
+				.map(new KeyDiscarder()).setParallelism(getParallelism(info)).name("FirstPostStep"));
+			return;
+		}
+		if (op instanceof SortedGrouping) {
+			sets.put(info.setID, ((SortedGrouping) op).first(info.count).setParallelism(getParallelism(info)).name("First")
+				.map(new KeyDiscarder()).setParallelism(getParallelism(info)).name("FirstPostStep"));
+		}
 	}
 
 	private void createGroupOperation(PythonOperationInfo info) throws IOException {

http://git-wip-us.apache.org/repos/asf/flink/blob/5c83e787/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_main2.py
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_main2.py b/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_main2.py
index f1d40e1..25b9d29 100644
--- a/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_main2.py
+++ b/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_main2.py
@@ -126,6 +126,10 @@ if __name__ == "__main__":
     d1 \
         .first(1) \
         .map_partition(Verify([1], "First")).output()
+    d4 \
+        .group_by(0) \
+        .first(1) \
+        .map_partition(Verify([(1, 0.5, "hello", True), (2, 0.4, "world", False)], "Grouped First")).output()
     d1 \
         .rebalance()
     d6 \
