[FLINK-1461][api-extending] Add SortPartition operator to Java and Scala APIs.
This closes #381 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3d849703 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3d849703 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3d849703 Branch: refs/heads/master Commit: 3d84970364ced41d1497269dc3c9d0b5835f9e1e Parents: f0a28bf Author: Fabian Hueske <[email protected]> Authored: Tue Feb 10 18:35:13 2015 +0100 Committer: Fabian Hueske <[email protected]> Committed: Fri Feb 20 16:10:22 2015 +0100 ---------------------------------------------------------------------- docs/dataset_transformations.md | 35 +++ docs/programming_guide.md | 26 ++ .../org/apache/flink/compiler/PactCompiler.java | 5 + .../flink/compiler/dag/SortPartitionNode.java | 127 ++++++++ .../base/SortPartitionOperatorBase.java | 88 ++++++ .../java/org/apache/flink/api/java/DataSet.java | 31 +- .../flink/api/java/SortPartitionOperator.java | 182 +++++++++++ .../org/apache/flink/api/scala/DataSet.scala | 23 +- .../javaApiOperators/SortPartitionITCase.java | 305 +++++++++++++++++++ .../util/CollectionDataSets.java | 2 - 10 files changed, 820 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/3d849703/docs/dataset_transformations.md ---------------------------------------------------------------------- diff --git a/docs/dataset_transformations.md b/docs/dataset_transformations.md index 223d24c..13082c1 100644 --- a/docs/dataset_transformations.md +++ b/docs/dataset_transformations.md @@ -1224,6 +1224,41 @@ val out = in.partitionByHash(0).mapPartition { ... } </div> </div> +### Sort Partition + +Locally sorts all partitions of a DataSet on a specified field in a specified order. +Fields can be specified as field expressions or field positions (see [Reduce examples](#reduce-on-grouped-dataset) for how to specify keys). 
+Partitions can be sorted on multiple fields by chaining `sortPartition()` calls. + +<div class="codetabs" markdown="1"> +<div data-lang="java" markdown="1"> + +~~~java +DataSet<Tuple2<String, Integer>> in = // [...] +// Locally sort partitions in ascending order on the second (Integer) field and +// in descending order on the first (String) field. +// Apply a MapPartition transformation on the sorted partitions. +DataSet<Tuple2<String, String>> out = in.sortPartition(1, Order.ASCENDING) + .sortPartition(0, Order.DESCENDING) + .mapPartition(new PartitionMapper()); +~~~ + +</div> +<div data-lang="scala" markdown="1"> + +~~~scala +val in: DataSet[(String, Int)] = // [...] +// Locally sort partitions in ascending order on the second (Int) field and +// in descending order on the first (String) field. +// Apply a MapPartition transformation on the sorted partitions. +val out = in.sortPartition(1, Order.ASCENDING) + .sortPartition(0, Order.DESCENDING) + .mapPartition { ... } +~~~ + +</div> +</div> + ### First-n Returns the first n (arbitrary) elements of a DataSet. First-n can be applied on a regular DataSet, a grouped DataSet, or a grouped-sorted DataSet. Grouping keys can be specified as key-selector functions or field position keys (see [Reduce examples](#reduce-on-grouped-dataset) for how to specify keys). http://git-wip-us.apache.org/repos/asf/flink/blob/3d849703/docs/programming_guide.md ---------------------------------------------------------------------- diff --git a/docs/programming_guide.md b/docs/programming_guide.md index 3dcd770..efedc1b 100644 --- a/docs/programming_guide.md +++ b/docs/programming_guide.md @@ -653,6 +653,19 @@ DataSet<Integer> result = in.partitionByHash(0) </td> </tr> <tr> + <td><strong>Sort Partition</strong></td> + <td> + <p>Locally sorts all partitions of a data set on a specified field in a specified order. + Fields can be specified as tuple positions or field expressions. 
+ Sorting on multiple fields is done by chaining sortPartition() calls.</p> +{% highlight java %} +DataSet<Tuple2<String,Integer>> in = // [...] +DataSet<Integer> result = in.sortPartition(1, Order.ASCENDING) + .mapPartition(new PartitionMapper()); +{% endhighlight %} + </td> + </tr> + <tr> <td><strong>First-n</strong></td> <td> <p>Returns the first n (arbitrary) elements of a data set. First-n can be applied on a regular data set, a grouped data set, or a grouped-sorted data set. Grouping keys can be specified as key-selector functions or field position keys.</p> @@ -869,6 +882,18 @@ val result = in.partitionByHash(0).mapPartition { ... } {% endhighlight %} </td> </tr> + <tr> + <td><strong>Sort Partition</strong></td> + <td> + <p>Locally sorts all partitions of a data set on a specified field in a specified order. + Fields can be specified as tuple positions or field expressions. + Sorting on multiple fields is done by chaining sortPartition() calls.</p> +{% highlight scala %} +val in: DataSet[(Int, String)] = // [...] +val result = in.sortPartition(1, Order.ASCENDING).mapPartition { ... 
} +{% endhighlight %} + </td> + </tr> <tr> <td><strong>First-n</strong></td> <td> http://git-wip-us.apache.org/repos/asf/flink/blob/3d849703/flink-compiler/src/main/java/org/apache/flink/compiler/PactCompiler.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/PactCompiler.java b/flink-compiler/src/main/java/org/apache/flink/compiler/PactCompiler.java index 6d6bcc7..a82bb74 100644 --- a/flink-compiler/src/main/java/org/apache/flink/compiler/PactCompiler.java +++ b/flink-compiler/src/main/java/org/apache/flink/compiler/PactCompiler.java @@ -28,6 +28,8 @@ import java.util.List; import java.util.Map; import java.util.Set; +import org.apache.flink.api.common.operators.base.SortPartitionOperatorBase; +import org.apache.flink.compiler.dag.SortPartitionNode; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.flink.api.common.InvalidProgramException; @@ -714,6 +716,9 @@ public class PactCompiler { else if (c instanceof PartitionOperatorBase) { n = new PartitionNode((PartitionOperatorBase<?>) c); } + else if (c instanceof SortPartitionOperatorBase) { + n = new SortPartitionNode((SortPartitionOperatorBase<?>) c); + } else if (c instanceof PartialSolutionPlaceHolder) { if (this.parent == null) { throw new InvalidProgramException("It is currently not supported to create data sinks inside iterations."); http://git-wip-us.apache.org/repos/asf/flink/blob/3d849703/flink-compiler/src/main/java/org/apache/flink/compiler/dag/SortPartitionNode.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/dag/SortPartitionNode.java b/flink-compiler/src/main/java/org/apache/flink/compiler/dag/SortPartitionNode.java new file mode 100644 index 0000000..dc16f50 --- /dev/null +++ b/flink-compiler/src/main/java/org/apache/flink/compiler/dag/SortPartitionNode.java @@ -0,0 +1,127 @@ +/* + * Licensed 
to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.flink.compiler.dag; + +import org.apache.flink.api.common.operators.Ordering; +import org.apache.flink.api.common.operators.SemanticProperties; +import org.apache.flink.api.common.operators.SingleInputSemanticProperties; +import org.apache.flink.api.common.operators.base.SortPartitionOperatorBase; +import org.apache.flink.compiler.DataStatistics; +import org.apache.flink.compiler.dataproperties.GlobalProperties; +import org.apache.flink.compiler.dataproperties.LocalProperties; +import org.apache.flink.compiler.dataproperties.RequestedGlobalProperties; +import org.apache.flink.compiler.dataproperties.RequestedLocalProperties; +import org.apache.flink.compiler.operators.OperatorDescriptorSingle; +import org.apache.flink.compiler.plan.Channel; +import org.apache.flink.compiler.plan.SingleInputPlanNode; +import org.apache.flink.runtime.operators.DriverStrategy; + +import java.util.Collections; +import java.util.List; + +/** + * The optimizer's internal representation of a <i>SortPartition</i> operator node; it forwards all fields and requests the operator's partition ordering as a local property of its input. 
+ */ +public class SortPartitionNode extends SingleInputNode { + + private final List<OperatorDescriptorSingle> possibleProperties; + + public SortPartitionNode(SortPartitionOperatorBase<?> operator) { + super(operator); + + OperatorDescriptorSingle descr = new SortPartitionDescriptor(operator.getPartitionOrdering()); + this.possibleProperties = Collections.singletonList(descr); + } + + @Override + public SortPartitionOperatorBase<?> getPactContract() { + return (SortPartitionOperatorBase<?>) super.getPactContract(); + } + + @Override + public String getName() { + return "Sort-Partition"; + } + + @Override + protected List<OperatorDescriptorSingle> getPossibleProperties() { + return this.possibleProperties; + } + + @Override + protected void computeOperatorSpecificDefaultEstimates(DataStatistics statistics) { + // sorting changes neither the number of records nor the output size, so the predecessor's estimates are reused + this.estimatedNumRecords = getPredecessorNode().getEstimatedNumRecords(); + this.estimatedOutputSize = getPredecessorNode().getEstimatedOutputSize(); + } + + @Override + public SemanticProperties getSemanticProperties() { + return new SingleInputSemanticProperties.AllFieldsForwardedProperties(); + } + + // -------------------------------------------------------------------------------------------- + + public static class SortPartitionDescriptor extends OperatorDescriptorSingle { + + private Ordering partitionOrder; + + public SortPartitionDescriptor(Ordering partitionOrder) { + this.partitionOrder = partitionOrder; + } + + @Override + public DriverStrategy getStrategy() { + return DriverStrategy.UNARY_NO_OP; // no driver-side work: the ordering is requested from the input in createPossibleLocalProperties() + } + + @Override + public SingleInputPlanNode instantiate(Channel in, SingleInputNode node) { + return new SingleInputPlanNode(node, "Sort-Partition", in, DriverStrategy.UNARY_NO_OP); + } + + @Override + protected List<RequestedGlobalProperties> 
createPossibleGlobalProperties() { + // sort partition does not require any global property + return Collections.singletonList(new RequestedGlobalProperties()); 
+ } + + @Override + protected List<RequestedLocalProperties> createPossibleLocalProperties() { + // set partition order as required local property + RequestedLocalProperties rlp = new RequestedLocalProperties(); + rlp.setOrdering(this.partitionOrder); + + return Collections.singletonList(rlp); + } + + @Override + public GlobalProperties computeGlobalProperties(GlobalProperties gProps) { + // the no-op driver leaves the data distribution untouched, so all global properties are preserved. + return gProps; + } + + @Override + public LocalProperties computeLocalProperties(LocalProperties lProps) { + // the input already carries the requested ordering and the no-op driver does not reorder records, so all local properties are preserved. + return lProps; + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/3d849703/flink-core/src/main/java/org/apache/flink/api/common/operators/base/SortPartitionOperatorBase.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/SortPartitionOperatorBase.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/SortPartitionOperatorBase.java new file mode 100644 index 0000000..6fe237d --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/SortPartitionOperatorBase.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.common.operators.base; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.api.common.functions.util.NoOpFunction; +import org.apache.flink.api.common.operators.Ordering; +import org.apache.flink.api.common.operators.SingleInputOperator; +import org.apache.flink.api.common.operators.SingleInputSemanticProperties; +import org.apache.flink.api.common.operators.UnaryOperatorInformation; +import org.apache.flink.api.common.operators.util.UserCodeObjectWrapper; +import org.apache.flink.api.common.typeinfo.AtomicType; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.CompositeType; +import org.apache.flink.api.common.typeutils.TypeComparator; + +import java.util.Collections; +import java.util.Comparator; +import java.util.List; + +/** Operator base that sorts its input by the given partition {@link Ordering} and forwards all fields unchanged. + * @param <IN> The input and result type (sorting does not change the record type). 
+ */ +public class SortPartitionOperatorBase<IN> extends SingleInputOperator<IN, IN, NoOpFunction> { + + private final Ordering partitionOrdering; + + + public SortPartitionOperatorBase(UnaryOperatorInformation<IN, IN> operatorInfo, Ordering partitionOrdering, String name) { + super(new UserCodeObjectWrapper<NoOpFunction>(new NoOpFunction()), operatorInfo, name); + this.partitionOrdering = partitionOrdering; + } + + public Ordering getPartitionOrdering() { + return partitionOrdering; + } + + @Override + public SingleInputSemanticProperties getSemanticProperties() { + return new SingleInputSemanticProperties.AllFieldsForwardedProperties(); + } + + // -------------------------------------------------------------------------------------------- + + @Override + protected List<IN> executeOnCollections(List<IN> inputData, RuntimeContext runtimeContext, ExecutionConfig executionConfig) { + + TypeInformation<IN> inputType = getInput().getOperatorInfo().getOutputType(); + + int[] sortColumns = this.partitionOrdering.getFieldPositions(); + boolean[] sortOrderings = this.partitionOrdering.getFieldSortDirections(); + + final TypeComparator<IN> sortComparator; + if (inputType instanceof CompositeType) { + sortComparator = ((CompositeType<IN>) inputType).createComparator(sortColumns, sortOrderings, 0, executionConfig); + } else if (inputType instanceof AtomicType) { + sortComparator = ((AtomicType) inputType).createComparator(sortOrderings[0], executionConfig); // an atomic value is a single key, so only the first sort direction is used + } else { + throw new UnsupportedOperationException("Partition sorting does not support type "+inputType+" yet."); + } + + Collections.sort(inputData, new Comparator<IN>() { + @Override + public int compare(IN o1, IN o2) { + return sortComparator.compare(o1, o2); + } + }); + + return inputData; // NOTE: the input list itself is sorted in place and returned as the result + } +} 
http://git-wip-us.apache.org/repos/asf/flink/blob/3d849703/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java ---------------------------------------------------------------------- diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java index f2091e2..327a15a 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java @@ -30,6 +30,7 @@ import org.apache.flink.api.common.functions.Partitioner; import org.apache.flink.api.common.functions.ReduceFunction; import org.apache.flink.api.common.io.FileOutputFormat; import org.apache.flink.api.common.io.OutputFormat; +import org.apache.flink.api.common.operators.Order; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.operators.base.CrossOperatorBase.CrossHint; import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint; @@ -1086,7 +1087,35 @@ public abstract class DataSet<T> { public PartitionOperator<T> rebalance() { return new PartitionOperator<T>(this, PartitionMethod.REBALANCE, Utils.getCallLocationName()); } - + + // -------------------------------------------------------------------------------------------- + // Sorting + // -------------------------------------------------------------------------------------------- + + /** + * Locally sorts the partitions of the DataSet on the specified field in the specified order. + * The DataSet can be sorted on multiple fields by chaining sortPartition() calls on the returned operator. + * + * @param field The field index on which the DataSet is sorted. + * @param order The order in which the DataSet is sorted. + * @return The DataSet with sorted local partitions. + */ + public SortPartitionOperator<T> sortPartition(int field, Order order) { + return new SortPartitionOperator<T>(this, field, order, Utils.getCallLocationName()); + } + + /** + * Locally sorts the partitions of the DataSet on the specified field in the specified order. 
+ * The DataSet can be sorted on multiple fields by chaining sortPartition() calls on the returned operator. 
+ * + * @param field The field expression referring to the field on which the DataSet is sorted. + * @param order The order in which the DataSet is sorted. + * @return The DataSet with sorted local partitions. + */ + public SortPartitionOperator<T> sortPartition(String field, Order order) { + return new SortPartitionOperator<T>(this, field, order, Utils.getCallLocationName()); + } + // -------------------------------------------------------------------------------------------- // Top-K // -------------------------------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/3d849703/flink-java/src/main/java/org/apache/flink/api/java/SortPartitionOperator.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/SortPartitionOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/SortPartitionOperator.java new file mode 100644 index 0000000..7c09518 --- /dev/null +++ b/flink-java/src/main/java/org/apache/flink/api/java/SortPartitionOperator.java @@ -0,0 +1,182 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.java; + +import org.apache.flink.api.common.InvalidProgramException; +import org.apache.flink.api.common.operators.Operator; +import org.apache.flink.api.common.operators.Order; +import org.apache.flink.api.common.operators.Ordering; +import org.apache.flink.api.common.operators.UnaryOperatorInformation; +import org.apache.flink.api.common.operators.base.SortPartitionOperatorBase; +import org.apache.flink.api.common.typeutils.CompositeType; +import org.apache.flink.api.java.operators.Keys; +import org.apache.flink.api.java.operators.SingleInputOperator; + +import java.util.Arrays; + +/** + * This operator represents a DataSet with locally sorted partitions. Additional sort keys are appended by chaining sortPartition() calls. + * + * @param <T> The type of the DataSet with locally sorted partitions. + */ +public class SortPartitionOperator<T> extends SingleInputOperator<T, T, SortPartitionOperator<T>> { + + private int[] sortKeyPositions; + + private Order[] sortOrders; + + private final String sortLocationName; + + + public SortPartitionOperator(DataSet<T> dataSet, int sortField, Order sortOrder, String sortLocationName) { + super(dataSet, dataSet.getType()); + this.sortLocationName = sortLocationName; + + int[] flatOrderKeys = getFlatFields(sortField); + this.appendSorting(flatOrderKeys, sortOrder); + } + + public SortPartitionOperator(DataSet<T> dataSet, String sortField, Order sortOrder, String sortLocationName) { + super(dataSet, dataSet.getType()); + this.sortLocationName = sortLocationName; + + int[] flatOrderKeys = getFlatFields(sortField); + this.appendSorting(flatOrderKeys, sortOrder); + } + + /** + * Appends an additional sort order with the specified field in the specified order to the + * local partition sorting of the DataSet. + * + * @param field The field index of the additional sort order of the local partition sorting. 
+ * @param order The order of the additional sort order of the local partition sorting. + * @return The DataSet with sorted local partitions. 
+ */ + public SortPartitionOperator<T> sortPartition(int field, Order order) { + + int[] flatOrderKeys = getFlatFields(field); + this.appendSorting(flatOrderKeys, order); + return this; + } + + /** + * Appends an additional sort order with the specified field in the specified order to the + * local partition sorting of the DataSet. + * + * @param field The field expression referring to the field of the additional sort order of + * the local partition sorting. + * @param order The order of the additional sort order of the local partition sorting. + * @return The DataSet with sorted local partitions. + */ + public SortPartitionOperator<T> sortPartition(String field, Order order) { + int[] flatOrderKeys = getFlatFields(field); + this.appendSorting(flatOrderKeys, order); + return this; + } + + // -------------------------------------------------------------------------------------------- + // Key Extraction + // -------------------------------------------------------------------------------------------- + + private int[] getFlatFields(int field) { // flattens a field position into logical key positions via ExpressionKeys + + Keys.ExpressionKeys<T> ek; + try { + ek = new Keys.ExpressionKeys<T>(new int[]{field}, super.getType()); + } catch(IllegalArgumentException iae) { // NOTE(review): the message below mentions a field expression although this overload takes a field position + throw new InvalidProgramException("Invalid specification of field expression.", iae); + } + return ek.computeLogicalKeyPositions(); + } + + private int[] getFlatFields(String fields) { + + if(super.getType() instanceof CompositeType) { + // compute flat field positions for (nested) sorting fields + Keys.ExpressionKeys<T> ek; + try { + ek = new Keys.ExpressionKeys<T>(new String[]{fields}, super.getType()); + } catch(IllegalArgumentException iae) { + throw new InvalidProgramException("Invalid specification of field expression.", iae); + } + return ek.computeLogicalKeyPositions(); + } else { + + 
fields = fields.trim(); + if (!(fields.equals("*") || fields.equals("_"))) { + throw new InvalidProgramException("Output sorting of non-composite types can only be defined on the full 
type. " + "Use a field wildcard for that (\"*\" or \"_\")"); + } else { + return new int[]{0}; + } + } + } + + private void appendSorting(int[] flatOrderFields, Order order) { + + if(this.sortKeyPositions == null) { + // set sorting info + this.sortKeyPositions = flatOrderFields; + this.sortOrders = new Order[flatOrderFields.length]; + Arrays.fill(this.sortOrders, order); + } else { + // append sorting info to existing info; every appended key position gets the same order + int oldLength = this.sortKeyPositions.length; + int newLength = oldLength + flatOrderFields.length; + this.sortKeyPositions = Arrays.copyOf(this.sortKeyPositions, newLength); + this.sortOrders = Arrays.copyOf(this.sortOrders, newLength); + + for(int i=0; i<flatOrderFields.length; i++) { + this.sortKeyPositions[oldLength+i] = flatOrderFields[i]; + this.sortOrders[oldLength+i] = order; + } + } + } + + + // -------------------------------------------------------------------------------------------- + // Translation + // -------------------------------------------------------------------------------------------- + + protected org.apache.flink.api.common.operators.SingleInputOperator<?, T, ?> translateToDataFlow(Operator<T> input) { // NOTE(review): presumably overrides SingleInputOperator#translateToDataFlow; consider adding @Override + + String name = "Sort at " + sortLocationName; + + Ordering partitionOrdering = new Ordering(); + for (int i = 0; i < this.sortKeyPositions.length; i++) { + partitionOrdering.appendOrdering(this.sortKeyPositions[i], null, this.sortOrders[i]); + } + + // create the sort partition operator; input and output type are identical + UnaryOperatorInformation<T, T> operatorInfo = new UnaryOperatorInformation<T, T>(getType(), getType()); + SortPartitionOperatorBase<T> noop = new SortPartitionOperatorBase<T>(operatorInfo, partitionOrdering, name); + noop.setInput(input); + if(this.getParallelism() < 0) { + // use parallelism of input if not explicitly specified + 
noop.setDegreeOfParallelism(input.getDegreeOfParallelism()); + } else { + // use explicitly specified parallelism + noop.setDegreeOfParallelism(this.getParallelism()); + } + + return noop; + + } + +} 
http://git-wip-us.apache.org/repos/asf/flink/blob/3d849703/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala ---------------------------------------------------------------------- diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala index c1796ee..8a1dc41 100644 --- a/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala +++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala @@ -22,6 +22,7 @@ import org.apache.flink.api.common.InvalidProgramException import org.apache.flink.api.common.aggregators.Aggregator import org.apache.flink.api.common.functions._ import org.apache.flink.api.common.io.{FileOutputFormat, OutputFormat} +import org.apache.flink.api.common.operators.Order import org.apache.flink.api.common.operators.base.PartitionOperatorBase.PartitionMethod import org.apache.flink.api.java.aggregation.Aggregations import org.apache.flink.api.java.functions.{FirstReducer, KeySelector} @@ -30,7 +31,7 @@ import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint import org.apache.flink.api.common.operators.base.CrossOperatorBase.CrossHint import org.apache.flink.api.java.operators.Keys.ExpressionKeys import org.apache.flink.api.java.operators._ -import org.apache.flink.api.java.{DataSet => JavaDataSet} +import org.apache.flink.api.java.{DataSet => JavaDataSet, SortPartitionOperator} import org.apache.flink.api.scala.operators.{ScalaCsvOutputFormat, ScalaAggregateOperator} import org.apache.flink.configuration.Configuration import org.apache.flink.core.fs.{FileSystem, Path} @@ -1135,6 +1136,26 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) { } // -------------------------------------------------------------------------------------------- + // Partition Sorting + // -------------------------------------------------------------------------------------------- + + /** + * Locally sorts the partitions of 
the DataSet on the specified field in the specified order. + * The DataSet can be sorted on multiple fields by chaining sortPartition() calls. + */ + def sortPartition(field: Int, order: Order): DataSet[T] = { + wrap (new SortPartitionOperator[T](javaSet, field, order, getCallLocationName())) + } + + /** + * Locally sorts the partitions of the DataSet on the specified field in the specified order. + * The DataSet can be sorted on multiple fields by chaining sortPartition() calls. + */ + def sortPartition(field: String, order: Order): DataSet[T] = { + wrap (new SortPartitionOperator[T](javaSet, field, order, getCallLocationName())) + } + + // -------------------------------------------------------------------------------------------- // Result writing // -------------------------------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/3d849703/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/SortPartitionITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/SortPartitionITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/SortPartitionITCase.java new file mode 100644 index 0000000..790b7ba --- /dev/null +++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/SortPartitionITCase.java @@ -0,0 +1,305 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.javaApiOperators;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.MapPartitionFunction;
+import org.apache.flink.api.common.operators.Order;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.tuple.Tuple5;
+import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
+import org.apache.flink.test.javaApiOperators.util.CollectionDataSets.POJO;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.apache.flink.util.Collector;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.Serializable;
+import java.util.Iterator;
+
+@RunWith(Parameterized.class)
+public class SortPartitionITCase extends MultipleProgramsTestBase {
+
+	public SortPartitionITCase(ExecutionMode mode){
+		super(mode);
+	}
+
+	private String resultPath;
+	private String expected;
+
+	@Rule
+	public TemporaryFolder tempFolder = new TemporaryFolder();
+
+	@Before
+	public void before() throws Exception{
+		resultPath = tempFolder.newFile().toURI().toString();
+	}
+
+	@After
+	public void after() throws Exception{
+		compareResultsByLinesInMemory(expected, resultPath);
+	}
+
+	@Test
+	public void testSortPartitionByKeyField() throws Exception {
+		/*
+		 * Test sort partition on key field
+		 */
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		env.setDegreeOfParallelism(4);
+
+		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+		ds
+				.map(new IdMapper<Tuple3<Integer, Long, String>>()).setParallelism(4) // parallelize input
+				.sortPartition(1, Order.DESCENDING)
+				.mapPartition(new OrderCheckMapper<Tuple3<Integer, Long, String>>(new Tuple3Checker()))
+				.distinct()
+				.writeAsText(resultPath);
+
+		env.execute();
+
+		expected = "(true)\n";
+	}
+
+	@Test
+	public void testSortPartitionByTwoKeyFields() throws Exception {
+		/*
+		 * Test sort partition on two key fields
+		 */
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		env.setDegreeOfParallelism(2);
+
+		DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds = CollectionDataSets.get5TupleDataSet(env);
+		ds
+				.map(new IdMapper<Tuple5<Integer, Long, Integer, String, Long>>()).setParallelism(2) // parallelize input
+				.sortPartition(4, Order.ASCENDING)
+				.sortPartition(2, Order.DESCENDING)
+				.mapPartition(new OrderCheckMapper<Tuple5<Integer, Long, Integer, String, Long>>(new Tuple5Checker()))
+				.distinct()
+				.writeAsText(resultPath);
+
+		env.execute();
+
+		expected = "(true)\n";
+	}
+
+	@Test
+	public void testSortPartitionByFieldExpression() throws Exception {
+		/*
+		 * Test sort partition on field expression
+		 */
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		env.setDegreeOfParallelism(4);
+
+		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+		ds
+				.map(new IdMapper<Tuple3<Integer, Long, String>>()).setParallelism(4) // parallelize input
+				.sortPartition("f1", Order.DESCENDING)
+				.mapPartition(new OrderCheckMapper<Tuple3<Integer, Long, String>>(new Tuple3Checker()))
+				.distinct()
+				.writeAsText(resultPath);
+
+		env.execute();
+
+		expected = "(true)\n";
+	}
+
+	@Test
+	public void testSortPartitionByTwoFieldExpressions() throws Exception {
+		/*
+		 * Test sort partition on two field expressions
+		 */
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		env.setDegreeOfParallelism(2);
+
+		DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds = CollectionDataSets.get5TupleDataSet(env);
+		ds
+				.map(new IdMapper<Tuple5<Integer, Long, Integer, String, Long>>()).setParallelism(2) // parallelize input
+				.sortPartition("f4", Order.ASCENDING)
+				.sortPartition("f2", Order.DESCENDING)
+				.mapPartition(new OrderCheckMapper<Tuple5<Integer, Long, Integer, String, Long>>(new Tuple5Checker()))
+				.distinct()
+				.writeAsText(resultPath);
+
+		env.execute();
+
+		expected = "(true)\n";
+	}
+
+	@Test
+	public void testSortPartitionByNestedFieldExpression() throws Exception {
+		/*
+		 * Test sort partition on nested field expressions
+		 */
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		env.setDegreeOfParallelism(3);
+
+		DataSet<Tuple2<Tuple2<Integer, Integer>, String>> ds = CollectionDataSets.getGroupSortedNestedTupleDataSet(env);
+		ds
+				.map(new IdMapper<Tuple2<Tuple2<Integer, Integer>, String>>()).setParallelism(3) // parallelize input
+				.sortPartition("f0.f1", Order.ASCENDING)
+				.sortPartition("f1", Order.DESCENDING)
+				.mapPartition(new OrderCheckMapper<Tuple2<Tuple2<Integer, Integer>, String>>(new NestedTupleChecker()))
+				.distinct()
+				.writeAsText(resultPath);
+
+		env.execute();
+
+		expected = "(true)\n";
+	}
+
+	@Test
+	public void testSortPartitionPojoByNestedFieldExpression() throws Exception {
+		/*
+		 * Test sort partition on nested POJO field expressions
+		 */
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		env.setDegreeOfParallelism(3);
+
+		DataSet<POJO> ds = CollectionDataSets.getMixedPojoDataSet(env);
+		ds
+				.map(new IdMapper<POJO>()).setParallelism(1) // parallelize input
+				.sortPartition("nestedTupleWithCustom.f1.myString", Order.ASCENDING)
+				.sortPartition("number", Order.DESCENDING)
+				.mapPartition(new OrderCheckMapper<POJO>(new PojoChecker()))
+				.distinct()
+				.writeAsText(resultPath);
+
+		env.execute();
+
+		expected = "(true)\n";
+	}
+
+	@Test
+	public void testSortPartitionDOPChange() throws Exception {
+		/*
+		 * Test sort partition with DOP change
+		 */
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		env.setDegreeOfParallelism(3);
+
+		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+		ds
+				.sortPartition(1, Order.DESCENDING).setParallelism(3) // change DOP
+				.mapPartition(new OrderCheckMapper<Tuple3<Integer, Long, String>>(new Tuple3Checker()))
+				.distinct()
+				.writeAsText(resultPath);
+
+		env.execute();
+
+		expected = "(true)\n";
+	}
+
+	public static interface OrderChecker<T> extends Serializable {
+		/** Returns true if t1 may directly precede t2 in a partition sorted in the order under test. */
+		public boolean inOrder(T t1, T t2);
+	}
+
+	public static class Tuple3Checker implements OrderChecker<Tuple3<Integer, Long, String>> {
+		@Override
+		public boolean inOrder(Tuple3<Integer, Long, String> t1, Tuple3<Integer, Long, String> t2) {
+			return t1.f1 >= t2.f1;
+		}
+	}
+
+	public static class Tuple5Checker implements OrderChecker<Tuple5<Integer, Long, Integer, String, Long>> {
+		@Override
+		public boolean inOrder(Tuple5<Integer, Long, Integer, String, Long> t1, Tuple5<Integer, Long, Integer, String, Long> t2) {
+			// f4 ascending, then f2 descending; f4 is unboxed because == on two Long objects compares references
+			return t1.f4 < t2.f4 || t1.f4.longValue() == t2.f4.longValue() && t1.f2 >= t2.f2;
+		}
+	}
+
+	public static class NestedTupleChecker implements OrderChecker<Tuple2<Tuple2<Integer, Integer>, String>> {
+		@Override
+		public boolean inOrder(Tuple2<Tuple2<Integer, Integer>, String> t1, Tuple2<Tuple2<Integer, Integer>, String> t2) {
+			// f0.f1 ascending, then f1 descending; f0.f1 is unboxed because == on two Integer objects compares references
+			return t1.f0.f1 < t2.f0.f1 ||
+					t1.f0.f1.intValue() == t2.f0.f1.intValue() && t1.f1.compareTo(t2.f1) >= 0;
+		}
+	}
+
+	public static class PojoChecker implements OrderChecker<POJO> {
+		@Override
+		public boolean inOrder(POJO t1, POJO t2) {
+			// nestedTupleWithCustom.f1.myString ascending, then number descending; compare the strings only once
+			int cmp = t1.nestedTupleWithCustom.f1.myString.compareTo(t2.nestedTupleWithCustom.f1.myString);
+			return cmp < 0 ||
+					cmp == 0 && t1.number >= t2.number;
+		}
+	}
+
+	public static class OrderCheckMapper<T> implements MapPartitionFunction<T, Tuple1<Boolean>> {
+		// Emits one (true) if every pair of consecutive elements satisfies the checker (or the partition is empty), else one (false).
+		OrderChecker<T> checker;
+
+		public OrderCheckMapper() {}
+
+		public OrderCheckMapper(OrderChecker<T> checker) {
+			this.checker = checker;
+		}
+
+		@Override
+		public void mapPartition(Iterable<T> values, Collector<Tuple1<Boolean>> out) throws Exception {
+
+			Iterator<T> it = values.iterator();
+			if(!it.hasNext()) {
+				out.collect(new Tuple1<Boolean>(true));
+				return;
+			} else {
+				T last = it.next();
+
+				while (it.hasNext()) {
+					T next = it.next();
+					if (!checker.inOrder(last, next)) {
+						out.collect(new Tuple1<Boolean>(false));
+						return;
+					}
+					last = next;
+				}
+				out.collect(new Tuple1<Boolean>(true));
+			}
+		}
+	}
+
+	/** Identity function; the tests apply it only to run the input at an explicit parallelism. */
+	public static class IdMapper<T> implements MapFunction<T, T> {
+
+		@Override
+		public T map(T value) throws Exception {
+			return value;
+		}
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/3d849703/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/util/CollectionDataSets.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/util/CollectionDataSets.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/util/CollectionDataSets.java
index ef6b8a9..132d82f 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/util/CollectionDataSets.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/util/CollectionDataSets.java
@@ -301,8 +301,6 @@ public class CollectionDataSets {
 		public CustomType() {
 		}
 
-		;
-
 		public CustomType(int i, long l, String s) {
 			myInt = i;
 			myLong = l;
