Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/1585#discussion_r51870368
--- Diff:
flink-java/src/main/java/org/apache/flink/api/java/operators/SortPartitionOperator.java
---
@@ -59,6 +62,14 @@ public SortPartitionOperator(DataSet<T> dataSet, String
sortField, Order sortOrd
this.appendSorting(flatOrderKeys, sortOrder);
}
+ public <K> SortPartitionOperator(DataSet<T> dataSet,
Keys.SelectorFunctionKeys<T, K> sortKey, Order sortOrder, String
sortLocationName) {
+ super(dataSet, dataSet.getType());
+ this.sortLocationName = sortLocationName;
+
+ int[] flatOrderKeys = getFlatFields(sortKey);
--- End diff --
This does not work.
`SelectorFunctionKeys` need to be handled completely different. In short,
you need to inject a MapFunction<IN, Tuple2<KEY, IN>> that extracts the key and
emits a Tuple2 with the extracted key and the original input record.
The data set is then sorted on the first tuple field (the key field) and
the records are unwrapped after the sort by another `MapFunction<Tuple2<KEY,
IN>, IN>`. Have a look at how the PartitionOperator handles KeySelectors.
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---