[
https://issues.apache.org/jira/browse/FLINK-6070?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Konstantin Knauf updated FLINK-6070:
------------------------------------
Labels: (was: auto-closed)
Priority: Not a Priority (was: Minor)
> Suggestion: add ComparableTuple types
> -------------------------------------
>
> Key: FLINK-6070
> URL: https://issues.apache.org/jira/browse/FLINK-6070
> Project: Flink
> Issue Type: Improvement
> Components: API / Type Serialization System
> Affects Versions: 1.2.0
> Reporter: Luke Hutchison
> Priority: Not a Priority
>
> Since Java doesn't have built-in tuple types, I find myself using Flink
> tuples for a lot of tasks in Flink programs. One downside is that these
> tuples are not inherently comparable, so when you want to sort a collection
> of tuples, you have to provide a custom comparator.
> I created a tuple sorting class, as follows. (Only the methods for Tuple2 are
> defined at the bottom, similar methods could be added for other tuple types.)
> I wanted to get feedback on whether something like this would be considered
> useful for Flink before I submitted a PR.
> {code}
> import java.util.ArrayList;
> import java.util.Collection;
> import java.util.Collections;
> import java.util.Comparator;
> import java.util.List;
> import org.apache.flink.api.java.tuple.Tuple;
> import org.apache.flink.api.java.tuple.Tuple2;
> /** A class for sorting collections of tuples. */
> public class TupleSorter {
> /** Produce a Tuple comparator for the given number of fields, with the
> requested field priority and sort order. */
> private static Comparator<Tuple> newComparator(final int tupleLen, final
> int[] fieldPriority,
> final int[] sortDescendingIndices) {
> if (fieldPriority == null || fieldPriority.length != tupleLen) {
> throw new IllegalArgumentException("Invalid sort order");
> }
> boolean[] idxUsed = new boolean[tupleLen];
> for (int i = 0; i < fieldPriority.length; i++) {
> int idx = fieldPriority[i];
> if (idx < 0 || idx >= tupleLen) {
> throw new IllegalArgumentException("fieldPriority entry out
> of range: " + idx);
> }
> if (idxUsed[idx]) {
> throw new IllegalArgumentException("fieldPriority entry
> duplicated: " + idx);
> }
> idxUsed[idx] = true;
> }
> boolean[] sortDescending = new boolean[tupleLen];
> for (int i = 0; i < sortDescendingIndices.length; i++) {
> int idx = sortDescendingIndices[i];
> if (idx < 0 || idx >= tupleLen) {
> throw new IllegalArgumentException("sortDescendingIndices
> entry out of range: " + idx);
> }
> sortDescending[idx] = true;
> }
> return (tuple0, tuple1) -> {
> for (int i = 0; i < tupleLen; i++) {
> int idx = fieldPriority[i];
> @SuppressWarnings({ "rawtypes", "unchecked" })
> int diff = ((Comparable)
> tuple0.getField(idx)).compareTo((Comparable) tuple1.getField(idx));
> if (sortDescending[i]) {
> diff = -diff;
> }
> if (diff != 0) {
> return diff;
> }
> }
> return 0;
> };
> }
> /**
> * Sort a list of tuples.
> *
> * @param list
> * The list of tuples.
> * @param fieldPriority
> * The sort priority for the fields (primary an secondary sort
> key): a permutation of the field indices 0
> * and 1. The default sort order within a field is ascending.
> * @param sortDescendingIndices
> * If provided, inverts the sort order for a given field index
> from ascending to descending order.
> */
> public static <T0 extends Comparable<T0>, T1 extends Comparable<T1>> void
> sort(final List<Tuple2<T0, T1>> list,
> final int[] fieldPriority, final int... sortDescendingIndices) {
> list.sort(newComparator(/* tupleLen = */ 2, fieldPriority,
> sortDescendingIndices));
> }
> /**
> * Produce a sorted copy of a collection of tuples.
> *
> * @param list
> * The list of tuples.
> * @param fieldPriority
> * The sort priority for the fields (primary an secondary sort
> key): a permutation of the field indices 0
> * and 1. The default sort order within a field is ascending.
> * @param sortDescendingIndices
> * If provided, inverts the sort order for a given field index
> from ascending to descending order.
> */
> public static <T0 extends Comparable<T0>, T1 extends Comparable<T1>>
> ArrayList<Tuple2<T0, T1>> sortCopy(
> final Collection<Tuple2<T0, T1>> collection, final int[]
> fieldPriority,
> final int... sortDescendingIndices) {
> ArrayList<Tuple2<T0, T1>> list = new ArrayList<>(collection);
> Collections.sort(list, newComparator(/* tupleLen = */ 2,
> fieldPriority, sortDescendingIndices));
> return list;
> }
> }
> {code}
--
This message was sent by Atlassian Jira
(v8.3.4#803005)