[FLINK-1461][api-extending] Add SortPartition operator to Java and Scala APIs.

This closes #381


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3d849703
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3d849703
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3d849703

Branch: refs/heads/master
Commit: 3d84970364ced41d1497269dc3c9d0b5835f9e1e
Parents: f0a28bf
Author: Fabian Hueske <[email protected]>
Authored: Tue Feb 10 18:35:13 2015 +0100
Committer: Fabian Hueske <[email protected]>
Committed: Fri Feb 20 16:10:22 2015 +0100

----------------------------------------------------------------------
 docs/dataset_transformations.md                 |  35 +++
 docs/programming_guide.md                       |  26 ++
 .../org/apache/flink/compiler/PactCompiler.java |   5 +
 .../flink/compiler/dag/SortPartitionNode.java   | 127 ++++++++
 .../base/SortPartitionOperatorBase.java         |  88 ++++++
 .../java/org/apache/flink/api/java/DataSet.java |  31 +-
 .../flink/api/java/SortPartitionOperator.java   | 182 +++++++++++
 .../org/apache/flink/api/scala/DataSet.scala    |  23 +-
 .../javaApiOperators/SortPartitionITCase.java   | 305 +++++++++++++++++++
 .../util/CollectionDataSets.java                |   2 -
 10 files changed, 820 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/3d849703/docs/dataset_transformations.md
----------------------------------------------------------------------
diff --git a/docs/dataset_transformations.md b/docs/dataset_transformations.md
index 223d24c..13082c1 100644
--- a/docs/dataset_transformations.md
+++ b/docs/dataset_transformations.md
@@ -1224,6 +1224,41 @@ val out = in.partitionByHash(0).mapPartition { ... }
 </div>
 </div>
 
+### Sort Partition
+
+Locally sorts all partitions of a DataSet on a specified field in a specified 
order.
+Fields can be specified as field expressions or field positions (see [Reduce 
examples](#reduce-on-grouped-dataset) for how to specify keys).
+Partitions can be sorted on multiple fields by chaining `sortPartition()` 
calls.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+
+~~~java
+DataSet<Tuple2<String, Integer>> in = // [...]
+// Locally sort partitions in ascending order on the second String field and 
+// in descending order on the first String field.
+// Apply a MapPartition transformation on the sorted partitions.
+DataSet<Tuple2<String, String>> out = in.sortPartition(1, Order.ASCENDING)
+                                          .sortPartition(0, Order.DESCENDING)
+                                        .mapPartition(new PartitionMapper());
+~~~
+
+</div>
+<div data-lang="scala" markdown="1">
+
+~~~scala
+val in: DataSet[(String, Int)] = // [...]
+// Locally sort partitions in ascending order on the second String field and 
+// in descending order on the first String field.
+// Apply a MapPartition transformation on the sorted partitions.
+val out = in.sortPartition(1, Order.ASCENDING)
+              .sortPartition(0, Order.DESCENDING)
+            .mapPartition { ... }
+~~~
+
+</div>
+</div>
+
 ### First-n
 
 Returns the first n (arbitrary) elements of a DataSet. First-n can be applied 
on a regular DataSet, a grouped DataSet, or a grouped-sorted DataSet. Grouping 
keys can be specified as key-selector functions or field position keys (see 
[Reduce examples](#reduce-on-grouped-dataset) for how to specify keys).

http://git-wip-us.apache.org/repos/asf/flink/blob/3d849703/docs/programming_guide.md
----------------------------------------------------------------------
diff --git a/docs/programming_guide.md b/docs/programming_guide.md
index 3dcd770..efedc1b 100644
--- a/docs/programming_guide.md
+++ b/docs/programming_guide.md
@@ -653,6 +653,19 @@ DataSet<Integer> result = in.partitionByHash(0)
       </td>
     </tr>
     <tr>
+      <td><strong>Sort Partition</strong></td>
+      <td>
+        <p>Locally sorts all partitions of a data set on a specified field in 
a specified order. 
+          Fields can be specified as tuple positions or field expressions. 
+          Sorting on multiple fields is done by chaining sortPartition() 
calls.</p>
+{% highlight java %}
+DataSet<Tuple2<String,Integer>> in = // [...]
+DataSet<Integer> result = in.sortPartition(1, Order.ASCENDING)
+                            .mapPartition(new PartitionMapper());
+{% endhighlight %}
+      </td>
+    </tr>
+    <tr>
       <td><strong>First-n</strong></td>
       <td>
         <p>Returns the first n (arbitrary) elements of a data set. First-n can 
be applied on a regular data set, a grouped data set, or a grouped-sorted data 
set. Grouping keys can be specified as key-selector functions or field position 
keys.</p>
@@ -869,6 +882,18 @@ val result = in.partitionByHash(0).mapPartition { ... }
 {% endhighlight %}
       </td>
     </tr>
+    <tr>
+      <td><strong>Sort Partition</strong></td>
+      <td>
+        <p>Locally sorts all partitions of a data set on a specified field in 
a specified order. 
+          Fields can be specified as tuple positions or field expressions. 
+          Sorting on multiple fields is done by chaining sortPartition() 
calls.</p>
+{% highlight scala %}
+val in: DataSet[(Int, String)] = // [...]
+val result = in.sortPartition(1, Order.ASCENDING).mapPartition { ... }
+{% endhighlight %}
+      </td>
+    </tr>
     <tr>
       <td><strong>First-n</strong></td>
       <td>

http://git-wip-us.apache.org/repos/asf/flink/blob/3d849703/flink-compiler/src/main/java/org/apache/flink/compiler/PactCompiler.java
----------------------------------------------------------------------
diff --git 
a/flink-compiler/src/main/java/org/apache/flink/compiler/PactCompiler.java 
b/flink-compiler/src/main/java/org/apache/flink/compiler/PactCompiler.java
index 6d6bcc7..a82bb74 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/PactCompiler.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/PactCompiler.java
@@ -28,6 +28,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import org.apache.flink.api.common.operators.base.SortPartitionOperatorBase;
+import org.apache.flink.compiler.dag.SortPartitionNode;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.flink.api.common.InvalidProgramException;
@@ -714,6 +716,9 @@ public class PactCompiler {
                        else if (c instanceof PartitionOperatorBase) {
                                n = new 
PartitionNode((PartitionOperatorBase<?>) c);
                        }
+                       else if (c instanceof SortPartitionOperatorBase) {
+                               n = new 
SortPartitionNode((SortPartitionOperatorBase<?>) c);
+                       }
                        else if (c instanceof PartialSolutionPlaceHolder) {
                                if (this.parent == null) {
                                        throw new InvalidProgramException("It 
is currently not supported to create data sinks inside iterations.");

http://git-wip-us.apache.org/repos/asf/flink/blob/3d849703/flink-compiler/src/main/java/org/apache/flink/compiler/dag/SortPartitionNode.java
----------------------------------------------------------------------
diff --git 
a/flink-compiler/src/main/java/org/apache/flink/compiler/dag/SortPartitionNode.java
 
b/flink-compiler/src/main/java/org/apache/flink/compiler/dag/SortPartitionNode.java
new file mode 100644
index 0000000..dc16f50
--- /dev/null
+++ 
b/flink-compiler/src/main/java/org/apache/flink/compiler/dag/SortPartitionNode.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.compiler.dag;
+
+import org.apache.flink.api.common.operators.Ordering;
+import org.apache.flink.api.common.operators.SemanticProperties;
+import org.apache.flink.api.common.operators.SingleInputSemanticProperties;
+import org.apache.flink.api.common.operators.base.SortPartitionOperatorBase;
+import org.apache.flink.compiler.DataStatistics;
+import org.apache.flink.compiler.dataproperties.GlobalProperties;
+import org.apache.flink.compiler.dataproperties.LocalProperties;
+import org.apache.flink.compiler.dataproperties.RequestedGlobalProperties;
+import org.apache.flink.compiler.dataproperties.RequestedLocalProperties;
+import org.apache.flink.compiler.operators.OperatorDescriptorSingle;
+import org.apache.flink.compiler.plan.Channel;
+import org.apache.flink.compiler.plan.SingleInputPlanNode;
+import org.apache.flink.runtime.operators.DriverStrategy;
+
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * The optimizer's internal representation of a <i>SortPartition</i> operator 
node.
+ */
+public class SortPartitionNode extends SingleInputNode {
+
+       private final List<OperatorDescriptorSingle> possibleProperties;
+
+       public SortPartitionNode(SortPartitionOperatorBase<?> operator) {
+               super(operator);
+               
+               OperatorDescriptorSingle descr = new 
SortPartitionDescriptor(operator.getPartitionOrdering());
+               this.possibleProperties = Collections.singletonList(descr);
+       }
+
+       @Override
+       public SortPartitionOperatorBase<?> getPactContract() {
+               return (SortPartitionOperatorBase<?>) super.getPactContract();
+       }
+
+       @Override
+       public String getName() {
+               return "Sort-Partition";
+       }
+
+       @Override
+       protected List<OperatorDescriptorSingle> getPossibleProperties() {
+               return this.possibleProperties;
+       }
+
+       @Override
+       protected void computeOperatorSpecificDefaultEstimates(DataStatistics 
statistics) {
+               // sorting does not change the number of records
+               this.estimatedNumRecords = 
getPredecessorNode().getEstimatedNumRecords();
+               this.estimatedOutputSize = 
getPredecessorNode().getEstimatedOutputSize();
+       }
+       
+       @Override
+       public SemanticProperties getSemanticProperties() {
+               return new 
SingleInputSemanticProperties.AllFieldsForwardedProperties();
+       }
+       
+       // 
--------------------------------------------------------------------------------------------
+       
+       public static class SortPartitionDescriptor extends 
OperatorDescriptorSingle {
+
+               private Ordering partitionOrder;
+
+               public SortPartitionDescriptor(Ordering partitionOrder) {
+                       this.partitionOrder = partitionOrder;
+               }
+               
+               @Override
+               public DriverStrategy getStrategy() {
+                       return DriverStrategy.UNARY_NO_OP;
+               }
+
+               @Override
+               public SingleInputPlanNode instantiate(Channel in, 
SingleInputNode node) {
+                       return new SingleInputPlanNode(node, "Sort-Partition", 
in, DriverStrategy.UNARY_NO_OP);
+               }
+
+               @Override
+               protected List<RequestedGlobalProperties> 
createPossibleGlobalProperties() {
+                       // sort partition does not require any global property
+                       return Collections.singletonList(new 
RequestedGlobalProperties());
+               }
+
+               @Override
+               protected List<RequestedLocalProperties> 
createPossibleLocalProperties() {
+                       // set partition order as required local property
+                       RequestedLocalProperties rlp = new 
RequestedLocalProperties();
+                       rlp.setOrdering(this.partitionOrder);
+
+                       return Collections.singletonList(rlp);
+               }
+               
+               @Override
+               public GlobalProperties 
computeGlobalProperties(GlobalProperties gProps) {
+                       // sort partition is a no-operation operation, such 
that all global properties are preserved.
+                       return gProps;
+               }
+               
+               @Override
+               public LocalProperties computeLocalProperties(LocalProperties 
lProps) {
+                       // sort partition is a no-operation operation, such 
that all local properties are preserved.
+                       return lProps;
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/3d849703/flink-core/src/main/java/org/apache/flink/api/common/operators/base/SortPartitionOperatorBase.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/SortPartitionOperatorBase.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/SortPartitionOperatorBase.java
new file mode 100644
index 0000000..6fe237d
--- /dev/null
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/SortPartitionOperatorBase.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.operators.base;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.functions.util.NoOpFunction;
+import org.apache.flink.api.common.operators.Ordering;
+import org.apache.flink.api.common.operators.SingleInputOperator;
+import org.apache.flink.api.common.operators.SingleInputSemanticProperties;
+import org.apache.flink.api.common.operators.UnaryOperatorInformation;
+import org.apache.flink.api.common.operators.util.UserCodeObjectWrapper;
+import org.apache.flink.api.common.typeinfo.AtomicType;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.CompositeType;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+
+/**
+ * @param <IN> The input and result type.
+ */
+public class SortPartitionOperatorBase<IN> extends SingleInputOperator<IN, IN, 
NoOpFunction> {
+
+       private final Ordering partitionOrdering;
+
+
+       public SortPartitionOperatorBase(UnaryOperatorInformation<IN, IN> 
operatorInfo, Ordering partitionOrdering, String name) {
+               super(new UserCodeObjectWrapper<NoOpFunction>(new 
NoOpFunction()), operatorInfo, name);
+               this.partitionOrdering = partitionOrdering;
+       }
+
+       public Ordering getPartitionOrdering() {
+               return partitionOrdering;
+       }
+
+       @Override
+       public SingleInputSemanticProperties getSemanticProperties() {
+               return new 
SingleInputSemanticProperties.AllFieldsForwardedProperties();
+       }
+
+       // 
--------------------------------------------------------------------------------------------
+       
+       @Override
+       protected List<IN> executeOnCollections(List<IN> inputData, 
RuntimeContext runtimeContext, ExecutionConfig executionConfig) {
+
+               TypeInformation<IN> inputType = 
getInput().getOperatorInfo().getOutputType();
+
+               int[] sortColumns = this.partitionOrdering.getFieldPositions();
+               boolean[] sortOrderings = 
this.partitionOrdering.getFieldSortDirections();
+
+               final TypeComparator<IN> sortComparator;
+               if (inputType instanceof CompositeType) {
+                       sortComparator = ((CompositeType<IN>) 
inputType).createComparator(sortColumns, sortOrderings, 0, executionConfig);
+               } else if (inputType instanceof AtomicType) {
+                       sortComparator = ((AtomicType) 
inputType).createComparator(sortOrderings[0], executionConfig);
+               } else {
+                       throw new UnsupportedOperationException("Partition 
sorting does not support type "+inputType+" yet.");
+               }
+
+               Collections.sort(inputData, new Comparator<IN>() {
+                       @Override
+                       public int compare(IN o1, IN o2) {
+                               return sortComparator.compare(o1, o2);
+                       }
+               });
+
+               return inputData;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/3d849703/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java 
b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
index f2091e2..327a15a 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
@@ -30,6 +30,7 @@ import org.apache.flink.api.common.functions.Partitioner;
 import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.common.io.FileOutputFormat;
 import org.apache.flink.api.common.io.OutputFormat;
+import org.apache.flink.api.common.operators.Order;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.operators.base.CrossOperatorBase.CrossHint;
 import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint;
@@ -1086,7 +1087,35 @@ public abstract class DataSet<T> {
        public PartitionOperator<T> rebalance() {
                return new PartitionOperator<T>(this, 
PartitionMethod.REBALANCE, Utils.getCallLocationName());
        }
-               
+
+       // 
--------------------------------------------------------------------------------------------
+       //  Sorting
+       // 
--------------------------------------------------------------------------------------------
+
+       /**
+        * Locally sorts the partitions of the DataSet on the specified field 
in the specified order.
+        * DataSet can be sorted on multiple fields by chaining sortPartition() 
calls.
+        *
+        * @param field The field index on which the DataSet is sorted.
+        * @param order The order in which the DataSet is sorted.
+        * @return The DataSet with sorted local partitions.
+        */
+       public SortPartitionOperator<T> sortPartition(int field, Order order) {
+               return new SortPartitionOperator<T>(this, field, order, 
Utils.getCallLocationName());
+       }
+
+       /**
+        * Locally sorts the partitions of the DataSet on the specified field 
in the specified order.
+        * DataSet can be sorted on multiple fields by chaining sortPartition() 
calls.
+        *
+        * @param field The field expression referring to the field on which 
the DataSet is sorted.
+        * @param order The order in which the DataSet is sorted.
+        * @return The DataSet with sorted local partitions.
+        */
+       public SortPartitionOperator<T> sortPartition(String field, Order 
order) {
+               return new SortPartitionOperator<T>(this, field, order, 
Utils.getCallLocationName());
+       }
+
        // 
--------------------------------------------------------------------------------------------
        //  Top-K
        // 
--------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/3d849703/flink-java/src/main/java/org/apache/flink/api/java/SortPartitionOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/SortPartitionOperator.java 
b/flink-java/src/main/java/org/apache/flink/api/java/SortPartitionOperator.java
new file mode 100644
index 0000000..7c09518
--- /dev/null
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/SortPartitionOperator.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java;
+
+import org.apache.flink.api.common.InvalidProgramException;
+import org.apache.flink.api.common.operators.Operator;
+import org.apache.flink.api.common.operators.Order;
+import org.apache.flink.api.common.operators.Ordering;
+import org.apache.flink.api.common.operators.UnaryOperatorInformation;
+import org.apache.flink.api.common.operators.base.SortPartitionOperatorBase;
+import org.apache.flink.api.common.typeutils.CompositeType;
+import org.apache.flink.api.java.operators.Keys;
+import org.apache.flink.api.java.operators.SingleInputOperator;
+
+import java.util.Arrays;
+
+/**
+ * This operator represents a DataSet with locally sorted partitions.
+ *
+ * @param <T> The type of the DataSet with locally sorted partitions.
+ */
+public class SortPartitionOperator<T> extends SingleInputOperator<T, T, 
SortPartitionOperator<T>> {
+
+       private int[] sortKeyPositions;
+
+       private Order[] sortOrders;
+
+       private final String sortLocationName;
+
+
+       public SortPartitionOperator(DataSet<T> dataSet, int sortField, Order 
sortOrder, String sortLocationName) {
+               super(dataSet, dataSet.getType());
+               this.sortLocationName = sortLocationName;
+
+               int[] flatOrderKeys = getFlatFields(sortField);
+               this.appendSorting(flatOrderKeys, sortOrder);
+       }
+
+       public SortPartitionOperator(DataSet<T> dataSet, String sortField, 
Order sortOrder, String sortLocationName) {
+               super(dataSet, dataSet.getType());
+               this.sortLocationName = sortLocationName;
+
+               int[] flatOrderKeys = getFlatFields(sortField);
+               this.appendSorting(flatOrderKeys, sortOrder);
+       }
+
+       /**
+        * Appends an additional sort order with the specified field in the 
specified order to the
+        * local partition sorting of the DataSet.
+        *
+        * @param field The field index of the additional sort order of the 
local partition sorting.
+        * @param order The order of the additional sort order of the local 
partition sorting.
+        * @return The DataSet with sorted local partitions.
+        */
+       public SortPartitionOperator<T> sortPartition(int field, Order order) {
+
+               int[] flatOrderKeys = getFlatFields(field);
+               this.appendSorting(flatOrderKeys, order);
+               return this;
+       }
+
+       /**
+        * Appends an additional sort order with the specified field in the 
specified order to the
+        * local partition sorting of the DataSet.
+        *
+        * @param field The field expression referring to the field of the 
additional sort order of
+        *                 the local partition sorting.
+        * @param order The order of the additional sort order of the local 
partition sorting.
+        * @return The DataSet with sorted local partitions.
+        */
+       public SortPartitionOperator<T> sortPartition(String field, Order 
order) {
+               int[] flatOrderKeys = getFlatFields(field);
+               this.appendSorting(flatOrderKeys, order);
+               return this;
+       }
+
+       // 
--------------------------------------------------------------------------------------------
+       //  Key Extraction
+       // 
--------------------------------------------------------------------------------------------
+
+       private int[] getFlatFields(int field) {
+
+               Keys.ExpressionKeys<T> ek;
+               try {
+                       ek = new Keys.ExpressionKeys<T>(new int[]{field}, 
super.getType());
+               } catch(IllegalArgumentException iae) {
+                       throw new InvalidProgramException("Invalid 
specification of field expression.", iae);
+               }
+               return ek.computeLogicalKeyPositions();
+       }
+
+       private int[] getFlatFields(String fields) {
+
+               if(super.getType() instanceof CompositeType) {
+                       // compute flat field positions for (nested) sorting 
fields
+                       Keys.ExpressionKeys<T> ek;
+                       try {
+                               ek = new Keys.ExpressionKeys<T>(new 
String[]{fields}, super.getType());
+                       } catch(IllegalArgumentException iae) {
+                               throw new InvalidProgramException("Invalid 
specification of field expression.", iae);
+                       }
+                       return ek.computeLogicalKeyPositions();
+               } else {
+
+                       fields = fields.trim();
+                       if (!(fields.equals("*") || fields.equals("_"))) {
+                               throw new InvalidProgramException("Output 
sorting of non-composite types can only be defined on the full type. " +
+                                               "Use a field wildcard for that 
(\"*\" or \"_\")");
+                       } else {
+                               return new int[]{0};
+                       }
+               }
+       }
+
+       private void appendSorting(int[] flatOrderFields, Order order) {
+
+               if(this.sortKeyPositions == null) {
+                       // set sorting info
+                       this.sortKeyPositions = flatOrderFields;
+                       this.sortOrders = new Order[flatOrderFields.length];
+                       Arrays.fill(this.sortOrders, order);
+               } else {
+                       // append sorting info to existing info
+                       int oldLength = this.sortKeyPositions.length;
+                       int newLength = oldLength + flatOrderFields.length;
+                       this.sortKeyPositions = 
Arrays.copyOf(this.sortKeyPositions, newLength);
+                       this.sortOrders = Arrays.copyOf(this.sortOrders, 
newLength);
+
+                       for(int i=0; i<flatOrderFields.length; i++) {
+                               this.sortKeyPositions[oldLength+i] = 
flatOrderFields[i];
+                               this.sortOrders[oldLength+i] = order;
+                       }
+               }
+       }
+
+
+       // 
--------------------------------------------------------------------------------------------
+       //  Translation
+       // 
--------------------------------------------------------------------------------------------
+
+       protected org.apache.flink.api.common.operators.SingleInputOperator<?, 
T, ?> translateToDataFlow(Operator<T> input) {
+
+               String name = "Sort at " + sortLocationName;
+
+               Ordering partitionOrdering = new Ordering();
+               for (int i = 0; i < this.sortKeyPositions.length; i++) {
+                       
partitionOrdering.appendOrdering(this.sortKeyPositions[i], null, 
this.sortOrders[i]);
+               }
+
+               // distinguish between partition types
+               UnaryOperatorInformation<T, T> operatorInfo = new 
UnaryOperatorInformation<T, T>(getType(), getType());
+               SortPartitionOperatorBase<T> noop = new  
SortPartitionOperatorBase<T>(operatorInfo, partitionOrdering, name);
+               noop.setInput(input);
+               if(this.getParallelism() < 0) {
+                       // use parallelism of input if not explicitly specified
+                       
noop.setDegreeOfParallelism(input.getDegreeOfParallelism());
+               } else {
+                       // use explicitly specified parallelism
+                       noop.setDegreeOfParallelism(this.getParallelism());
+               }
+
+               return noop;
+
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/3d849703/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala
----------------------------------------------------------------------
diff --git 
a/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala 
b/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala
index c1796ee..8a1dc41 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.InvalidProgramException
 import org.apache.flink.api.common.aggregators.Aggregator
 import org.apache.flink.api.common.functions._
 import org.apache.flink.api.common.io.{FileOutputFormat, OutputFormat}
+import org.apache.flink.api.common.operators.Order
 import 
org.apache.flink.api.common.operators.base.PartitionOperatorBase.PartitionMethod
 import org.apache.flink.api.java.aggregation.Aggregations
 import org.apache.flink.api.java.functions.{FirstReducer, KeySelector}
@@ -30,7 +31,7 @@ import 
org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint
 import org.apache.flink.api.common.operators.base.CrossOperatorBase.CrossHint
 import org.apache.flink.api.java.operators.Keys.ExpressionKeys
 import org.apache.flink.api.java.operators._
-import org.apache.flink.api.java.{DataSet => JavaDataSet}
+import org.apache.flink.api.java.{DataSet => JavaDataSet, 
SortPartitionOperator}
 import org.apache.flink.api.scala.operators.{ScalaCsvOutputFormat, 
ScalaAggregateOperator}
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.core.fs.{FileSystem, Path}
@@ -1135,6 +1136,26 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) {
   }
 
   // 
--------------------------------------------------------------------------------------------
+  //  Partition Sorting
+  // 
--------------------------------------------------------------------------------------------
+
+  /**
+   * Locally sorts the partitions of the DataSet on the specified field in the 
specified order.
+   * The DataSet can be sorted on multiple fields by chaining sortPartition() 
calls.
+   */
+  def sortPartition(field: Int, order: Order): DataSet[T] = {
+    wrap (new SortPartitionOperator[T](javaSet, field, order, 
getCallLocationName()))
+  }
+
+  /**
+   * Locally sorts the partitions of the DataSet on the specified field in the 
specified order.
+   * The DataSet can be sorted on multiple fields by chaining sortPartition() 
calls.
+   */
+  def sortPartition(field: String, order: Order): DataSet[T] = {
+    wrap (new SortPartitionOperator[T](javaSet, field, order, 
getCallLocationName()))
+  }
+
+  // 
--------------------------------------------------------------------------------------------
   //  Result writing
   // 
--------------------------------------------------------------------------------------------
 

http://git-wip-us.apache.org/repos/asf/flink/blob/3d849703/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/SortPartitionITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/SortPartitionITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/SortPartitionITCase.java
new file mode 100644
index 0000000..790b7ba
--- /dev/null
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/SortPartitionITCase.java
@@ -0,0 +1,305 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.javaApiOperators;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.MapPartitionFunction;
+import org.apache.flink.api.common.operators.Order;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.tuple.Tuple5;
+import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
+import org.apache.flink.test.javaApiOperators.util.CollectionDataSets.POJO;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.apache.flink.util.Collector;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.Serializable;
+import java.util.Iterator;
+
+@RunWith(Parameterized.class)
+public class SortPartitionITCase extends MultipleProgramsTestBase {
+
+       public SortPartitionITCase(ExecutionMode mode){
+               super(mode);
+       }
+
+       private String resultPath;
+       private String expected;
+
+       @Rule
+       public TemporaryFolder tempFolder = new TemporaryFolder();
+
+       @Before
+       public void before() throws Exception{
+               resultPath = tempFolder.newFile().toURI().toString();
+       }
+
+       @After
+       public void after() throws Exception{
+               compareResultsByLinesInMemory(expected, resultPath);
+       }
+
+       @Test
+       public void testSortPartitionByKeyField() throws Exception {
+               /*
+                * Test sort partition on key field
+                */
+
+               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+               env.setDegreeOfParallelism(4);
+
+               DataSet<Tuple3<Integer, Long, String>> ds = 
CollectionDataSets.get3TupleDataSet(env);
+               ds
+                               .map(new IdMapper()).setParallelism(4) // 
parallelize input
+                               .sortPartition(1, Order.DESCENDING)
+                               .mapPartition(new 
OrderCheckMapper<Tuple3<Integer, Long, String>>(new Tuple3Checker()))
+                               .distinct()
+                               .writeAsText(resultPath);
+
+               env.execute();
+
+               expected = "(true)\n";
+       }
+
+       @Test
+       public void testSortPartitionByTwoKeyFields() throws Exception {
+               /*
+                * Test sort partition on two key fields
+                */
+
+               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+               env.setDegreeOfParallelism(2);
+
+               DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds = 
CollectionDataSets.get5TupleDataSet(env);
+               ds
+                               .map(new IdMapper()).setParallelism(2) // 
parallelize input
+                               .sortPartition(4, Order.ASCENDING)
+                               .sortPartition(2, Order.DESCENDING)
+                               .mapPartition(new 
OrderCheckMapper<Tuple5<Integer, Long, Integer, String, Long>>(new 
Tuple5Checker()))
+                               .distinct()
+                               .writeAsText(resultPath);
+
+               env.execute();
+
+               expected = "(true)\n";
+       }
+
+       @Test
+       public void testSortPartitionByFieldExpression() throws Exception {
+               /*
+                * Test sort partition on field expression
+                */
+
+               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+               env.setDegreeOfParallelism(4);
+
+               DataSet<Tuple3<Integer, Long, String>> ds = 
CollectionDataSets.get3TupleDataSet(env);
+               ds
+                               .map(new IdMapper()).setParallelism(4) // 
parallelize input
+                               .sortPartition("f1", Order.DESCENDING)
+                               .mapPartition(new 
OrderCheckMapper<Tuple3<Integer, Long, String>>(new Tuple3Checker()))
+                               .distinct()
+                               .writeAsText(resultPath);
+
+               env.execute();
+
+               expected = "(true)\n";
+       }
+
+       @Test
+       public void testSortPartitionByTwoFieldExpressions() throws Exception {
+               /*
+                * Test sort partition on two field expressions
+                */
+
+               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+               env.setDegreeOfParallelism(2);
+
+               DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds = 
CollectionDataSets.get5TupleDataSet(env);
+               ds
+                               .map(new IdMapper()).setParallelism(2) // 
parallelize input
+                               .sortPartition("f4", Order.ASCENDING)
+                               .sortPartition("f2", Order.DESCENDING)
+                               .mapPartition(new 
OrderCheckMapper<Tuple5<Integer, Long, Integer, String, Long>>(new 
Tuple5Checker()))
+                               .distinct()
+                               .writeAsText(resultPath);
+
+               env.execute();
+
+               expected = "(true)\n";
+       }
+
+       @Test
+       public void testSortPartitionByNestedFieldExpression() throws Exception 
{
+               /*
+                * Test sort partition on nested field expressions
+                */
+
+               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+               env.setDegreeOfParallelism(3);
+
+               DataSet<Tuple2<Tuple2<Integer, Integer>, String>> ds = 
CollectionDataSets.getGroupSortedNestedTupleDataSet(env);
+               ds
+                               .map(new IdMapper()).setParallelism(3) // 
parallelize input
+                               .sortPartition("f0.f1", Order.ASCENDING)
+                               .sortPartition("f1", Order.DESCENDING)
+                               .mapPartition(new 
OrderCheckMapper<Tuple2<Tuple2<Integer, Integer>, String>>(new 
NestedTupleChecker()))
+                               .distinct()
+                               .writeAsText(resultPath);
+
+               env.execute();
+
+               expected = "(true)\n";
+       }
+
+       @Test
+       public void testSortPartitionPojoByNestedFieldExpression() throws 
Exception {
+               /*
+                * Test sort partition on field expression
+                */
+
+               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+               env.setDegreeOfParallelism(3);
+
+               DataSet<POJO> ds = CollectionDataSets.getMixedPojoDataSet(env);
+               ds
+                               .map(new IdMapper()).setParallelism(1) // 
parallelize input
+                               
.sortPartition("nestedTupleWithCustom.f1.myString", Order.ASCENDING)
+                               .sortPartition("number", Order.DESCENDING)
+                               .mapPartition(new OrderCheckMapper<POJO>(new 
PojoChecker()))
+                               .distinct()
+                               .writeAsText(resultPath);
+
+               env.execute();
+
+               expected = "(true)\n";
+       }
+
+       @Test
+       public void testSortPartitionDOPChange() throws Exception {
+               /*
+                * Test sort partition with DOP change
+                */
+
+               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+               env.setDegreeOfParallelism(3);
+
+               DataSet<Tuple3<Integer, Long, String>> ds = 
CollectionDataSets.get3TupleDataSet(env);
+               ds
+                               .sortPartition(1, 
Order.DESCENDING).setParallelism(3) // change DOP
+                               .mapPartition(new 
OrderCheckMapper<Tuple3<Integer, Long, String>>(new Tuple3Checker()))
+                               .distinct()
+                               .writeAsText(resultPath);
+
+               env.execute();
+
+               expected = "(true)\n";
+       }
+
+       public static interface OrderChecker<T> extends Serializable {
+
+               public boolean inOrder(T t1, T t2);
+       }
+
+       public static class Tuple3Checker implements 
OrderChecker<Tuple3<Integer, Long, String>> {
+               @Override
+               public boolean inOrder(Tuple3<Integer, Long, String> t1, 
Tuple3<Integer, Long, String> t2) {
+                       return t1.f1 >= t2.f1;
+               }
+       }
+
+       public static class Tuple5Checker implements 
OrderChecker<Tuple5<Integer, Long, Integer, String, Long>> {
+               @Override
+               public boolean inOrder(Tuple5<Integer, Long, Integer, String, 
Long> t1,
+                                                               Tuple5<Integer, 
Long, Integer, String, Long> t2) {
+                       return t1.f4 < t2.f4 || t1.f4 == t2.f4 && t1.f2 >= 
t2.f2;
+               }
+       }
+
+       public static class NestedTupleChecker implements 
OrderChecker<Tuple2<Tuple2<Integer, Integer>, String>> {
+               @Override
+               public boolean inOrder(Tuple2<Tuple2<Integer, Integer>, String> 
t1,
+                                                               
Tuple2<Tuple2<Integer, Integer>, String> t2) {
+                       return t1.f0.f1 < t2.f0.f1 ||
+                                       t1.f0.f1 == t2.f0.f1 && 
t1.f1.compareTo(t2.f1) >= 0;
+               }
+       }
+
+       public static class PojoChecker implements OrderChecker<POJO> {
+               @Override
+               public boolean inOrder(POJO t1,
+                                                          POJO t2) {
+                       return 
t1.nestedTupleWithCustom.f1.myString.compareTo(t2.nestedTupleWithCustom.f1.myString)
 < 0 ||
+                                       
t1.nestedTupleWithCustom.f1.myString.compareTo(t2.nestedTupleWithCustom.f1.myString)
 == 0 &&
+                                                       t1.number >= t2.number;
+               }
+       }
+
+       public static class OrderCheckMapper<T> implements 
MapPartitionFunction<T, Tuple1<Boolean>> {
+
+               OrderChecker<T> checker;
+
+               public OrderCheckMapper() {}
+
+               public OrderCheckMapper(OrderChecker<T> checker) {
+                       this.checker = checker;
+               }
+
+               @Override
+               public void mapPartition(Iterable<T> values, 
Collector<Tuple1<Boolean>> out) throws Exception {
+
+                       Iterator<T> it = values.iterator();
+                       if(!it.hasNext()) {
+                               out.collect(new Tuple1<Boolean>(true));
+                               return;
+                       } else {
+                               T last = it.next();
+
+                               while (it.hasNext()) {
+                                       T next = it.next();
+                                       if (!checker.inOrder(last, next)) {
+                                               out.collect(new 
Tuple1<Boolean>(false));
+                                               return;
+                                       }
+                                       last = next;
+                               }
+                               out.collect(new Tuple1<Boolean>(true));
+                       }
+               }
+       }
+
+
+       public static class IdMapper<T> implements MapFunction<T, T> {
+
+               @Override
+               public T map(T value) throws Exception {
+                       return value;
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/3d849703/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/util/CollectionDataSets.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/util/CollectionDataSets.java
 
b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/util/CollectionDataSets.java
index ef6b8a9..132d82f 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/util/CollectionDataSets.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/util/CollectionDataSets.java
@@ -301,8 +301,6 @@ public class CollectionDataSets {
                public CustomType() {
                }
 
-               ;
-
                public CustomType(int i, long l, String s) {
                        myInt = i;
                        myLong = l;

Reply via email to