Repository: flink
Updated Branches:
  refs/heads/master c56e3f10b -> bed3da4a6


[FLINK-1444][api-extending] Add support for attaching data properties to data 
sources

This closes #379


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f0a28bf5
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f0a28bf5
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f0a28bf5

Branch: refs/heads/master
Commit: f0a28bf5345084a0a43df16021e60078e322e087
Parents: c56e3f1
Author: Fabian Hueske <[email protected]>
Authored: Fri Feb 6 14:28:00 2015 +0100
Committer: Fabian Hueske <[email protected]>
Committed: Fri Feb 20 16:10:01 2015 +0100

----------------------------------------------------------------------
 .../flink/compiler/dag/DataSourceNode.java      |  82 +-
 .../flink/compiler/plan/SourcePlanNode.java     |  12 +-
 .../flink/compiler/PropertyDataSourceTest.java  | 897 +++++++++++++++++++
 .../common/operators/GenericDataSourceBase.java |  41 +-
 .../flink/api/java/io/SplitDataProperties.java  | 464 ++++++++++
 .../flink/api/java/operators/DataSource.java    |  30 +-
 6 files changed, 1517 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/f0a28bf5/flink-compiler/src/main/java/org/apache/flink/compiler/dag/DataSourceNode.java
----------------------------------------------------------------------
diff --git 
a/flink-compiler/src/main/java/org/apache/flink/compiler/dag/DataSourceNode.java
 
b/flink-compiler/src/main/java/org/apache/flink/compiler/dag/DataSourceNode.java
index af2a92b..49946e0 100644
--- 
a/flink-compiler/src/main/java/org/apache/flink/compiler/dag/DataSourceNode.java
+++ 
b/flink-compiler/src/main/java/org/apache/flink/compiler/dag/DataSourceNode.java
@@ -24,19 +24,25 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.flink.api.common.functions.Partitioner;
 import org.apache.flink.api.common.io.FileInputFormat;
 import org.apache.flink.api.common.io.InputFormat;
 import org.apache.flink.api.common.io.NonParallelInput;
 import org.apache.flink.api.common.io.ReplicatingInputFormat;
 import org.apache.flink.api.common.io.statistics.BaseStatistics;
 import org.apache.flink.api.common.operators.GenericDataSourceBase;
+import 
org.apache.flink.api.common.operators.GenericDataSourceBase.SplitDataProperties;
 import org.apache.flink.api.common.operators.Operator;
+import org.apache.flink.api.common.operators.Ordering;
 import org.apache.flink.api.common.operators.SemanticProperties;
 import 
org.apache.flink.api.common.operators.SemanticProperties.EmptySemanticProperties;
+import org.apache.flink.api.common.operators.util.FieldList;
 import org.apache.flink.compiler.DataStatistics;
 import org.apache.flink.compiler.PactCompiler;
 import org.apache.flink.compiler.costs.CostEstimator;
 import org.apache.flink.compiler.costs.Costs;
+import org.apache.flink.compiler.dataproperties.GlobalProperties;
+import org.apache.flink.compiler.dataproperties.LocalProperties;
 import org.apache.flink.compiler.plan.PlanNode;
 import org.apache.flink.compiler.plan.SourcePlanNode;
 import org.apache.flink.configuration.Configuration;
@@ -51,6 +57,10 @@ public class DataSourceNode extends OptimizerNode {
 
        private final boolean replicatedInput;
 
+       private GlobalProperties gprops;
+
+       private LocalProperties lprops;
+
        /**
         * Creates a new DataSourceNode for the given contract.
         * 
@@ -76,6 +86,20 @@ public class DataSourceNode extends OptimizerNode {
                } else {
                        this.replicatedInput = false;
                }
+
+               this.gprops = new GlobalProperties();
+               this.lprops = new LocalProperties();
+
+               SplitDataProperties<?> splitProps = 
pactContract.getSplitDataProperties();
+
+               if(replicatedInput) {
+                       this.gprops.setFullyReplicated();
+                       this.lprops = new LocalProperties();
+               } else if (splitProps != null) {
+                       // configure data properties of data source using split 
properties
+                       setDataPropertiesFromSplitProperties(splitProps);
+               }
+
        }
 
        /**
@@ -184,7 +208,8 @@ public class DataSourceNode extends OptimizerNode {
                        return this.cachedPlans;
                }
 
-               SourcePlanNode candidate = new SourcePlanNode(this, "DataSource 
("+this.getPactContract().getName()+")");
+               SourcePlanNode candidate = new SourcePlanNode(this, "DataSource 
("+this.getPactContract().getName()+")",
+                               this.gprops, this.lprops);
 
                if(!replicatedInput) {
                        
candidate.updatePropertiesWithUniqueSets(getUniqueFields());
@@ -205,8 +230,6 @@ public class DataSourceNode extends OptimizerNode {
                                
estimator.addFileInputCost(this.estimatedOutputSize * 
this.getDegreeOfParallelism(), costs);
                        }
                        candidate.setCosts(costs);
-
-                       candidate.getGlobalProperties().setFullyReplicated();
                }
 
                // since there is only a single plan for the data-source, 
return a list with that element only
@@ -228,4 +251,57 @@ public class DataSourceNode extends OptimizerNode {
                        visitor.postVisit(this);
                }
        }
+
+       private void setDataPropertiesFromSplitProperties(SplitDataProperties 
splitProps) {
+
+               // set global properties
+               int[] partitionKeys = splitProps.getSplitPartitionKeys();
+               Partitioner<?> partitioner = splitProps.getSplitPartitioner();
+
+               if(partitionKeys != null && partitioner != null) {
+                       this.gprops.setCustomPartitioned(new 
FieldList(partitionKeys), partitioner);
+               }
+               else if(partitionKeys != null) {
+                       this.gprops.setAnyPartitioning(new 
FieldList(partitionKeys));
+               }
+               // set local properties
+               int[] groupingKeys = splitProps.getSplitGroupKeys();
+               Ordering ordering = splitProps.getSplitOrder();
+
+       // more than one split per source task is possible.
+               // adapt split grouping and sorting
+               if(ordering != null) {
+
+                       // sorting falls back to grouping because a source can 
read multiple,
+                       // randomly assigned splits
+                       groupingKeys = ordering.getFieldPositions();
+               }
+
+               if(groupingKeys != null && partitionKeys != null) {
+                       // check if grouping is also valid across splits, i.e., 
whether the grouping keys are
+                       // a valid superset of the partition keys
+                       boolean allFieldsIncluded = true;
+                       for(int i : partitionKeys) {
+                               boolean fieldIncluded = false;
+                               for(int j : groupingKeys) {
+                                       if(i == j) {
+                                               fieldIncluded = true;
+                                               break;
+                                       }
+                               }
+                               if(!fieldIncluded) {
+                                       allFieldsIncluded = false;
+                                       break;
+                               }
+                       }
+                       if (allFieldsIncluded) {
+                               this.lprops = LocalProperties.forGrouping(new 
FieldList(groupingKeys));
+                       } else {
+                               this.lprops = new LocalProperties();
+                       }
+
+               } else {
+                       this.lprops = new LocalProperties();
+               }
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f0a28bf5/flink-compiler/src/main/java/org/apache/flink/compiler/plan/SourcePlanNode.java
----------------------------------------------------------------------
diff --git 
a/flink-compiler/src/main/java/org/apache/flink/compiler/plan/SourcePlanNode.java
 
b/flink-compiler/src/main/java/org/apache/flink/compiler/plan/SourcePlanNode.java
index 891345d..813feb3 100644
--- 
a/flink-compiler/src/main/java/org/apache/flink/compiler/plan/SourcePlanNode.java
+++ 
b/flink-compiler/src/main/java/org/apache/flink/compiler/plan/SourcePlanNode.java
@@ -44,13 +44,17 @@ public class SourcePlanNode extends PlanNode {
         * @param template The template optimizer node that this candidate is 
created for.
         */
        public SourcePlanNode(DataSourceNode template, String nodeName) {
+               this(template, nodeName, new GlobalProperties(), new 
LocalProperties());
+       }
+
+       public SourcePlanNode(DataSourceNode template, String nodeName, 
GlobalProperties gprops, LocalProperties lprops) {
                super(template, nodeName, DriverStrategy.NONE);
-               
-               this.globalProps = new GlobalProperties();
-               this.localProps = new LocalProperties();
+
+               this.globalProps = gprops;
+               this.localProps = lprops;
                updatePropertiesWithUniqueSets(template.getUniqueFields());
        }
-       
+
        // 
--------------------------------------------------------------------------------------------
        
        public DataSourceNode getDataSourceNode() {

http://git-wip-us.apache.org/repos/asf/flink/blob/f0a28bf5/flink-compiler/src/test/java/org/apache/flink/compiler/PropertyDataSourceTest.java
----------------------------------------------------------------------
diff --git 
a/flink-compiler/src/test/java/org/apache/flink/compiler/PropertyDataSourceTest.java
 
b/flink-compiler/src/test/java/org/apache/flink/compiler/PropertyDataSourceTest.java
new file mode 100644
index 0000000..7b023e5
--- /dev/null
+++ 
b/flink-compiler/src/test/java/org/apache/flink/compiler/PropertyDataSourceTest.java
@@ -0,0 +1,897 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.compiler;
+
+import org.apache.flink.api.common.operators.Order;
+import org.apache.flink.api.common.operators.util.FieldSet;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.operators.DataSource;
+import org.apache.flink.api.java.operators.translation.JavaPlan;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.compiler.dataproperties.GlobalProperties;
+import org.apache.flink.compiler.dataproperties.LocalProperties;
+import org.apache.flink.compiler.dataproperties.PartitioningProperty;
+import org.apache.flink.compiler.plan.NAryUnionPlanNode;
+import org.apache.flink.compiler.plan.OptimizedPlan;
+import org.apache.flink.compiler.plan.SinkPlanNode;
+import org.apache.flink.compiler.plan.SourcePlanNode;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+@SuppressWarnings({"serial"})
+public class PropertyDataSourceTest extends CompilerTestBase {
+
+       private List<Tuple3<Long, SomePojo, String>> tuple3PojoData = new 
ArrayList<Tuple3<Long, SomePojo, String>>();
+       private TupleTypeInfo<Tuple3<Long, SomePojo, String>> tuple3PojoType = 
new TupleTypeInfo<Tuple3<Long, SomePojo, String>>(
+                       BasicTypeInfo.LONG_TYPE_INFO,
+                       TypeExtractor.createTypeInfo(SomePojo.class),
+                       BasicTypeInfo.STRING_TYPE_INFO
+       );
+
+       @Test
+       public void checkSinglePartitionedSource1() {
+
+               ExecutionEnvironment env = 
ExecutionEnvironment.createLocalEnvironment();
+               env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+
+               DataSource<Tuple2<Long, String>> data =
+                               env.readCsvFile("/some/path").types(Long.class, 
String.class);
+
+               data.getSplitDataProperties()
+                               .splitsPartitionedBy(0);
+
+               data.print();
+
+               JavaPlan plan = env.createProgramPlan();
+
+               // submit the plan to the compiler
+               OptimizedPlan oPlan = compileNoStats(plan);
+
+               // check the optimized Plan
+               SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next();
+               SourcePlanNode sourceNode = (SourcePlanNode) 
sinkNode.getPredecessor();
+
+               GlobalProperties gprops = sourceNode.getGlobalProperties();
+               LocalProperties lprops = sourceNode.getLocalProperties();
+
+               Assert.assertTrue((new 
FieldSet(gprops.getPartitioningFields().toArray())).equals(new FieldSet(0)));
+               Assert.assertTrue(gprops.getPartitioning() == 
PartitioningProperty.ANY_PARTITIONING);
+               Assert.assertTrue(lprops.getGroupedFields() == null);
+               Assert.assertTrue(lprops.getOrdering() == null);
+
+       }
+
+       @Test
+       public void checkSinglePartitionedSource2() {
+
+               ExecutionEnvironment env = 
ExecutionEnvironment.createLocalEnvironment();
+               env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+
+               DataSource<Tuple2<Long, String>> data =
+                               env.readCsvFile("/some/path").types(Long.class, 
String.class);
+
+               data.getSplitDataProperties()
+                               .splitsPartitionedBy(1, 0);
+
+               data.print();
+
+               JavaPlan plan = env.createProgramPlan();
+
+               // submit the plan to the compiler
+               OptimizedPlan oPlan = compileNoStats(plan);
+
+               // check the optimized Plan
+               SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next();
+               SourcePlanNode sourceNode = (SourcePlanNode) 
sinkNode.getPredecessor();
+
+               GlobalProperties gprops = sourceNode.getGlobalProperties();
+               LocalProperties lprops = sourceNode.getLocalProperties();
+
+               Assert.assertTrue((new 
FieldSet(gprops.getPartitioningFields().toArray())).equals(new FieldSet(0, 1)));
+               Assert.assertTrue(gprops.getPartitioning() == 
PartitioningProperty.ANY_PARTITIONING);
+               Assert.assertTrue(lprops.getGroupedFields() == null);
+               Assert.assertTrue(lprops.getOrdering() == null);
+
+       }
+
+       @Test
+       public void checkSinglePartitionedSource3() {
+
+               ExecutionEnvironment env = 
ExecutionEnvironment.createLocalEnvironment();
+               env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+
+               DataSource<Tuple3<Long, SomePojo, String>> data = 
env.fromCollection(tuple3PojoData, tuple3PojoType);
+
+               data.getSplitDataProperties()
+                               .splitsPartitionedBy("*");
+
+               data.print();
+
+               JavaPlan plan = env.createProgramPlan();
+
+               // submit the plan to the compiler
+               OptimizedPlan oPlan = compileNoStats(plan);
+
+               // check the optimized Plan
+               SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next();
+               SourcePlanNode sourceNode = (SourcePlanNode) 
sinkNode.getPredecessor();
+
+               GlobalProperties gprops = sourceNode.getGlobalProperties();
+               LocalProperties lprops = sourceNode.getLocalProperties();
+
+               Assert.assertTrue((new 
FieldSet(gprops.getPartitioningFields().toArray())).equals(new FieldSet(0, 1, 
2, 3, 4)));
+               Assert.assertTrue(gprops.getPartitioning() == 
PartitioningProperty.ANY_PARTITIONING);
+               Assert.assertTrue(lprops.getGroupedFields() == null);
+               Assert.assertTrue(lprops.getOrdering() == null);
+
+       }
+
+       @Test
+       public void checkSinglePartitionedSource4() {
+
+               ExecutionEnvironment env = 
ExecutionEnvironment.createLocalEnvironment();
+               env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+
+               DataSource<Tuple3<Long, SomePojo, String>> data = 
env.fromCollection(tuple3PojoData, tuple3PojoType);
+
+               data.getSplitDataProperties()
+                               .splitsPartitionedBy("f1");
+
+               data.print();
+
+               JavaPlan plan = env.createProgramPlan();
+
+               // submit the plan to the compiler
+               OptimizedPlan oPlan = compileNoStats(plan);
+
+               // check the optimized Plan
+               SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next();
+               SourcePlanNode sourceNode = (SourcePlanNode) 
sinkNode.getPredecessor();
+
+               GlobalProperties gprops = sourceNode.getGlobalProperties();
+               LocalProperties lprops = sourceNode.getLocalProperties();
+
+               Assert.assertTrue((new 
FieldSet(gprops.getPartitioningFields().toArray())).equals(new FieldSet(1, 2, 
3)));
+               Assert.assertTrue(gprops.getPartitioning() == 
PartitioningProperty.ANY_PARTITIONING);
+               Assert.assertTrue(lprops.getGroupedFields() == null);
+               Assert.assertTrue(lprops.getOrdering() == null);
+
+       }
+
+       @Test
+       public void checkSinglePartitionedSource5() {
+
+               ExecutionEnvironment env = 
ExecutionEnvironment.createLocalEnvironment();
+               env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+
+               DataSource<Tuple3<Long, SomePojo, String>> data = 
env.fromCollection(tuple3PojoData, tuple3PojoType);
+
+               data.getSplitDataProperties()
+                               .splitsPartitionedBy("f1.stringField");
+
+               data.print();
+
+               JavaPlan plan = env.createProgramPlan();
+
+               // submit the plan to the compiler
+               OptimizedPlan oPlan = compileNoStats(plan);
+
+               // check the optimized Plan
+               SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next();
+               SourcePlanNode sourceNode = (SourcePlanNode) 
sinkNode.getPredecessor();
+
+               GlobalProperties gprops = sourceNode.getGlobalProperties();
+               LocalProperties lprops = sourceNode.getLocalProperties();
+
+               Assert.assertTrue((new 
FieldSet(gprops.getPartitioningFields().toArray())).equals(new FieldSet(3)));
+               Assert.assertTrue(gprops.getPartitioning() == 
PartitioningProperty.ANY_PARTITIONING);
+               Assert.assertTrue(lprops.getGroupedFields() == null);
+               Assert.assertTrue(lprops.getOrdering() == null);
+
+       }
+
+       @Test
+       public void checkSinglePartitionedSource6() {
+
+               ExecutionEnvironment env = 
ExecutionEnvironment.createLocalEnvironment();
+               env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+
+               DataSource<Tuple3<Long, SomePojo, String>> data = 
env.fromCollection(tuple3PojoData, tuple3PojoType);
+
+               data.getSplitDataProperties()
+                               .splitsPartitionedBy("f1.intField; f2");
+
+               data.print();
+
+               JavaPlan plan = env.createProgramPlan();
+
+               // submit the plan to the compiler
+               OptimizedPlan oPlan = compileNoStats(plan);
+
+               // check the optimized Plan
+               SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next();
+               SourcePlanNode sourceNode = (SourcePlanNode) 
sinkNode.getPredecessor();
+
+               GlobalProperties gprops = sourceNode.getGlobalProperties();
+               LocalProperties lprops = sourceNode.getLocalProperties();
+
+               Assert.assertTrue((new 
FieldSet(gprops.getPartitioningFields().toArray())).equals(new FieldSet(2, 4)));
+               Assert.assertTrue(gprops.getPartitioning() == 
PartitioningProperty.ANY_PARTITIONING);
+               Assert.assertTrue(lprops.getGroupedFields() == null);
+               Assert.assertTrue(lprops.getOrdering() == null);
+
+       }
+
+       @Test
+       public void checkSinglePartitionedSource7() {
+
+               ExecutionEnvironment env = 
ExecutionEnvironment.createLocalEnvironment();
+               env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+
+               DataSource<Tuple2<Long, String>> data =
+                               env.readCsvFile("/some/path").types(Long.class, 
String.class);
+
+               data.getSplitDataProperties()
+                               .splitsPartitionedBy("byDate", 1, 0);
+
+               data.print();
+
+               JavaPlan plan = env.createProgramPlan();
+
+               // submit the plan to the compiler
+               OptimizedPlan oPlan = compileNoStats(plan);
+
+               // check the optimized Plan
+               SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next();
+               SourcePlanNode sourceNode = (SourcePlanNode) 
sinkNode.getPredecessor();
+
+               GlobalProperties gprops = sourceNode.getGlobalProperties();
+               LocalProperties lprops = sourceNode.getLocalProperties();
+
+               Assert.assertTrue((new 
FieldSet(gprops.getPartitioningFields().toArray())).equals(new FieldSet(0, 1)));
+               Assert.assertTrue(gprops.getPartitioning() == 
PartitioningProperty.CUSTOM_PARTITIONING);
+               Assert.assertTrue(gprops.getCustomPartitioner() != null);
+               Assert.assertTrue(lprops.getGroupedFields() == null);
+               Assert.assertTrue(lprops.getOrdering() == null);
+
+       }
+
+       @Test
+       public void checkSinglePartitionedGroupedSource1() {
+
+               ExecutionEnvironment env = 
ExecutionEnvironment.createLocalEnvironment();
+               env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+
+               DataSource<Tuple2<Long, String>> data =
+                               env.readCsvFile("/some/path").types(Long.class, 
String.class);
+
+               data.getSplitDataProperties()
+                               .splitsPartitionedBy(0)
+                               .splitsGroupedBy(0);
+
+               data.print();
+
+               JavaPlan plan = env.createProgramPlan();
+
+               // submit the plan to the compiler
+               OptimizedPlan oPlan = compileNoStats(plan);
+
+               // check the optimized Plan
+               SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next();
+               SourcePlanNode sourceNode = (SourcePlanNode) 
sinkNode.getPredecessor();
+
+               GlobalProperties gprops = sourceNode.getGlobalProperties();
+               LocalProperties lprops = sourceNode.getLocalProperties();
+
+               Assert.assertTrue((new 
FieldSet(gprops.getPartitioningFields().toArray())).equals(new FieldSet(0)));
+               Assert.assertTrue(gprops.getPartitioning() == 
PartitioningProperty.ANY_PARTITIONING);
+               Assert.assertTrue(new 
FieldSet(lprops.getGroupedFields().toArray()).equals(new FieldSet(0)));
+               Assert.assertTrue(lprops.getOrdering() == null);
+
+       }
+
+       @Test
+       public void checkSinglePartitionedGroupedSource2() {
+
+               ExecutionEnvironment env = 
ExecutionEnvironment.createLocalEnvironment();
+               env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+
+               DataSource<Tuple2<Long, String>> data =
+                               env.readCsvFile("/some/path").types(Long.class, 
String.class);
+
+               data.getSplitDataProperties()
+                               .splitsPartitionedBy(0)
+                               .splitsGroupedBy(1, 0);
+
+               data.print();
+
+               JavaPlan plan = env.createProgramPlan();
+
+               // submit the plan to the compiler
+               OptimizedPlan oPlan = compileNoStats(plan);
+
+               // check the optimized Plan
+               SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next();
+               SourcePlanNode sourceNode = (SourcePlanNode) 
sinkNode.getPredecessor();
+
+               GlobalProperties gprops = sourceNode.getGlobalProperties();
+               LocalProperties lprops = sourceNode.getLocalProperties();
+
+               Assert.assertTrue((new 
FieldSet(gprops.getPartitioningFields().toArray())).equals(new FieldSet(0)));
+               Assert.assertTrue(gprops.getPartitioning() == 
PartitioningProperty.ANY_PARTITIONING);
+               Assert.assertTrue(new 
FieldSet(lprops.getGroupedFields().toArray()).equals(new FieldSet(0, 1)));
+               Assert.assertTrue(lprops.getOrdering() == null);
+
+       }
+
+
+       @Test
+       public void checkSinglePartitionedGroupedSource3() {
+
+               ExecutionEnvironment env = 
ExecutionEnvironment.createLocalEnvironment();
+               env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+
+               DataSource<Tuple2<Long, String>> data =
+                               env.readCsvFile("/some/path").types(Long.class, 
String.class);
+
+               data.getSplitDataProperties()
+                               .splitsPartitionedBy(1)
+                               .splitsGroupedBy(0);
+
+               data.print();
+
+               JavaPlan plan = env.createProgramPlan();
+
+               // submit the plan to the compiler
+               OptimizedPlan oPlan = compileNoStats(plan);
+
+               // check the optimized Plan
+               SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next();
+               SourcePlanNode sourceNode = (SourcePlanNode) 
sinkNode.getPredecessor();
+
+               GlobalProperties gprops = sourceNode.getGlobalProperties();
+               LocalProperties lprops = sourceNode.getLocalProperties();
+
+               Assert.assertTrue((new 
FieldSet(gprops.getPartitioningFields().toArray())).equals(new FieldSet(1)));
+               Assert.assertTrue(gprops.getPartitioning() == 
PartitioningProperty.ANY_PARTITIONING);
+               Assert.assertTrue(lprops.getGroupedFields() == null);
+               Assert.assertTrue(lprops.getOrdering() == null);
+
+       }
+
+       @Test
+       public void checkSinglePartitionedGroupedSource4() {
+
+               ExecutionEnvironment env = 
ExecutionEnvironment.createLocalEnvironment();
+               env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+
+               DataSource<Tuple2<Long, String>> data =
+                               env.readCsvFile("/some/path").types(Long.class, 
String.class);
+
+               data.getSplitDataProperties()
+                               .splitsPartitionedBy(0, 1)
+                               .splitsGroupedBy(0);
+
+               data.print();
+
+               JavaPlan plan = env.createProgramPlan();
+
+               // submit the plan to the compiler
+               OptimizedPlan oPlan = compileNoStats(plan);
+
+               // check the optimized Plan
+               SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next();
+               SourcePlanNode sourceNode = (SourcePlanNode) 
sinkNode.getPredecessor();
+
+               GlobalProperties gprops = sourceNode.getGlobalProperties();
+               LocalProperties lprops = sourceNode.getLocalProperties();
+
+               Assert.assertTrue((new 
FieldSet(gprops.getPartitioningFields().toArray())).equals(new FieldSet(0, 1)));
+               Assert.assertTrue(gprops.getPartitioning() == 
PartitioningProperty.ANY_PARTITIONING);
+               Assert.assertTrue(lprops.getGroupedFields() == null);
+               Assert.assertTrue(lprops.getOrdering() == null);
+
+       }
+
+       @Test
+       public void checkSinglePartitionedGroupedSource5() {
+
+               ExecutionEnvironment env = 
ExecutionEnvironment.createLocalEnvironment();
+               env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+
+               DataSource<Tuple3<Long, SomePojo, String>> data = 
env.fromCollection(tuple3PojoData, tuple3PojoType);
+
+               data.getSplitDataProperties()
+                               .splitsPartitionedBy("f2")
+                               .splitsGroupedBy("f2");
+
+               data.print();
+
+               JavaPlan plan = env.createProgramPlan();
+
+               // submit the plan to the compiler
+               OptimizedPlan oPlan = compileNoStats(plan);
+
+               // check the optimized Plan
+               SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next();
+               SourcePlanNode sourceNode = (SourcePlanNode) 
sinkNode.getPredecessor();
+
+               GlobalProperties gprops = sourceNode.getGlobalProperties();
+               LocalProperties lprops = sourceNode.getLocalProperties();
+
+               Assert.assertTrue((new 
FieldSet(gprops.getPartitioningFields().toArray())).equals(new FieldSet(4)));
+               Assert.assertTrue(gprops.getPartitioning() == 
PartitioningProperty.ANY_PARTITIONING);
+               Assert.assertTrue(new 
FieldSet(lprops.getGroupedFields().toArray()).equals(new FieldSet(4)));
+               Assert.assertTrue(lprops.getOrdering() == null);
+
+       }
+
+
+       @Test
+       public void checkSinglePartitionedGroupedSource6() {
+
+               ExecutionEnvironment env = 
ExecutionEnvironment.createLocalEnvironment();
+               env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+
+               DataSource<Tuple3<Long, SomePojo, String>> data = 
env.fromCollection(tuple3PojoData, tuple3PojoType);
+
+               data.getSplitDataProperties()
+                               .splitsPartitionedBy("f1.intField")
+                               .splitsGroupedBy("f0; f1.intField");
+
+               data.print();
+
+               JavaPlan plan = env.createProgramPlan();
+
+               // submit the plan to the compiler
+               OptimizedPlan oPlan = compileNoStats(plan);
+
+               // check the optimized Plan
+               SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next();
+               SourcePlanNode sourceNode = (SourcePlanNode) 
sinkNode.getPredecessor();
+
+               GlobalProperties gprops = sourceNode.getGlobalProperties();
+               LocalProperties lprops = sourceNode.getLocalProperties();
+
+               Assert.assertTrue((new 
FieldSet(gprops.getPartitioningFields().toArray())).equals(new FieldSet(2)));
+               Assert.assertTrue(gprops.getPartitioning() == 
PartitioningProperty.ANY_PARTITIONING);
+               Assert.assertTrue(new 
FieldSet(lprops.getGroupedFields().toArray()).equals(new FieldSet(0,2)));
+               Assert.assertTrue(lprops.getOrdering() == null);
+
+       }
+
+
+       @Test
+       public void checkSinglePartitionedGroupedSource7() {
+
+               ExecutionEnvironment env = 
ExecutionEnvironment.createLocalEnvironment();
+               env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+
+               DataSource<Tuple3<Long, SomePojo, String>> data = 
env.fromCollection(tuple3PojoData, tuple3PojoType);
+
+               data.getSplitDataProperties()
+                               .splitsPartitionedBy("f1.intField")
+                               .splitsGroupedBy("f1");
+
+               data.print();
+
+               JavaPlan plan = env.createProgramPlan();
+
+               // submit the plan to the compiler
+               OptimizedPlan oPlan = compileNoStats(plan);
+
+               // check the optimized Plan
+               SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next();
+               SourcePlanNode sourceNode = (SourcePlanNode) 
sinkNode.getPredecessor();
+
+               GlobalProperties gprops = sourceNode.getGlobalProperties();
+               LocalProperties lprops = sourceNode.getLocalProperties();
+
+               Assert.assertTrue((new 
FieldSet(gprops.getPartitioningFields().toArray())).equals(new FieldSet(2)));
+               Assert.assertTrue(gprops.getPartitioning() == 
PartitioningProperty.ANY_PARTITIONING);
+               Assert.assertTrue(new 
FieldSet(lprops.getGroupedFields().toArray()).equals(new FieldSet(1,2,3)));
+               Assert.assertTrue(lprops.getOrdering() == null);
+
+       }
+
+       @Test
+       public void checkSinglePartitionedGroupedSource8() {
+
+               ExecutionEnvironment env = 
ExecutionEnvironment.createLocalEnvironment();
+               env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+
+               DataSource<Tuple3<Long, SomePojo, String>> data = 
env.fromCollection(tuple3PojoData, tuple3PojoType);
+
+               data.getSplitDataProperties()
+                               .splitsPartitionedBy("f1")
+                               .splitsGroupedBy("f1.stringField");
+
+               data.print();
+
+               JavaPlan plan = env.createProgramPlan();
+
+               // submit the plan to the compiler
+               OptimizedPlan oPlan = compileNoStats(plan);
+
+               // check the optimized Plan
+               SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next();
+               SourcePlanNode sourceNode = (SourcePlanNode) 
sinkNode.getPredecessor();
+
+               GlobalProperties gprops = sourceNode.getGlobalProperties();
+               LocalProperties lprops = sourceNode.getLocalProperties();
+
+               Assert.assertTrue((new 
FieldSet(gprops.getPartitioningFields().toArray())).equals(new 
FieldSet(1,2,3)));
+               Assert.assertTrue(gprops.getPartitioning() == 
PartitioningProperty.ANY_PARTITIONING);
+               Assert.assertTrue(lprops.getGroupedFields() == null);
+               Assert.assertTrue(lprops.getOrdering() == null);
+
+       }
+
+
+       @Test
+       public void checkSinglePartitionedOrderedSource1() {
+
+               ExecutionEnvironment env = 
ExecutionEnvironment.createLocalEnvironment();
+               env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+
+               DataSource<Tuple2<Long, String>> data =
+                               env.readCsvFile("/some/path").types(Long.class, 
String.class);
+
+               data.getSplitDataProperties()
+                               .splitsPartitionedBy(1)
+                               .splitsOrderedBy(new int[]{1}, new 
Order[]{Order.ASCENDING});
+
+               data.print();
+
+               JavaPlan plan = env.createProgramPlan();
+
+               // submit the plan to the compiler
+               OptimizedPlan oPlan = compileNoStats(plan);
+
+               // check the optimized Plan
+               SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next();
+               SourcePlanNode sourceNode = (SourcePlanNode) 
sinkNode.getPredecessor();
+
+               GlobalProperties gprops = sourceNode.getGlobalProperties();
+               LocalProperties lprops = sourceNode.getLocalProperties();
+
+               Assert.assertTrue((new 
FieldSet(gprops.getPartitioningFields().toArray())).equals(new FieldSet(1)));
+               Assert.assertTrue(gprops.getPartitioning() == 
PartitioningProperty.ANY_PARTITIONING);
+               Assert.assertTrue((new 
FieldSet(lprops.getGroupedFields().toArray())).equals(new FieldSet(1)));
+               Assert.assertTrue(lprops.getOrdering() == null);
+
+       }
+
+       @Test
+       public void checkSinglePartitionedOrderedSource2() {
+
+               ExecutionEnvironment env = 
ExecutionEnvironment.createLocalEnvironment();
+               env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+
+               DataSource<Tuple2<Long, String>> data =
+                               env.readCsvFile("/some/path").types(Long.class, 
String.class);
+
+               data.getSplitDataProperties()
+                               .splitsPartitionedBy(1)
+                               .splitsOrderedBy(new int[]{1, 0}, new 
Order[]{Order.ASCENDING, Order.DESCENDING});
+
+               data.print();
+
+               JavaPlan plan = env.createProgramPlan();
+
+               // submit the plan to the compiler
+               OptimizedPlan oPlan = compileNoStats(plan);
+
+               // check the optimized Plan
+               SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next();
+               SourcePlanNode sourceNode = (SourcePlanNode) 
sinkNode.getPredecessor();
+
+               GlobalProperties gprops = sourceNode.getGlobalProperties();
+               LocalProperties lprops = sourceNode.getLocalProperties();
+
+               Assert.assertTrue((new 
FieldSet(gprops.getPartitioningFields().toArray())).equals(new FieldSet(1)));
+               Assert.assertTrue(gprops.getPartitioning() == 
PartitioningProperty.ANY_PARTITIONING);
+               Assert.assertTrue((new 
FieldSet(lprops.getGroupedFields().toArray())).equals(new FieldSet(1, 0)));
+               Assert.assertTrue(lprops.getOrdering() == null);
+
+       }
+
+
+       @Test
+       public void checkSinglePartitionedOrderedSource3() {
+
+               ExecutionEnvironment env = 
ExecutionEnvironment.createLocalEnvironment();
+               env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+
+               DataSource<Tuple2<Long, String>> data =
+                               env.readCsvFile("/some/path").types(Long.class, 
String.class);
+
+               data.getSplitDataProperties()
+                               .splitsPartitionedBy(0)
+                               .splitsOrderedBy(new int[]{1}, new 
Order[]{Order.ASCENDING});
+
+               data.print();
+
+               JavaPlan plan = env.createProgramPlan();
+
+               // submit the plan to the compiler
+               OptimizedPlan oPlan = compileNoStats(plan);
+
+               // check the optimized Plan
+               SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next();
+               SourcePlanNode sourceNode = (SourcePlanNode) 
sinkNode.getPredecessor();
+
+               GlobalProperties gprops = sourceNode.getGlobalProperties();
+               LocalProperties lprops = sourceNode.getLocalProperties();
+
+               Assert.assertTrue((new 
FieldSet(gprops.getPartitioningFields().toArray())).equals(new FieldSet(0)));
+               Assert.assertTrue(gprops.getPartitioning() == 
PartitioningProperty.ANY_PARTITIONING);
+               Assert.assertTrue(lprops.getGroupedFields() == null);
+               Assert.assertTrue(lprops.getOrdering() == null);
+
+       }
+
+       @Test
+       public void checkSinglePartitionedOrderedSource4() {
+
+               ExecutionEnvironment env = 
ExecutionEnvironment.createLocalEnvironment();
+               env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+
+               DataSource<Tuple2<Long, String>> data =
+                               env.readCsvFile("/some/path").types(Long.class, 
String.class);
+
+               data.getSplitDataProperties()
+                               .splitsPartitionedBy(0, 1)
+                               .splitsOrderedBy(new int[]{1}, new 
Order[]{Order.DESCENDING});
+
+               data.print();
+
+               JavaPlan plan = env.createProgramPlan();
+
+               // submit the plan to the compiler
+               OptimizedPlan oPlan = compileNoStats(plan);
+
+               // check the optimized Plan
+               SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next();
+               SourcePlanNode sourceNode = (SourcePlanNode) 
sinkNode.getPredecessor();
+
+               GlobalProperties gprops = sourceNode.getGlobalProperties();
+               LocalProperties lprops = sourceNode.getLocalProperties();
+
+               Assert.assertTrue((new 
FieldSet(gprops.getPartitioningFields().toArray())).equals(new FieldSet(0, 1)));
+               Assert.assertTrue(gprops.getPartitioning() == 
PartitioningProperty.ANY_PARTITIONING);
+               Assert.assertTrue(lprops.getGroupedFields() == null);
+               Assert.assertTrue(lprops.getOrdering() == null);
+
+       }
+
+       @Test
+       public void checkSinglePartitionedOrderedSource5() {
+
+               ExecutionEnvironment env = 
ExecutionEnvironment.createLocalEnvironment();
+               env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+
+               DataSource<Tuple3<Long, SomePojo, String>> data = 
env.fromCollection(tuple3PojoData, tuple3PojoType);
+
+               data.getSplitDataProperties()
+                       .splitsPartitionedBy("f1.intField")
+                       .splitsOrderedBy("f0; f1.intField", new 
Order[]{Order.ASCENDING, Order.DESCENDING});
+
+               data.print();
+
+               JavaPlan plan = env.createProgramPlan();
+
+               // submit the plan to the compiler
+               OptimizedPlan oPlan = compileNoStats(plan);
+
+               // check the optimized Plan
+               SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next();
+               SourcePlanNode sourceNode = (SourcePlanNode) 
sinkNode.getPredecessor();
+
+               GlobalProperties gprops = sourceNode.getGlobalProperties();
+               LocalProperties lprops = sourceNode.getLocalProperties();
+
+               Assert.assertTrue((new 
FieldSet(gprops.getPartitioningFields().toArray())).equals(new FieldSet(2)));
+               Assert.assertTrue(gprops.getPartitioning() == 
PartitioningProperty.ANY_PARTITIONING);
+               Assert.assertTrue(new 
FieldSet(lprops.getGroupedFields().toArray()).equals(new FieldSet(0,2)));
+               Assert.assertTrue(lprops.getOrdering() == null);
+
+       }
+
+
+       @Test
+       public void checkSinglePartitionedOrderedSource6() {
+
+               ExecutionEnvironment env = 
ExecutionEnvironment.createLocalEnvironment();
+               env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+
+               DataSource<Tuple3<Long, SomePojo, String>> data = 
env.fromCollection(tuple3PojoData, tuple3PojoType);
+
+               data.getSplitDataProperties()
+                               .splitsPartitionedBy("f1.intField")
+                               .splitsOrderedBy("f1", new 
Order[]{Order.DESCENDING});
+
+               data.print();
+
+               JavaPlan plan = env.createProgramPlan();
+
+               // submit the plan to the compiler
+               OptimizedPlan oPlan = compileNoStats(plan);
+
+               // check the optimized Plan
+               SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next();
+               SourcePlanNode sourceNode = (SourcePlanNode) 
sinkNode.getPredecessor();
+
+               GlobalProperties gprops = sourceNode.getGlobalProperties();
+               LocalProperties lprops = sourceNode.getLocalProperties();
+
+               Assert.assertTrue((new 
FieldSet(gprops.getPartitioningFields().toArray())).equals(new FieldSet(2)));
+               Assert.assertTrue(gprops.getPartitioning() == 
PartitioningProperty.ANY_PARTITIONING);
+               Assert.assertTrue(new 
FieldSet(lprops.getGroupedFields().toArray()).equals(new FieldSet(1,2,3)));
+               Assert.assertTrue(lprops.getOrdering() == null);
+
+       }
+
+       @Test
+       public void checkSinglePartitionedOrderedSource7() {
+
+               ExecutionEnvironment env = 
ExecutionEnvironment.createLocalEnvironment();
+               env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+
+               DataSource<Tuple3<Long, SomePojo, String>> data = 
env.fromCollection(tuple3PojoData, tuple3PojoType);
+
+               data.getSplitDataProperties()
+                               .splitsPartitionedBy("f1")
+                               .splitsOrderedBy("f1.stringField", new 
Order[]{Order.ASCENDING});
+
+               data.print();
+
+               JavaPlan plan = env.createProgramPlan();
+
+               // submit the plan to the compiler
+               OptimizedPlan oPlan = compileNoStats(plan);
+
+               // check the optimized Plan
+               SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next();
+               SourcePlanNode sourceNode = (SourcePlanNode) 
sinkNode.getPredecessor();
+
+               GlobalProperties gprops = sourceNode.getGlobalProperties();
+               LocalProperties lprops = sourceNode.getLocalProperties();
+
+               Assert.assertTrue((new 
FieldSet(gprops.getPartitioningFields().toArray())).equals(new 
FieldSet(1,2,3)));
+               Assert.assertTrue(gprops.getPartitioning() == 
PartitioningProperty.ANY_PARTITIONING);
+               Assert.assertTrue(lprops.getGroupedFields() == null);
+               Assert.assertTrue(lprops.getOrdering() == null);
+
+       }
+
+
+       @Test
+       public void checkCoPartitionedSources1() {
+
+               ExecutionEnvironment env = 
ExecutionEnvironment.createLocalEnvironment();
+               env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+
+               DataSource<Tuple2<Long, String>> data1 =
+                               env.readCsvFile("/some/path").types(Long.class, 
String.class);
+
+               data1.getSplitDataProperties()
+                               .splitsPartitionedBy("byDate", 0);
+
+               DataSource<Tuple2<Long, String>> data2 =
+                               env.readCsvFile("/some/path").types(Long.class, 
String.class);
+
+               data2.getSplitDataProperties()
+                               .splitsPartitionedBy("byDate", 0);
+
+               data1.union(data2).print();
+
+               JavaPlan plan = env.createProgramPlan();
+
+               // submit the plan to the compiler
+               OptimizedPlan oPlan = compileNoStats(plan);
+
+               // check the optimized Plan
+               SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next();
+               SourcePlanNode sourceNode1 = (SourcePlanNode) 
((NAryUnionPlanNode)sinkNode.getPredecessor()).getListOfInputs().get(0).getSource();
+               SourcePlanNode sourceNode2 = (SourcePlanNode) 
((NAryUnionPlanNode)sinkNode.getPredecessor()).getListOfInputs().get(1).getSource();
+
+               GlobalProperties gprops1 = sourceNode1.getGlobalProperties();
+               LocalProperties lprops1 = sourceNode1.getLocalProperties();
+               GlobalProperties gprops2 = sourceNode2.getGlobalProperties();
+               LocalProperties lprops2 = sourceNode2.getLocalProperties();
+
+               Assert.assertTrue((new 
FieldSet(gprops1.getPartitioningFields().toArray())).equals(new FieldSet(0)));
+               Assert.assertTrue(gprops1.getPartitioning() == 
PartitioningProperty.CUSTOM_PARTITIONING);
+               Assert.assertTrue(lprops1.getGroupedFields() == null);
+               Assert.assertTrue(lprops1.getOrdering() == null);
+
+               Assert.assertTrue((new 
FieldSet(gprops2.getPartitioningFields().toArray())).equals(new FieldSet(0)));
+               Assert.assertTrue(gprops2.getPartitioning() == 
PartitioningProperty.CUSTOM_PARTITIONING);
+               Assert.assertTrue(lprops2.getGroupedFields() == null);
+               Assert.assertTrue(lprops2.getOrdering() == null);
+
+               
Assert.assertTrue(gprops1.getCustomPartitioner().equals(gprops2.getCustomPartitioner()));
+       }
+
+       @Test
+       public void checkCoPartitionedSources2() {
+
+               ExecutionEnvironment env = 
ExecutionEnvironment.createLocalEnvironment();
+               env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+
+               DataSource<Tuple2<Long, String>> data1 =
+                               env.readCsvFile("/some/path").types(Long.class, 
String.class);
+
+               data1.getSplitDataProperties()
+                               .splitsPartitionedBy("byCountry", 0);
+
+               DataSource<Tuple2<Long, String>> data2 =
+                               env.readCsvFile("/some/path").types(Long.class, 
String.class);
+
+               data2.getSplitDataProperties()
+                               .splitsPartitionedBy("byDate", 0);
+
+               data1.union(data2).print();
+
+               JavaPlan plan = env.createProgramPlan();
+
+               // submit the plan to the compiler
+               OptimizedPlan oPlan = compileNoStats(plan);
+
+               // check the optimized Plan
+               SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next();
+               SourcePlanNode sourceNode1 = (SourcePlanNode) 
((NAryUnionPlanNode)sinkNode.getPredecessor()).getListOfInputs().get(0).getSource();
+               SourcePlanNode sourceNode2 = (SourcePlanNode) 
((NAryUnionPlanNode)sinkNode.getPredecessor()).getListOfInputs().get(1).getSource();
+
+               GlobalProperties gprops1 = sourceNode1.getGlobalProperties();
+               LocalProperties lprops1 = sourceNode1.getLocalProperties();
+               GlobalProperties gprops2 = sourceNode2.getGlobalProperties();
+               LocalProperties lprops2 = sourceNode2.getLocalProperties();
+
+               Assert.assertTrue((new 
FieldSet(gprops1.getPartitioningFields().toArray())).equals(new FieldSet(0)));
+               Assert.assertTrue(gprops1.getPartitioning() == 
PartitioningProperty.CUSTOM_PARTITIONING);
+               Assert.assertTrue(lprops1.getGroupedFields() == null);
+               Assert.assertTrue(lprops1.getOrdering() == null);
+
+               Assert.assertTrue((new 
FieldSet(gprops2.getPartitioningFields().toArray())).equals(new FieldSet(0)));
+               Assert.assertTrue(gprops2.getPartitioning() == 
PartitioningProperty.CUSTOM_PARTITIONING);
+               Assert.assertTrue(lprops2.getGroupedFields() == null);
+               Assert.assertTrue(lprops2.getOrdering() == null);
+
+               
Assert.assertTrue(!gprops1.getCustomPartitioner().equals(gprops2.getCustomPartitioner()));
+       }
+
+
+       public static class SomePojo {
+               public double doubleField;
+               public int intField;
+               public String stringField;
+       }
+
+}
+
+

http://git-wip-us.apache.org/repos/asf/flink/blob/f0a28bf5/flink-core/src/main/java/org/apache/flink/api/common/operators/GenericDataSourceBase.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/operators/GenericDataSourceBase.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/operators/GenericDataSourceBase.java
index 13c5dad..912d13d 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/operators/GenericDataSourceBase.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/operators/GenericDataSourceBase.java
@@ -23,6 +23,7 @@ import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.functions.Partitioner;
 import org.apache.flink.api.common.io.InputFormat;
 import org.apache.flink.api.common.operators.util.UserCodeClassWrapper;
 import org.apache.flink.api.common.operators.util.UserCodeObjectWrapper;
@@ -45,6 +46,8 @@ public class GenericDataSourceBase<OUT, T extends 
InputFormat<OUT, ?>> extends O
 
        protected String statisticsKey;
 
+       private SplitDataProperties splitProperties;
+
        /**
         * Creates a new instance for the given file using the given input 
format.
         *
@@ -157,7 +160,30 @@ public class GenericDataSourceBase<OUT, T extends 
InputFormat<OUT, ?>> extends O
        public void setStatisticsKey(String statisticsKey) {
                this.statisticsKey = statisticsKey;
        }
-       
+
+       /**
+        * Sets properties of input splits for this data source.
+        * Split properties can help to generate more efficient execution plans.
+        * <br>
+        * <b>
+        *     IMPORTANT: Providing wrong split data properties can cause wrong 
results!
+        * </b>
+        *
+        * @param splitDataProperties The data properties of this data source's 
splits.
+        */
+       public void setSplitDataProperties(SplitDataProperties<OUT> 
splitDataProperties) {
+               this.splitProperties = splitDataProperties;
+       }
+
+       /**
+        * Returns the data properties of this data source's splits.
+        *
+        * @return The data properties of this data source's splits or null if 
no properties have been set.
+        */
+       public SplitDataProperties<OUT> getSplitDataProperties() {
+               return this.splitProperties;
+       }
+
        // 
--------------------------------------------------------------------------------------------
        
        /**
@@ -209,4 +235,17 @@ public class GenericDataSourceBase<OUT, T extends 
InputFormat<OUT, ?>> extends O
        public String toString() {
                return this.name;
        }
+
+
+       public static interface SplitDataProperties<T> {
+
+               public int[] getSplitPartitionKeys();
+
+               public Partitioner<T> getSplitPartitioner();
+
+               public int[] getSplitGroupKeys();
+
+               public Ordering getSplitOrder();
+
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f0a28bf5/flink-java/src/main/java/org/apache/flink/api/java/io/SplitDataProperties.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/io/SplitDataProperties.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/io/SplitDataProperties.java
new file mode 100644
index 0000000..04d9953
--- /dev/null
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/io/SplitDataProperties.java
@@ -0,0 +1,464 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.io;
+
+import org.apache.flink.api.common.InvalidProgramException;
+import org.apache.flink.api.common.functions.Partitioner;
+import org.apache.flink.api.common.operators.GenericDataSourceBase;
+import org.apache.flink.api.common.operators.Order;
+import org.apache.flink.api.common.operators.Ordering;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.CompositeType;
+import org.apache.flink.api.java.operators.DataSource;
+import org.apache.flink.api.java.operators.Keys;
+
+import java.util.Arrays;
+
+/**
+ * SplitDataProperties define data properties on {@link 
org.apache.flink.core.io.InputSplit}
+ * generated by the {@link org.apache.flink.api.common.io.InputFormat} of a 
{@link DataSource}.
+ *
+ * InputSplits are units of input which are distributed among and assigned to 
parallel data source subtasks.
+ * SplitDataProperties can define that the elements which are generated by the 
associated InputFormat
+ * are
+ * <ul>
+ *   <li>Partitioned on one or more fields across InputSplits, i.e., all 
elements with the same
+ *   (combination of) key(s) are located in the same input split.</li>
+ *   <li>Grouped on one or more fields within an InputSplit, i.e., all 
elements of an input split
+ *   that have the same (combination of) key(s) are emitted in a single 
sequence one after the other.</li>
+ *   <li>Ordered on one or more fields within an InputSplit, i.e., all 
elements within an input split
+ *    are in the defined order.</li>
+ * </ul>
+ *
+ * <b>IMPORTANT: SplitDataProperties can improve the execution of a program 
because certain
+ * data reorganization steps such as shuffling or sorting can be avoided.
+ * HOWEVER, if SplitDataProperties are not correctly defined, the result of 
the program might be wrong!</b>
+ *
+ * @param <T> The type of the DataSource on which the SplitDataProperties are 
defined.
+ *
+ * @see org.apache.flink.core.io.InputSplit
+ * @see org.apache.flink.api.common.io.InputFormat
+ * @see org.apache.flink.api.java.operators.DataSource
+ */
+public class SplitDataProperties<T> implements 
GenericDataSourceBase.SplitDataProperties<T> {
+
+       private TypeInformation<T> type;
+
+       private int[] splitPartitionKeys;
+
+       private Partitioner<T> splitPartitioner;
+
+       private int[] splitGroupKeys;
+
+       private Ordering splitOrdering;
+
+       /**
+        * Creates SplitDataProperties for the given data types.
+        *
+        * @param type The data type of the SplitDataProperties.
+        */
+       public SplitDataProperties(TypeInformation<T> type) {
+               this.type = type;
+       }
+
+       /**
+        * Creates SplitDataProperties for the given data source.
+        *
+        * @param source The DataSource for which the SplitDataProperties are 
created.
+        */
+       public SplitDataProperties(DataSource<T> source) {
+               this.type = source.getType();
+       }
+
+       /**
+        * Defines that data is partitioned across input splits on the fields 
defined by field positions.
+        * All records sharing the same key (combination) must be contained in 
a single input split.
+        * <br>
+        * <b>
+        *     IMPORTANT: Providing wrong information with SplitDataProperties 
can cause wrong results!
+        * </b>
+        *
+        * @param partitionFields The field positions of the partitioning keys.
+        * @return This SplitDataProperties object.
+        */
+       public SplitDataProperties<T> splitsPartitionedBy(int... 
partitionFields) {
+               return this.splitsPartitionedBy(null, partitionFields);
+       }
+
+       /**
+        * Defines that data is partitioned using a specific partitioning method
+        * across input splits on the fields defined by field positions.
+        * All records sharing the same key (combination) must be contained in 
a single input split.
+        * <br>
+        * <b>
+        *     IMPORTANT: Providing wrong information with SplitDataProperties 
can cause wrong results!
+        * </b>
+        *
+        * @param partitionMethodId An ID for the method that was used to 
partition the data across splits.
+        * @param partitionFields The field positions of the partitioning keys.
+        * @return This SplitDataProperties object.
+        */
+       public SplitDataProperties<T> splitsPartitionedBy(String 
partitionMethodId, int... partitionFields) {
+
+               if (partitionFields == null) {
+                       throw new InvalidProgramException("PartitionFields may 
not be null.");
+               } else if (partitionFields.length == 0) {
+                       throw new InvalidProgramException("PartitionFields may 
not be empty.");
+               }
+
+               this.splitPartitionKeys = getAllFlatKeys(partitionFields);
+               if (partitionMethodId != null) {
+                       this.splitPartitioner = new 
SourcePartitionerMarker<T>(partitionMethodId);
+               } else {
+                       this.splitPartitioner = null;
+               }
+
+               return this;
+       }
+
+       /**
+        * Defines that data is partitioned across input splits on the fields 
defined by field expressions.
+        * Multiple field expressions must be separated by the semicolon ';' 
character.
+        * All records sharing the same key (combination) must be contained in 
a single input split.
+        * <br>
+        * <b>
+        *     IMPORTANT: Providing wrong information with SplitDataProperties 
can cause wrong results!
+        * </b>
+        *
+        * @param partitionFields The field expressions of the partitioning 
keys.
+        * @return This SplitDataProperties object.
+        */
+       public SplitDataProperties<T> splitsPartitionedBy(String 
partitionFields) {
+               return this.splitsPartitionedBy(null, partitionFields);
+       }
+
+       /**
+        * Defines that data is partitioned using an identifiable method
+        * across input splits on the fields defined by field expressions.
+        * Multiple field expressions must be separated by the semicolon ';' 
character.
+        * All records sharing the same key (combination) must be contained in 
a single input split.
+        * <br>
+        * <b>
+        *     IMPORTANT: Providing wrong information with SplitDataProperties 
can cause wrong results!
+        * </b>
+        *
+        * @param partitionMethodId An ID for the method that was used to 
partition the data across splits.
+        * @param partitionFields The field expressions of the partitioning 
keys.
+        * @return This SplitDataProperties object.
+        */
+       public SplitDataProperties<T> splitsPartitionedBy(String 
partitionMethodId, String partitionFields) {
+
+               if(partitionFields == null) {
+                       throw new InvalidProgramException("PartitionFields may 
not be null.");
+               }
+
+               String[] partitionKeysA = partitionFields.split(";");
+               if (partitionKeysA.length == 0) {
+                       throw new InvalidProgramException("PartitionFields may 
not be empty.");
+               }
+
+               this.splitPartitionKeys = getAllFlatKeys(partitionKeysA);
+               if(partitionMethodId != null) {
+                       this.splitPartitioner = new 
SourcePartitionerMarker<T>(partitionMethodId);
+               }
+               else {
+                       this.splitPartitioner = null;
+               }
+
+               return this;
+       }
+
+       /**
+        * Defines that the data within an input split is grouped on the fields 
defined by the field positions.
+        * All records sharing the same key (combination) must be subsequently 
emitted by the input
+        * format for each input split.
+        * <br>
+        * <b>
+        *     IMPORTANT: Providing wrong information with SplitDataProperties 
can cause wrong results!
+        * </b>
+        *
+        * @param groupFields The field positions of the grouping keys.
+        * @return This SplitDataProperties object.
+        */
+       public SplitDataProperties<T> splitsGroupedBy(int... groupFields) {
+
+               if(groupFields == null) {
+                       throw new InvalidProgramException("GroupFields may not 
be null.");
+               } else if (groupFields.length == 0) {
+                       throw new InvalidProgramException("GroupFields may not 
be empty.");
+               }
+
+               if(this.splitOrdering != null) {
+                       throw new InvalidProgramException("DataSource may 
either be grouped or sorted.");
+               }
+
+               this.splitGroupKeys = getAllFlatKeys(groupFields);
+
+               return this;
+       }
+
+       /**
+        * Defines that the data within an input split is grouped on the fields 
defined by the field expressions.
+        * Multiple field expressions must be separated by the semicolon ';' 
character.
+        * All records sharing the same key (combination) must be subsequently 
emitted by the input
+        * format for each input split.
+        * <br>
+        * <b>
+        *     IMPORTANT: Providing wrong information with SplitDataProperties 
can cause wrong results!
+        * </b>
+        *
+        * @param groupFields The field expressions of the grouping keys.
+        * @result This SplitDataProperties object.
+        */
+       public SplitDataProperties<T> splitsGroupedBy(String groupFields) {
+
+               if(groupFields == null) {
+                       throw new InvalidProgramException("GroupFields may not 
be null.");
+               }
+
+               String[] groupKeysA = groupFields.split(";");
+               if (groupKeysA.length == 0) {
+                       throw new InvalidProgramException("GroupFields may not 
be empty.");
+               }
+
+               if(this.splitOrdering != null) {
+                       throw new InvalidProgramException("DataSource may 
either be grouped or sorted.");
+               }
+
+               this.splitGroupKeys = getAllFlatKeys(groupKeysA);
+
+               return this;
+       }
+
+       /**
+        * Defines that the data within an input split is sorted on the fields 
defined by the field positions
+        * in the specified orders.
+        * All records of an input split must be emitted by the input format in 
the defined order.
+        * <br>
+        * <b>
+        *     IMPORTANT: Providing wrong information with SplitDataProperties 
can cause wrong results!
+        * </b>
+        *
+        * @param orderFields The field positions of the grouping keys.
+        * @param orders The orders of the fields.
+        * @result This SplitDataProperties object.
+        */
+       public SplitDataProperties<T> splitsOrderedBy(int[] orderFields, 
Order[] orders) {
+
+               if(orderFields == null || orders == null) {
+                       throw new InvalidProgramException("OrderFields or 
Orders may not be null.");
+               } else if (orderFields.length == 0) {
+                       throw new InvalidProgramException("OrderFields may not 
be empty.");
+               } else if (orders.length == 0) {
+                       throw new InvalidProgramException("Orders may not be 
empty");
+               } else if (orderFields.length != orders.length) {
+                       throw new InvalidProgramException("Number of 
OrderFields and Orders must match.");
+               }
+
+               if(this.splitGroupKeys != null) {
+                       throw new InvalidProgramException("DataSource may 
either be grouped or sorted.");
+               }
+
+               this.splitOrdering = new Ordering();
+
+               for(int i=0; i<orderFields.length; i++) {
+                       int pos = orderFields[i];
+                       int[] flatKeys = this.getAllFlatKeys(new int[]{pos});
+
+                       for(int key : flatKeys) {
+                               // check for duplicates
+                               for (int okey : 
splitOrdering.getFieldPositions()) {
+                                       if (key == okey) {
+                                               throw new 
InvalidProgramException("Duplicate field in the field expression " + pos);
+                                       }
+                               }
+                               // append key
+                               this.splitOrdering.appendOrdering(key, null, 
orders[i] );
+                       }
+               }
+               return this;
+       }
+
+       /**
+        * Defines that the data within an input split is sorted on the fields 
defined by the field expressions
+        * in the specified orders. Multiple field expressions must be 
separated by the semicolon ';' character.
+        * All records of an input split must be emitted by the input format in 
the defined order.
+        * <br>
+        * <b>
+        *     IMPORTANT: Providing wrong information with SplitDataProperties 
can cause wrong results!
+        * </b>
+        *
+        * @param orderFields The field expressions of the grouping key.
+        * @param orders The orders of the fields.
+        * @result This SplitDataProperties object.
+        */
+       public SplitDataProperties<T> splitsOrderedBy(String orderFields, 
Order[] orders) {
+
+               if(orderFields == null || orders == null) {
+                       throw new InvalidProgramException("OrderFields or 
Orders may not be null.");
+               }
+
+               String[] orderKeysA = orderFields.split(";");
+               if (orderKeysA.length == 0) {
+                       throw new InvalidProgramException("OrderFields may not 
be empty.");
+               } else if (orders.length == 0) {
+                       throw new InvalidProgramException("Orders may not be 
empty");
+               } else if (orderKeysA.length != orders.length) {
+                       throw new InvalidProgramException("Number of 
OrderFields and Orders must match.");
+               }
+
+               if(this.splitGroupKeys != null) {
+                       throw new InvalidProgramException("DataSource may 
either be grouped or sorted.");
+               }
+
+               this.splitOrdering = new Ordering();
+
+               for(int i=0; i<orderKeysA.length; i++) {
+                       String keyExp = orderKeysA[i];
+                       int[] flatKeys = this.computeFlatKeys(keyExp);
+
+                       for(int key : flatKeys) {
+                               // check for duplicates
+                               for (int okey : 
splitOrdering.getFieldPositions()) {
+                                       if (key == okey) {
+                                               throw new 
InvalidProgramException("Duplicate field in field expression " + keyExp);
+                                       }
+                               }
+                               // append key
+                               this.splitOrdering.appendOrdering(key, null, 
orders[i] );
+                       }
+               }
+               return this;
+       }
+
+       public int[] getSplitPartitionKeys() {
+               return this.splitPartitionKeys;
+       }
+
+       public Partitioner<T> getSplitPartitioner() {
+               return this.splitPartitioner;
+       }
+
+       public int[] getSplitGroupKeys() {
+               return this.splitGroupKeys;
+       }
+
+       public Ordering getSplitOrder() {
+               return this.splitOrdering;
+       }
+
+
+       /////////////////////// FLAT FIELD EXTRACTION METHODS
+
+       private int[] getAllFlatKeys(String[] fieldExpressions) {
+
+               int[] allKeys = null;
+
+               for(String keyExp : fieldExpressions) {
+                       int[] flatKeys = this.computeFlatKeys(keyExp);
+                       if(allKeys == null) {
+                               allKeys = flatKeys;
+                       } else {
+                               // check for duplicates
+                               for(int key1 : flatKeys) {
+                                       for(int key2 : allKeys) {
+                                               if(key1 == key2) {
+                                                       throw new 
InvalidProgramException("Duplicate fields in field expression "+keyExp);
+                                               }
+                                       }
+                               }
+                               // append flat keys
+                               int oldLength = allKeys.length;
+                               int newLength = oldLength + flatKeys.length;
+                               allKeys = Arrays.copyOf(allKeys, newLength);
+                               for(int i=0;i<flatKeys.length; i++) {
+                                       allKeys[oldLength+i] = flatKeys[i];
+                               }
+                       }
+               }
+
+               return allKeys;
+       }
+
+       private int[] getAllFlatKeys(int[] fieldPositions) {
+
+               Keys.ExpressionKeys<T> ek;
+               try {
+                       ek = new Keys.ExpressionKeys<T>(fieldPositions, 
this.type);
+               } catch(IllegalArgumentException iae) {
+                       throw new InvalidProgramException("Invalid 
specification of field expression.", iae);
+               }
+               return ek.computeLogicalKeyPositions();
+       }
+
+
+       private int[] computeFlatKeys(String fieldExpression) {
+
+               fieldExpression = fieldExpression.trim();
+
+               if(this.type instanceof CompositeType) {
+                       // compute flat field positions for (nested) sorting 
fields
+                       Keys.ExpressionKeys<T> ek;
+                       try {
+                               ek = new Keys.ExpressionKeys<T>(new 
String[]{fieldExpression}, this.type);
+                       } catch(IllegalArgumentException iae) {
+                               throw new InvalidProgramException("Invalid 
specification of field expression.", iae);
+                       }
+                       return ek.computeLogicalKeyPositions();
+               } else {
+                       fieldExpression = fieldExpression.trim();
+                       if (!(fieldExpression.equals("*") || 
fieldExpression.equals("_"))) {
+                               throw new InvalidProgramException("Data 
properties on non-composite types can only be defined on the full type. " +
+                                               "Use a field wildcard for that 
(\"*\" or \"_\")");
+                       } else {
+                               return new int[]{0};
+                       }
+               }
+       }
+
+       /**
+        * A custom partitioner to mark compatible split partitionings.
+        *
+        * @param <T> The type of the partitioned data.
+        */
+       public static class SourcePartitionerMarker<T> implements 
Partitioner<T> {
+
+               String partitionMarker;
+
+               public SourcePartitionerMarker(String partitionMarker) {
+                       this.partitionMarker = partitionMarker;
+               }
+
+               @Override
+               public int partition(T key, int numPartitions) {
+                       throw new UnsupportedOperationException("The 
SourcePartitionerMarker is only used as a marker for compatible partitioning. " 
+
+                                       "It must not be invoked.");
+               }
+
+               @Override
+               public boolean equals(Object o) {
+                       if(o instanceof SourcePartitionerMarker) {
+                               return 
this.partitionMarker.equals(((SourcePartitionerMarker) o).partitionMarker);
+                       } else {
+                               return false;
+                       }
+               }
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f0a28bf5/flink-java/src/main/java/org/apache/flink/api/java/operators/DataSource.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/DataSource.java 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/DataSource.java
index 2352269..d6e511a 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/DataSource.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/DataSource.java
@@ -24,6 +24,7 @@ import 
org.apache.flink.api.common.operators.GenericDataSourceBase;
 import org.apache.flink.api.common.operators.OperatorInformation;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.SplitDataProperties;
 import org.apache.flink.configuration.Configuration;
 
 /**
@@ -41,6 +42,8 @@ public class DataSource<OUT> extends Operator<OUT, 
DataSource<OUT>> {
 
        private Configuration parameters;
 
+       private SplitDataProperties<OUT> splitDataProperties;
+
        // 
--------------------------------------------------------------------------------------------
        
        /**
@@ -90,7 +93,28 @@ public class DataSource<OUT> extends Operator<OUT, 
DataSource<OUT>> {
        public Configuration getParameters() {
                return this.parameters;
        }
-       
+
+
+       /**
+        * Returns the {@link org.apache.flink.api.java.io.SplitDataProperties} 
for the
+        * {@link org.apache.flink.core.io.InputSplit}s of this DataSource
+        * for configurations.
+        *
+        * SplitDataProperties can help to generate more efficient execution 
plans.
+        * <br>
+        * <b>
+        *     IMPORTANT: Incorrect configuration of SplitDataProperties can 
cause wrong results!
+        * </b>
+        *
+        * @return The SplitDataProperties for the InputSplits of this 
DataSource.
+        */
+       public SplitDataProperties<OUT> getSplitDataProperties() {
+               if(this.splitDataProperties == null) {
+                       this.splitDataProperties = new 
SplitDataProperties<OUT>(this);
+               }
+               return this.splitDataProperties;
+       }
+
        // 
--------------------------------------------------------------------------------------------
        
        protected GenericDataSourceBase<OUT, ?> translateToDataFlow() {
@@ -106,6 +130,10 @@ public class DataSource<OUT> extends Operator<OUT, 
DataSource<OUT>> {
                if(this.parameters != null) {
                        source.getParameters().addAll(this.parameters);
                }
+               if(this.splitDataProperties != null) {
+                       source.setSplitDataProperties(this.splitDataProperties);
+               }
                return source;
        }
+
 }

Reply via email to