http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/CrossStreamOuterSecondDescriptor.java
----------------------------------------------------------------------
diff --git 
a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/CrossStreamOuterSecondDescriptor.java
 
b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/CrossStreamOuterSecondDescriptor.java
new file mode 100644
index 0000000..3fabad6
--- /dev/null
+++ 
b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/CrossStreamOuterSecondDescriptor.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.operators;
+
+import org.apache.flink.optimizer.dataproperties.LocalProperties;
+import org.apache.flink.runtime.operators.DriverStrategy;
+
+
+public class CrossStreamOuterSecondDescriptor extends 
CartesianProductDescriptor {
+       
+       public CrossStreamOuterSecondDescriptor() {
+               this(true, true);
+       }
+       
+       public CrossStreamOuterSecondDescriptor(boolean allowBroadcastFirst, 
boolean allowBroadcastSecond) {
+               super(allowBroadcastFirst, allowBroadcastSecond);
+       }
+       
+       @Override
+       public DriverStrategy getStrategy() {
+               return DriverStrategy.NESTEDLOOP_STREAMED_OUTER_SECOND;
+       }
+
+       @Override
+       public LocalProperties computeLocalProperties(LocalProperties in1, 
LocalProperties in2) {
+               // uniqueness becomes grouping with streamed nested loops
+               if ((in2.getGroupedFields() == null || 
in2.getGroupedFields().size() == 0) &&
+                               in2.getUniqueFields() != null && 
in2.getUniqueFields().size() > 0)
+               {
+                       return 
LocalProperties.forGrouping(in2.getUniqueFields().iterator().next().toFieldList());
+               } else {
+                       return in2.clearUniqueFieldSets();
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/FilterDescriptor.java
----------------------------------------------------------------------
diff --git 
a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/FilterDescriptor.java
 
b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/FilterDescriptor.java
new file mode 100644
index 0000000..81c823f
--- /dev/null
+++ 
b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/FilterDescriptor.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.optimizer.operators;
+
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.flink.optimizer.dag.SingleInputNode;
+import org.apache.flink.optimizer.dataproperties.GlobalProperties;
+import org.apache.flink.optimizer.dataproperties.LocalProperties;
+import org.apache.flink.optimizer.dataproperties.RequestedGlobalProperties;
+import org.apache.flink.optimizer.dataproperties.RequestedLocalProperties;
+import org.apache.flink.optimizer.plan.Channel;
+import org.apache.flink.optimizer.plan.SingleInputPlanNode;
+import org.apache.flink.runtime.operators.DriverStrategy;
+
+
+public class FilterDescriptor extends OperatorDescriptorSingle {
+
+       @Override
+       public DriverStrategy getStrategy() {
+               return DriverStrategy.FLAT_MAP;
+       }
+
+       @Override
+       public SingleInputPlanNode instantiate(Channel in, SingleInputNode 
node) {
+               return new SingleInputPlanNode(node, "Filter 
("+node.getOperator().getName()+")", in, DriverStrategy.FLAT_MAP);
+       }
+
+       @Override
+       protected List<RequestedGlobalProperties> 
createPossibleGlobalProperties() {
+               RequestedGlobalProperties rgp = new RequestedGlobalProperties();
+               rgp.setAnyDistribution();
+               return Collections.singletonList(rgp);
+       }
+
+       @Override
+       protected List<RequestedLocalProperties> 
createPossibleLocalProperties() {
+               return Collections.singletonList(new 
RequestedLocalProperties());
+       }
+       
+       @Override
+       public GlobalProperties computeGlobalProperties(GlobalProperties 
gProps) {
+               return gProps;
+       }
+       
+       @Override
+       public LocalProperties computeLocalProperties(LocalProperties lProps) {
+               return lProps;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/FlatMapDescriptor.java
----------------------------------------------------------------------
diff --git 
a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/FlatMapDescriptor.java
 
b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/FlatMapDescriptor.java
new file mode 100644
index 0000000..b915e45
--- /dev/null
+++ 
b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/FlatMapDescriptor.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.optimizer.operators;
+
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.flink.optimizer.dag.SingleInputNode;
+import org.apache.flink.optimizer.dataproperties.GlobalProperties;
+import org.apache.flink.optimizer.dataproperties.LocalProperties;
+import org.apache.flink.optimizer.dataproperties.PartitioningProperty;
+import org.apache.flink.optimizer.dataproperties.RequestedGlobalProperties;
+import org.apache.flink.optimizer.dataproperties.RequestedLocalProperties;
+import org.apache.flink.optimizer.plan.Channel;
+import org.apache.flink.optimizer.plan.SingleInputPlanNode;
+import org.apache.flink.runtime.operators.DriverStrategy;
+
+
+public class FlatMapDescriptor extends OperatorDescriptorSingle {
+
+       @Override
+       public DriverStrategy getStrategy() {
+               return DriverStrategy.FLAT_MAP;
+       }
+
+       @Override
+       public SingleInputPlanNode instantiate(Channel in, SingleInputNode 
node) {
+               return new SingleInputPlanNode(node, "FlatMap 
("+node.getOperator().getName()+")", in, DriverStrategy.FLAT_MAP);
+       }
+
+       @Override
+       protected List<RequestedGlobalProperties> 
createPossibleGlobalProperties() {
+               RequestedGlobalProperties rgp = new RequestedGlobalProperties();
+               rgp.setAnyDistribution();
+               return Collections.singletonList(rgp);
+       }
+
+       @Override
+       protected List<RequestedLocalProperties> 
createPossibleLocalProperties() {
+               return Collections.singletonList(new 
RequestedLocalProperties());
+       }
+       
+       @Override
+       public GlobalProperties computeGlobalProperties(GlobalProperties 
gProps) {
+               if (gProps.getUniqueFieldCombination() != null && 
gProps.getUniqueFieldCombination().size() > 0 &&
+                               gProps.getPartitioning() == 
PartitioningProperty.RANDOM_PARTITIONED)
+               {
+                       
gProps.setAnyPartitioning(gProps.getUniqueFieldCombination().iterator().next().toFieldList());
+               }
+               gProps.clearUniqueFieldCombinations();
+               return gProps;
+       }
+       
+       @Override
+       public LocalProperties computeLocalProperties(LocalProperties lProps) {
+               return lProps.clearUniqueFieldSets();
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/GroupCombineProperties.java
----------------------------------------------------------------------
diff --git 
a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/GroupCombineProperties.java
 
b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/GroupCombineProperties.java
new file mode 100644
index 0000000..b648386
--- /dev/null
+++ 
b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/GroupCombineProperties.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.operators;
+
+import org.apache.flink.api.common.operators.Order;
+import org.apache.flink.api.common.operators.Ordering;
+import org.apache.flink.api.common.operators.util.FieldSet;
+import org.apache.flink.optimizer.dag.SingleInputNode;
+import org.apache.flink.optimizer.dataproperties.GlobalProperties;
+import org.apache.flink.optimizer.dataproperties.LocalProperties;
+import org.apache.flink.optimizer.dataproperties.PartitioningProperty;
+import org.apache.flink.optimizer.dataproperties.RequestedGlobalProperties;
+import org.apache.flink.optimizer.dataproperties.RequestedLocalProperties;
+import org.apache.flink.optimizer.plan.Channel;
+import org.apache.flink.optimizer.plan.SingleInputPlanNode;
+import org.apache.flink.runtime.operators.DriverStrategy;
+
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * The properties file belonging to the GroupCombineNode. It translates the 
GroupCombine operation
+ * to the driver strategy SORTED_GROUP_COMBINE and sets the relevant grouping 
and sorting keys.
+ * @see org.apache.flink.optimizer.dag.GroupCombineNode
+ */
+public final class GroupCombineProperties extends OperatorDescriptorSingle {
+
+       private final Ordering ordering;        // ordering that we need to use 
if an additional ordering is requested 
+
+       public GroupCombineProperties(FieldSet groupKeys, Ordering 
additionalOrderKeys) {
+               super(groupKeys);
+
+               // if we have an additional ordering, construct the ordering to 
have primarily the grouping fields
+               
+               this.ordering = new Ordering();
+               for (Integer key : this.keyList) {
+                       this.ordering.appendOrdering(key, null, Order.ANY);
+               }
+
+               // and next the additional order fields
+               if (additionalOrderKeys != null) {
+                       for (int i = 0; i < 
additionalOrderKeys.getNumberOfFields(); i++) {
+                               Integer field = 
additionalOrderKeys.getFieldNumber(i);
+                               Order order = additionalOrderKeys.getOrder(i);
+                               this.ordering.appendOrdering(field, 
additionalOrderKeys.getType(i), order);
+                       }
+               }
+
+       }
+
+       @Override
+       public DriverStrategy getStrategy() {
+               return DriverStrategy.SORTED_GROUP_COMBINE;
+       }
+
+       @Override
+       public SingleInputPlanNode instantiate(Channel in, SingleInputNode 
node) {
+               node.setDegreeOfParallelism(in.getSource().getParallelism());
+               
+               // sorting key info
+               SingleInputPlanNode singleInputPlanNode = new 
SingleInputPlanNode(
+                               node, 
+                               "GroupCombine (" + node.getOperator().getName() 
+ ")",
+                               in, // reuse the combine strategy also used in 
the group reduce
+                               DriverStrategy.SORTED_GROUP_COMBINE, 
this.keyList);
+
+               // set sorting comparator key info
+               
singleInputPlanNode.setDriverKeyInfo(this.ordering.getInvolvedIndexes(), 
this.ordering.getFieldSortDirections(), 0);
+               // set grouping comparator key info
+               singleInputPlanNode.setDriverKeyInfo(this.keyList, 1);
+               
+               return singleInputPlanNode;
+       }
+
+       @Override
+       protected List<RequestedGlobalProperties> 
createPossibleGlobalProperties() {
+               RequestedGlobalProperties props = new 
RequestedGlobalProperties();
+               props.setRandomPartitioning();
+               return Collections.singletonList(props);
+       }
+
+       @Override
+       protected List<RequestedLocalProperties> 
createPossibleLocalProperties() {
+               return Collections.singletonList(new 
RequestedLocalProperties());
+       }
+
+       @Override
+       public GlobalProperties computeGlobalProperties(GlobalProperties 
gProps) {
+               if (gProps.getUniqueFieldCombination() != null && 
gProps.getUniqueFieldCombination().size() > 0 &&
+                               gProps.getPartitioning() == 
PartitioningProperty.RANDOM_PARTITIONED) {
+                       
gProps.setAnyPartitioning(gProps.getUniqueFieldCombination().iterator().next().toFieldList());
+               }
+               gProps.clearUniqueFieldCombinations();
+               return gProps;
+       }
+
+       @Override
+       public LocalProperties computeLocalProperties(LocalProperties lProps) {
+               return lProps.clearUniqueFieldSets();
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/GroupReduceProperties.java
----------------------------------------------------------------------
diff --git 
a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/GroupReduceProperties.java
 
b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/GroupReduceProperties.java
new file mode 100644
index 0000000..ebd09f2
--- /dev/null
+++ 
b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/GroupReduceProperties.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.operators;
+
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.flink.api.common.functions.Partitioner;
+import org.apache.flink.api.common.operators.Order;
+import org.apache.flink.api.common.operators.Ordering;
+import org.apache.flink.api.common.operators.util.FieldSet;
+import org.apache.flink.optimizer.dag.SingleInputNode;
+import org.apache.flink.optimizer.dataproperties.GlobalProperties;
+import org.apache.flink.optimizer.dataproperties.LocalProperties;
+import org.apache.flink.optimizer.dataproperties.PartitioningProperty;
+import org.apache.flink.optimizer.dataproperties.RequestedGlobalProperties;
+import org.apache.flink.optimizer.dataproperties.RequestedLocalProperties;
+import org.apache.flink.optimizer.plan.Channel;
+import org.apache.flink.optimizer.plan.SingleInputPlanNode;
+import org.apache.flink.runtime.operators.DriverStrategy;
+
+public final class GroupReduceProperties extends OperatorDescriptorSingle {
+       
+       private final Ordering ordering;                // ordering that we 
need to use if an additional ordering is requested 
+
+       private final Partitioner<?> customPartitioner;
+       
+       
+       public GroupReduceProperties(FieldSet keys) {
+               this(keys, null, null);
+       }
+       
+       public GroupReduceProperties(FieldSet keys, Ordering 
additionalOrderKeys) {
+               this(keys, additionalOrderKeys, null);
+       }
+       
+       public GroupReduceProperties(FieldSet keys, Partitioner<?> 
customPartitioner) {
+               this(keys, null, customPartitioner);
+       }
+       
+       public GroupReduceProperties(FieldSet groupKeys, Ordering 
additionalOrderKeys, Partitioner<?> customPartitioner) {
+               super(groupKeys);
+               
+               // if we have an additional ordering, construct the ordering to 
have primarily the grouping fields
+               if (additionalOrderKeys != null) {
+                       this.ordering = new Ordering();
+                       for (Integer key : this.keyList) {
+                               this.ordering.appendOrdering(key, null, 
Order.ANY);
+                       }
+               
+                       // and next the additional order fields
+                       for (int i = 0; i < 
additionalOrderKeys.getNumberOfFields(); i++) {
+                               Integer field = 
additionalOrderKeys.getFieldNumber(i);
+                               Order order = additionalOrderKeys.getOrder(i);
+                               this.ordering.appendOrdering(field, 
additionalOrderKeys.getType(i), order);
+                       }
+               }
+               else {
+                       this.ordering = null;
+               }
+               
+               this.customPartitioner = customPartitioner;
+       }
+       
+       @Override
+       public DriverStrategy getStrategy() {
+               return DriverStrategy.SORTED_GROUP_REDUCE;
+       }
+
+       @Override
+       public SingleInputPlanNode instantiate(Channel in, SingleInputNode 
node) {
+               return new SingleInputPlanNode(node, "GroupReduce 
("+node.getOperator().getName()+")", in, DriverStrategy.SORTED_GROUP_REDUCE, 
this.keyList);
+       }
+
+       @Override
+       protected List<RequestedGlobalProperties> 
createPossibleGlobalProperties() {
+               RequestedGlobalProperties props = new 
RequestedGlobalProperties();
+               
+               if (customPartitioner == null) {
+                       props.setAnyPartitioning(this.keys);
+               } else {
+                       props.setCustomPartitioned(this.keys, 
this.customPartitioner);
+               }
+               return Collections.singletonList(props);
+       }
+
+       @Override
+       protected List<RequestedLocalProperties> 
createPossibleLocalProperties() {
+               RequestedLocalProperties props = new RequestedLocalProperties();
+               if (this.ordering == null) {
+                       props.setGroupedFields(this.keys);
+               } else {
+                       props.setOrdering(this.ordering);
+               }
+               return Collections.singletonList(props);
+       }
+       
+       @Override
+       public GlobalProperties computeGlobalProperties(GlobalProperties 
gProps) {
+               if (gProps.getUniqueFieldCombination() != null && 
gProps.getUniqueFieldCombination().size() > 0 &&
+                               gProps.getPartitioning() == 
PartitioningProperty.RANDOM_PARTITIONED)
+               {
+                       
gProps.setAnyPartitioning(gProps.getUniqueFieldCombination().iterator().next().toFieldList());
+               }
+               gProps.clearUniqueFieldCombinations();
+               return gProps;
+       }
+       
+       @Override
+       public LocalProperties computeLocalProperties(LocalProperties lProps) {
+               return lProps.clearUniqueFieldSets();
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/GroupReduceWithCombineProperties.java
----------------------------------------------------------------------
diff --git 
a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/GroupReduceWithCombineProperties.java
 
b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/GroupReduceWithCombineProperties.java
new file mode 100644
index 0000000..c4f47d3
--- /dev/null
+++ 
b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/GroupReduceWithCombineProperties.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.operators;
+
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.flink.api.common.functions.Partitioner;
+import org.apache.flink.api.common.operators.Order;
+import org.apache.flink.api.common.operators.Ordering;
+import org.apache.flink.api.common.operators.util.FieldSet;
+import org.apache.flink.optimizer.costs.Costs;
+import org.apache.flink.optimizer.dag.GroupReduceNode;
+import org.apache.flink.optimizer.dag.SingleInputNode;
+import org.apache.flink.optimizer.dataproperties.GlobalProperties;
+import org.apache.flink.optimizer.dataproperties.LocalProperties;
+import org.apache.flink.optimizer.dataproperties.PartitioningProperty;
+import org.apache.flink.optimizer.dataproperties.RequestedGlobalProperties;
+import org.apache.flink.optimizer.dataproperties.RequestedLocalProperties;
+import org.apache.flink.optimizer.plan.Channel;
+import org.apache.flink.optimizer.plan.SingleInputPlanNode;
+import org.apache.flink.runtime.io.network.DataExchangeMode;
+import org.apache.flink.runtime.operators.DriverStrategy;
+import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
+import org.apache.flink.runtime.operators.util.LocalStrategy;
+
+public final class GroupReduceWithCombineProperties extends 
OperatorDescriptorSingle {
+       
+       private final Ordering ordering;                // ordering that we 
need to use if an additional ordering is requested 
+       
+       private final Partitioner<?> customPartitioner;
+       
+       
+       public GroupReduceWithCombineProperties(FieldSet groupKeys) {
+               this(groupKeys, null, null);
+       }
+       
+       public GroupReduceWithCombineProperties(FieldSet groupKeys, Ordering 
additionalOrderKeys) {
+               this(groupKeys, additionalOrderKeys, null);
+       }
+       
+       public GroupReduceWithCombineProperties(FieldSet groupKeys, 
Partitioner<?> customPartitioner) {
+               this(groupKeys, null, customPartitioner);
+       }
+       
+       public GroupReduceWithCombineProperties(FieldSet groupKeys, Ordering 
additionalOrderKeys, Partitioner<?> customPartitioner) {
+               super(groupKeys);
+               
+               // if we have an additional ordering, construct the ordering to 
have primarily the grouping fields
+               if (additionalOrderKeys != null) {
+                       this.ordering = new Ordering();
+                       for (Integer key : this.keyList) {
+                               this.ordering.appendOrdering(key, null, 
Order.ANY);
+                       }
+               
+                       // and next the additional order fields
+                       for (int i = 0; i < 
additionalOrderKeys.getNumberOfFields(); i++) {
+                               Integer field = 
additionalOrderKeys.getFieldNumber(i);
+                               Order order = additionalOrderKeys.getOrder(i);
+                               this.ordering.appendOrdering(field, 
additionalOrderKeys.getType(i), order);
+                       }
+               } else {
+                       this.ordering = null;
+               }
+               
+               this.customPartitioner = customPartitioner;
+       }
+       
+       @Override
+       public DriverStrategy getStrategy() {
+               return DriverStrategy.SORTED_GROUP_REDUCE;
+       }
+
+       @Override
+       public SingleInputPlanNode instantiate(Channel in, SingleInputNode 
node) {
+               if (in.getShipStrategy() == ShipStrategyType.FORWARD) {
+                       // adjust a sort (changes grouping, so it must be for 
this driver to combining sort
+                       if (in.getLocalStrategy() == LocalStrategy.SORT) {
+                               if 
(!in.getLocalStrategyKeys().isValidUnorderedPrefix(this.keys)) {
+                                       throw new RuntimeException("Bug: 
Inconsistent sort for group strategy.");
+                               }
+                               
in.setLocalStrategy(LocalStrategy.COMBININGSORT, in.getLocalStrategyKeys(),
+                                                                       
in.getLocalStrategySortOrder());
+                       }
+                       return new SingleInputPlanNode(node, 
"Reduce("+node.getOperator().getName()+")", in,
+                                                                               
        DriverStrategy.SORTED_GROUP_REDUCE, this.keyList);
+               } else {
+                       // non forward case. all local properties are killed 
anyways, so we can safely plug in a combiner
+                       Channel toCombiner = new Channel(in.getSource());
+                       toCombiner.setShipStrategy(ShipStrategyType.FORWARD, 
DataExchangeMode.PIPELINED);
+
+                       // create an input node for combine with same DOP as 
input node
+                       GroupReduceNode combinerNode = ((GroupReduceNode) 
node).getCombinerUtilityNode();
+                       
combinerNode.setDegreeOfParallelism(in.getSource().getParallelism());
+
+                       SingleInputPlanNode combiner = new 
SingleInputPlanNode(combinerNode, "Combine("+node.getOperator()
+                                       .getName()+")", toCombiner, 
DriverStrategy.SORTED_GROUP_COMBINE);
+                       combiner.setCosts(new Costs(0, 0));
+                       
combiner.initProperties(toCombiner.getGlobalProperties(), 
toCombiner.getLocalProperties());
+                       // set sorting comparator key info
+                       combiner.setDriverKeyInfo(in.getLocalStrategyKeys(), 
in.getLocalStrategySortOrder(), 0);
+                       // set grouping comparator key info
+                       combiner.setDriverKeyInfo(this.keyList, 1);
+                       
+                       Channel toReducer = new Channel(combiner);
+                       toReducer.setShipStrategy(in.getShipStrategy(), 
in.getShipStrategyKeys(),
+                                                                       
in.getShipStrategySortOrder(), in.getDataExchangeMode());
+                       toReducer.setLocalStrategy(LocalStrategy.COMBININGSORT, 
in.getLocalStrategyKeys(),
+                                                                               
in.getLocalStrategySortOrder());
+
+                       return new SingleInputPlanNode(node, "Reduce 
("+node.getOperator().getName()+")",
+                                                                               
        toReducer, DriverStrategy.SORTED_GROUP_REDUCE, this.keyList);
+               }
+       }
+
+       @Override
+       protected List<RequestedGlobalProperties> 
createPossibleGlobalProperties() {
+               RequestedGlobalProperties props = new 
RequestedGlobalProperties();
+               if (customPartitioner == null) {
+                       props.setAnyPartitioning(this.keys);
+               } else {
+                       props.setCustomPartitioned(this.keys, 
this.customPartitioner);
+               }
+               return Collections.singletonList(props);
+       }
+
+       @Override
+       protected List<RequestedLocalProperties> 
createPossibleLocalProperties() {
+               RequestedLocalProperties props = new RequestedLocalProperties();
+               if (this.ordering == null) {
+                       props.setGroupedFields(this.keys);
+               } else {
+                       props.setOrdering(this.ordering);
+               }
+               return Collections.singletonList(props);
+       }
+
+       @Override
+       public GlobalProperties computeGlobalProperties(GlobalProperties 
gProps) {
+               if (gProps.getUniqueFieldCombination() != null && 
gProps.getUniqueFieldCombination().size() > 0 &&
+                               gProps.getPartitioning() == 
PartitioningProperty.RANDOM_PARTITIONED)
+               {
+                       
gProps.setAnyPartitioning(gProps.getUniqueFieldCombination().iterator().next().toFieldList());
+               }
+               gProps.clearUniqueFieldCombinations();
+               return gProps;
+       }
+
+       @Override
+       public LocalProperties computeLocalProperties(LocalProperties lProps) {
+               return lProps.clearUniqueFieldSets();
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/HashJoinBuildFirstProperties.java
----------------------------------------------------------------------
diff --git 
a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/HashJoinBuildFirstProperties.java
 
b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/HashJoinBuildFirstProperties.java
new file mode 100644
index 0000000..fec72a9
--- /dev/null
+++ 
b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/HashJoinBuildFirstProperties.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.optimizer.operators;
+
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.flink.api.common.operators.util.FieldList;
+import org.apache.flink.optimizer.CompilerException;
+import org.apache.flink.optimizer.dag.TwoInputNode;
+import org.apache.flink.optimizer.dataproperties.LocalProperties;
+import org.apache.flink.optimizer.dataproperties.RequestedLocalProperties;
+import org.apache.flink.optimizer.plan.Channel;
+import org.apache.flink.optimizer.plan.DualInputPlanNode;
+import org.apache.flink.runtime.operators.DriverStrategy;
+
+/**
+ *
+ */
+public class HashJoinBuildFirstProperties extends AbstractJoinDescriptor {
+       
+       public HashJoinBuildFirstProperties(FieldList keys1, FieldList keys2) {
+               super(keys1, keys2);
+       }
+       
+       public HashJoinBuildFirstProperties(FieldList keys1, FieldList keys2,
+                       boolean broadcastFirstAllowed, boolean 
broadcastSecondAllowed, boolean repartitionAllowed)
+       {
+               super(keys1, keys2, broadcastFirstAllowed, 
broadcastSecondAllowed, repartitionAllowed);
+       }
+
+       @Override
+       public DriverStrategy getStrategy() {
+               return DriverStrategy.HYBRIDHASH_BUILD_FIRST;
+       }
+
+       @Override
+       protected List<LocalPropertiesPair> createPossibleLocalProperties() {
+               // all properties are possible
+               return Collections.singletonList(new LocalPropertiesPair(new 
RequestedLocalProperties(), new RequestedLocalProperties()));
+       }
+       
+       @Override
+       public boolean areCoFulfilled(RequestedLocalProperties requested1, 
RequestedLocalProperties requested2,
+                       LocalProperties produced1, LocalProperties produced2)
+       {
+               return true;
+       }
+
+       @Override
+       public DualInputPlanNode instantiate(Channel in1, Channel in2, 
TwoInputNode node) {
+               DriverStrategy strategy;
+               
+               if(!in1.isOnDynamicPath() && in2.isOnDynamicPath()) {
+                       // sanity check that the first input is cached and 
remove that cache
+                       if (!in1.getTempMode().isCached()) {
+                               throw new CompilerException("No cache at point 
where static and dynamic parts meet.");
+                       }
+                       in1.setTempMode(in1.getTempMode().makeNonCached());
+                       strategy = DriverStrategy.HYBRIDHASH_BUILD_FIRST_CACHED;
+               }
+               else {
+                       strategy = DriverStrategy.HYBRIDHASH_BUILD_FIRST;
+               }
+               return new DualInputPlanNode(node, 
"Join("+node.getOperator().getName()+")", in1, in2, strategy, this.keys1, 
this.keys2);
+       }
+       
+       @Override
+       public LocalProperties computeLocalProperties(LocalProperties in1, 
LocalProperties in2) {
+               return new LocalProperties();
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/HashJoinBuildSecondProperties.java
----------------------------------------------------------------------
diff --git 
a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/HashJoinBuildSecondProperties.java
 
b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/HashJoinBuildSecondProperties.java
new file mode 100644
index 0000000..f9d1e6c
--- /dev/null
+++ 
b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/HashJoinBuildSecondProperties.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.optimizer.operators;
+
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.flink.api.common.operators.util.FieldList;
+import org.apache.flink.optimizer.CompilerException;
+import org.apache.flink.optimizer.dag.TwoInputNode;
+import org.apache.flink.optimizer.dataproperties.LocalProperties;
+import org.apache.flink.optimizer.dataproperties.RequestedLocalProperties;
+import org.apache.flink.optimizer.plan.Channel;
+import org.apache.flink.optimizer.plan.DualInputPlanNode;
+import org.apache.flink.runtime.operators.DriverStrategy;
+
+public final class HashJoinBuildSecondProperties extends 
AbstractJoinDescriptor {
+       
+       public HashJoinBuildSecondProperties(FieldList keys1, FieldList keys2) {
+               super(keys1, keys2);
+       }
+       
+       public HashJoinBuildSecondProperties(FieldList keys1, FieldList keys2,
+                       boolean broadcastFirstAllowed, boolean 
broadcastSecondAllowed, boolean repartitionAllowed)
+       {
+               super(keys1, keys2, broadcastFirstAllowed, 
broadcastSecondAllowed, repartitionAllowed);
+       }
+
+       @Override
+       public DriverStrategy getStrategy() {
+               return DriverStrategy.HYBRIDHASH_BUILD_SECOND;
+       }
+
+       @Override
+       protected List<LocalPropertiesPair> createPossibleLocalProperties() {
+               // all properties are possible
+               return Collections.singletonList(new LocalPropertiesPair(
+                       new RequestedLocalProperties(), new 
RequestedLocalProperties()));
+       }
+       
+       @Override
+       public boolean areCoFulfilled(RequestedLocalProperties requested1, 
RequestedLocalProperties requested2,
+                       LocalProperties produced1, LocalProperties produced2)
+       {
+               return true;
+       }
+
+       @Override
+       public DualInputPlanNode instantiate(Channel in1, Channel in2, 
TwoInputNode node) {
+               DriverStrategy strategy;
+               
+               if (!in2.isOnDynamicPath() && in1.isOnDynamicPath()) {
+                       // sanity check that the first input is cached and 
remove that cache
+                       if (!in2.getTempMode().isCached()) {
+                               throw new CompilerException("No cache at point 
where static and dynamic parts meet.");
+                       }
+                       
+                       in2.setTempMode(in2.getTempMode().makeNonCached());
+                       strategy = 
DriverStrategy.HYBRIDHASH_BUILD_SECOND_CACHED;
+               }
+               else {
+                       strategy = DriverStrategy.HYBRIDHASH_BUILD_SECOND;
+               }
+               return new DualInputPlanNode(node, "Join 
("+node.getOperator().getName()+")", in1, in2, strategy, this.keys1, 
this.keys2);
+       }
+       
+       @Override
+       public LocalProperties computeLocalProperties(LocalProperties in1, 
LocalProperties in2) {
+               return new LocalProperties();
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/MapDescriptor.java
----------------------------------------------------------------------
diff --git 
a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/MapDescriptor.java
 
b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/MapDescriptor.java
new file mode 100644
index 0000000..9f14d2a
--- /dev/null
+++ 
b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/MapDescriptor.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.optimizer.operators;
+
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.flink.optimizer.dag.SingleInputNode;
+import org.apache.flink.optimizer.dataproperties.GlobalProperties;
+import org.apache.flink.optimizer.dataproperties.LocalProperties;
+import org.apache.flink.optimizer.dataproperties.RequestedGlobalProperties;
+import org.apache.flink.optimizer.dataproperties.RequestedLocalProperties;
+import org.apache.flink.optimizer.plan.Channel;
+import org.apache.flink.optimizer.plan.SingleInputPlanNode;
+import org.apache.flink.runtime.operators.DriverStrategy;
+
+
+public class MapDescriptor extends OperatorDescriptorSingle {
+
+       @Override
+       public DriverStrategy getStrategy() {
+               return DriverStrategy.MAP;
+       }
+
+       @Override
+       public SingleInputPlanNode instantiate(Channel in, SingleInputNode 
node) {
+               return new SingleInputPlanNode(node, "Map 
("+node.getOperator().getName()+")", in, DriverStrategy.MAP);
+       }
+
+       @Override
+       protected List<RequestedGlobalProperties> 
createPossibleGlobalProperties() {
+               RequestedGlobalProperties rgp = new RequestedGlobalProperties();
+               rgp.setAnyDistribution();
+               return Collections.singletonList(rgp);
+       }
+
+       @Override
+       protected List<RequestedLocalProperties> 
createPossibleLocalProperties() {
+               return Collections.singletonList(new 
RequestedLocalProperties());
+       }
+       
+       @Override
+       public GlobalProperties computeGlobalProperties(GlobalProperties 
gProps) {
+               return gProps;
+       }
+       
+       @Override
+       public LocalProperties computeLocalProperties(LocalProperties lProps) {
+               return lProps;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/MapPartitionDescriptor.java
----------------------------------------------------------------------
diff --git 
a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/MapPartitionDescriptor.java
 
b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/MapPartitionDescriptor.java
new file mode 100644
index 0000000..1489097
--- /dev/null
+++ 
b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/MapPartitionDescriptor.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.optimizer.operators;
+
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.flink.optimizer.dag.SingleInputNode;
+import org.apache.flink.optimizer.dataproperties.GlobalProperties;
+import org.apache.flink.optimizer.dataproperties.LocalProperties;
+import org.apache.flink.optimizer.dataproperties.RequestedGlobalProperties;
+import org.apache.flink.optimizer.dataproperties.RequestedLocalProperties;
+import org.apache.flink.optimizer.plan.Channel;
+import org.apache.flink.optimizer.plan.SingleInputPlanNode;
+import org.apache.flink.runtime.operators.DriverStrategy;
+
+
+public class MapPartitionDescriptor extends OperatorDescriptorSingle {
+
+       @Override
+       public DriverStrategy getStrategy() {
+               return DriverStrategy.MAP_PARTITION;
+       }
+
+       @Override
+       public SingleInputPlanNode instantiate(Channel in, SingleInputNode 
node) {
+               return new SingleInputPlanNode(node, "MapPartition 
("+node.getOperator().getName()+")", in, DriverStrategy.MAP_PARTITION);
+       }
+
+       @Override
+       protected List<RequestedGlobalProperties> 
createPossibleGlobalProperties() {
+               RequestedGlobalProperties rgp = new RequestedGlobalProperties();
+               rgp.setAnyDistribution();
+               return Collections.singletonList(rgp);
+       }
+
+       @Override
+       protected List<RequestedLocalProperties> 
createPossibleLocalProperties() {
+               return Collections.singletonList(new 
RequestedLocalProperties());
+       }
+       
+       @Override
+       public GlobalProperties computeGlobalProperties(GlobalProperties 
gProps) {
+               return gProps;
+       }
+       
+       @Override
+       public LocalProperties computeLocalProperties(LocalProperties lProps) {
+               return lProps;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/NoOpDescriptor.java
----------------------------------------------------------------------
diff --git 
a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/NoOpDescriptor.java
 
b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/NoOpDescriptor.java
new file mode 100644
index 0000000..7ae35c3
--- /dev/null
+++ 
b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/NoOpDescriptor.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.optimizer.operators;
+
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.flink.optimizer.dag.SingleInputNode;
+import org.apache.flink.optimizer.dataproperties.GlobalProperties;
+import org.apache.flink.optimizer.dataproperties.LocalProperties;
+import org.apache.flink.optimizer.dataproperties.RequestedGlobalProperties;
+import org.apache.flink.optimizer.dataproperties.RequestedLocalProperties;
+import org.apache.flink.optimizer.plan.Channel;
+import org.apache.flink.optimizer.plan.SingleInputPlanNode;
+import org.apache.flink.runtime.operators.DriverStrategy;
+
+
+public class NoOpDescriptor extends OperatorDescriptorSingle {
+
+       @Override
+       public DriverStrategy getStrategy() {
+               return DriverStrategy.UNARY_NO_OP;
+       }
+
+       @Override
+       public SingleInputPlanNode instantiate(Channel in, SingleInputNode 
node) {
+               return new SingleInputPlanNode(node, "Pipe", in, 
DriverStrategy.UNARY_NO_OP);
+       }
+
+
+       @Override
+       protected List<RequestedGlobalProperties> 
createPossibleGlobalProperties() {
+               return Collections.singletonList(new 
RequestedGlobalProperties());
+       }
+
+
+       @Override
+       protected List<RequestedLocalProperties> 
createPossibleLocalProperties() {
+               return Collections.singletonList(new 
RequestedLocalProperties());
+       }
+       
+
+       @Override
+       public GlobalProperties computeGlobalProperties(GlobalProperties 
gProps) {
+               return gProps;
+       }
+       
+
+       @Override
+       public LocalProperties computeLocalProperties(LocalProperties lProps) {
+               return lProps;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/OperatorDescriptorDual.java
----------------------------------------------------------------------
diff --git 
a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/OperatorDescriptorDual.java
 
b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/OperatorDescriptorDual.java
new file mode 100644
index 0000000..c21593e
--- /dev/null
+++ 
b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/OperatorDescriptorDual.java
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.optimizer.operators;
+
+import java.util.List;
+
+import org.apache.flink.api.common.operators.util.FieldList;
+import org.apache.flink.optimizer.CompilerException;
+import org.apache.flink.optimizer.dag.TwoInputNode;
+import org.apache.flink.optimizer.dataproperties.GlobalProperties;
+import org.apache.flink.optimizer.dataproperties.LocalProperties;
+import org.apache.flink.optimizer.dataproperties.RequestedGlobalProperties;
+import org.apache.flink.optimizer.dataproperties.RequestedLocalProperties;
+import org.apache.flink.optimizer.plan.Channel;
+import org.apache.flink.optimizer.plan.DualInputPlanNode;
+
+/**
+ * 
+ */
+public abstract class OperatorDescriptorDual implements 
AbstractOperatorDescriptor {
+       
+       protected final FieldList keys1;
+       protected final FieldList keys2;
+       
+       private List<GlobalPropertiesPair> globalProps;
+       private List<LocalPropertiesPair> localProps;
+       
+       protected OperatorDescriptorDual() {
+               this(null, null);
+       }
+       
+       protected OperatorDescriptorDual(FieldList keys1, FieldList keys2) {
+               this.keys1 = keys1;
+               this.keys2 = keys2;
+       }
+       
+       public List<GlobalPropertiesPair> getPossibleGlobalProperties() {
+               if (this.globalProps == null) {
+                       this.globalProps = createPossibleGlobalProperties();
+               }
+               
+               return this.globalProps;
+       }
+       
+       public List<LocalPropertiesPair> getPossibleLocalProperties() {
+               if (this.localProps == null) {
+                       this.localProps = createPossibleLocalProperties();
+               }
+               
+               return this.localProps;
+       }
+       
+       protected abstract List<GlobalPropertiesPair> 
createPossibleGlobalProperties();
+       
+       protected abstract List<LocalPropertiesPair> 
createPossibleLocalProperties();
+       
+       public abstract boolean areCompatible(RequestedGlobalProperties 
requested1, RequestedGlobalProperties requested2,
+                       GlobalProperties produced1, GlobalProperties produced2);
+       
+       public abstract boolean areCoFulfilled(RequestedLocalProperties 
requested1, RequestedLocalProperties requested2,
+                       LocalProperties produced1, LocalProperties produced2);
+       
+       public abstract DualInputPlanNode instantiate(Channel in1, Channel in2, 
TwoInputNode node);
+       
+       public abstract GlobalProperties 
computeGlobalProperties(GlobalProperties in1, GlobalProperties in2);
+       
+       public abstract LocalProperties computeLocalProperties(LocalProperties 
in1, LocalProperties in2);
+
+       protected boolean checkEquivalentFieldPositionsInKeyFields(FieldList 
fields1, FieldList fields2) {
+
+               // check number of produced partitioning fields
+               if(fields1.size() != fields2.size()) {
+                       return false;
+               } else {
+                       return 
checkEquivalentFieldPositionsInKeyFields(fields1, fields2, fields1.size());
+               }
+       }
+
+       protected boolean checkEquivalentFieldPositionsInKeyFields(FieldList 
fields1, FieldList fields2, int numRelevantFields) {
+
+               // check number of produced partitioning fields
+               if(fields1.size() < numRelevantFields || fields2.size() < 
numRelevantFields) {
+                       return false;
+               }
+               else {
+                       for(int i=0; i<numRelevantFields; i++) {
+                               int pField1 = fields1.get(i);
+                               int pField2 = fields2.get(i);
+                               // check if position of both produced fields is 
the same in both requested fields
+                               int j;
+                               for(j=0; j<this.keys1.size(); j++) {
+                                       if(this.keys1.get(j) == pField1 && 
this.keys2.get(j) == pField2) {
+                                               break;
+                                       }
+                                       else if(this.keys1.get(j) != pField1 && 
this.keys2.get(j) != pField2) {
+                                               // do nothing
+                                       }
+                                       else {
+                                               return false;
+                                       }
+                               }
+                               if(j == this.keys1.size()) {
+                                       throw new CompilerException("Fields 
were not found in key fields.");
+                               }
+                       }
+               }
+               return true;
+       }
+
+       // 
--------------------------------------------------------------------------------------------
+       
+       public static final class GlobalPropertiesPair {
+               
+               private final RequestedGlobalProperties props1, props2;
+
+               public GlobalPropertiesPair(RequestedGlobalProperties props1, 
RequestedGlobalProperties props2) {
+                       this.props1 = props1;
+                       this.props2 = props2;
+               }
+               
+               public RequestedGlobalProperties getProperties1() {
+                       return this.props1;
+               }
+               
+               public RequestedGlobalProperties getProperties2() {
+                       return this.props2;
+               }
+               
+               @Override
+               public int hashCode() {
+                       return (this.props1 == null ? 0 : 
this.props1.hashCode()) ^ (this.props2 == null ? 0 : this.props2.hashCode());
+               }
+
+               @Override
+               public boolean equals(Object obj) {
+                       if (obj.getClass() == GlobalPropertiesPair.class) {
+                               final GlobalPropertiesPair other = 
(GlobalPropertiesPair) obj;
+                               
+                               return (this.props1 == null ? other.props1 == 
null : this.props1.equals(other.props1)) &&
+                                               (this.props2 == null ? 
other.props2 == null : this.props2.equals(other.props2));
+                       }
+                       return false;
+               }
+               
+               @Override
+               public String toString() {
+                       return "{" + this.props1 + " / " + this.props2 + "}";
+               }
+       }
+       
+       public static final class LocalPropertiesPair {
+               
+               private final RequestedLocalProperties props1, props2;
+
+               public LocalPropertiesPair(RequestedLocalProperties props1, 
RequestedLocalProperties props2) {
+                       this.props1 = props1;
+                       this.props2 = props2;
+               }
+               
+               public RequestedLocalProperties getProperties1() {
+                       return this.props1;
+               }
+               
+               public RequestedLocalProperties getProperties2() {
+                       return this.props2;
+               }
+               
+               @Override
+               public int hashCode() {
+                       return (this.props1 == null ? 0 : 
this.props1.hashCode()) ^ (this.props2 == null ? 0 : this.props2.hashCode());
+               }
+
+               @Override
+               public boolean equals(Object obj) {
+                       if (obj.getClass() == LocalPropertiesPair.class) {
+                               final LocalPropertiesPair other = 
(LocalPropertiesPair) obj;
+                               
+                               return (this.props1 == null ? other.props1 == 
null : this.props1.equals(other.props1)) &&
+                                               (this.props2 == null ? 
other.props2 == null : this.props2.equals(other.props2));
+                       }
+                       return false;
+               }
+
+               @Override
+               public String toString() {
+                       return "{" + this.props1 + " / " + this.props2 + "}";
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/OperatorDescriptorSingle.java
----------------------------------------------------------------------
diff --git 
a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/OperatorDescriptorSingle.java
 
b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/OperatorDescriptorSingle.java
new file mode 100644
index 0000000..c8be5d4
--- /dev/null
+++ 
b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/OperatorDescriptorSingle.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.optimizer.operators;
+
+import java.util.List;
+
+import org.apache.flink.api.common.operators.util.FieldList;
+import org.apache.flink.api.common.operators.util.FieldSet;
+import org.apache.flink.optimizer.dag.SingleInputNode;
+import org.apache.flink.optimizer.dataproperties.GlobalProperties;
+import org.apache.flink.optimizer.dataproperties.LocalProperties;
+import org.apache.flink.optimizer.dataproperties.RequestedGlobalProperties;
+import org.apache.flink.optimizer.dataproperties.RequestedLocalProperties;
+import org.apache.flink.optimizer.plan.Channel;
+import org.apache.flink.optimizer.plan.SingleInputPlanNode;
+
+/**
+ * Abstract base class for Operator descriptions which instantiates the node 
and sets the driver
+ * strategy and the sorting and grouping keys. Returns possible local and 
global properties and
+ * updates them after the operation has been performed.
+ * @see org.apache.flink.compiler.dag.SingleInputNode
+ */
+public abstract class OperatorDescriptorSingle implements 
AbstractOperatorDescriptor {
+       
+       protected final FieldSet keys;                  // the set of key fields
+       protected final FieldList keyList;              // the key fields with 
ordered field positions
+
+       private List<RequestedGlobalProperties> globalProps;
+       private List<RequestedLocalProperties> localProps;
+       
+       
+       protected OperatorDescriptorSingle() {
+               this(null);
+       }
+       
+       protected OperatorDescriptorSingle(FieldSet keys) {
+               this.keys = keys;
+               this.keyList = keys == null ? null : keys.toFieldList();
+       }
+
+
+       public List<RequestedGlobalProperties> getPossibleGlobalProperties() {
+               if (this.globalProps == null) {
+                       this.globalProps = createPossibleGlobalProperties();
+               }
+               return this.globalProps;
+       }
+       
+       public List<RequestedLocalProperties> getPossibleLocalProperties() {
+               if (this.localProps == null) {
+                       this.localProps = createPossibleLocalProperties();
+               }
+               return this.localProps;
+       }
+
+       /**
+        * Returns a list of global properties that are required by this 
operator descriptor.
+        * 
+        * @return A list of global properties that are required by this 
operator descriptor.
+        */
+       protected abstract List<RequestedGlobalProperties> 
createPossibleGlobalProperties();
+       
+       /**
+        * Returns a list of local properties that are required by this 
operator descriptor.
+        * 
+        * @return A list of local properties that are required by this 
operator descriptor.
+        */
+       protected abstract List<RequestedLocalProperties> 
createPossibleLocalProperties();
+       
+       public abstract SingleInputPlanNode instantiate(Channel in, 
SingleInputNode node);
+       
+       /**
+        * Returns the global properties which are present after the operator 
was applied on the 
+        * provided global properties.
+        * 
+        * @param in The global properties on which the operator is applied.
+        * @return The global properties which are valid after the operator has 
been applied.
+        */
+       public abstract GlobalProperties 
computeGlobalProperties(GlobalProperties in);
+       
+       /**
+        * Returns the local properties which are present after the operator 
was applied on the 
+        * provided local properties.
+        * 
+        * @param in The local properties on which the operator is applied.
+        * @return The local properties which are valid after the operator has 
been applied.
+        */
+       public abstract LocalProperties computeLocalProperties(LocalProperties 
in);
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/PartialGroupProperties.java
----------------------------------------------------------------------
diff --git 
a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/PartialGroupProperties.java
 
b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/PartialGroupProperties.java
new file mode 100644
index 0000000..2bde29b
--- /dev/null
+++ 
b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/PartialGroupProperties.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.operators;
+
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.flink.api.common.operators.base.GroupReduceOperatorBase;
+import org.apache.flink.api.common.operators.util.FieldSet;
+import org.apache.flink.optimizer.dag.GroupReduceNode;
+import org.apache.flink.optimizer.dag.SingleInputNode;
+import org.apache.flink.optimizer.dataproperties.GlobalProperties;
+import org.apache.flink.optimizer.dataproperties.LocalProperties;
+import org.apache.flink.optimizer.dataproperties.PartitioningProperty;
+import org.apache.flink.optimizer.dataproperties.RequestedGlobalProperties;
+import org.apache.flink.optimizer.dataproperties.RequestedLocalProperties;
+import org.apache.flink.optimizer.plan.Channel;
+import org.apache.flink.optimizer.plan.SingleInputPlanNode;
+import org.apache.flink.runtime.operators.DriverStrategy;
+
+public final class PartialGroupProperties extends OperatorDescriptorSingle {
+       
+       public PartialGroupProperties(FieldSet keys) {
+               super(keys);
+       }
+       
+       @Override
+       public DriverStrategy getStrategy() {
+               return DriverStrategy.SORTED_GROUP_COMBINE;
+       }
+
+       @Override
+       public SingleInputPlanNode instantiate(Channel in, SingleInputNode 
node) {
+               // create in input node for combine with same DOP as input node
+               GroupReduceNode combinerNode = new 
GroupReduceNode((GroupReduceOperatorBase<?, ?, ?>) node.getOperator());
+               
combinerNode.setDegreeOfParallelism(in.getSource().getParallelism());
+
+               SingleInputPlanNode combiner = new 
SingleInputPlanNode(combinerNode, "Combine("+node.getOperator().getName()+")", 
in,
+                               DriverStrategy.SORTED_GROUP_COMBINE);
+               // sorting key info
+               combiner.setDriverKeyInfo(in.getLocalStrategyKeys(), 
in.getLocalStrategySortOrder(), 0);
+               // set grouping comparator key info
+               combiner.setDriverKeyInfo(this.keyList, 1);
+               
+               return combiner;
+       }
+
+       @Override
+       protected List<RequestedGlobalProperties> 
createPossibleGlobalProperties() {
+               return Collections.singletonList(new 
RequestedGlobalProperties());
+       }
+
+       @Override
+       protected List<RequestedLocalProperties> 
createPossibleLocalProperties() {
+               RequestedLocalProperties props = new RequestedLocalProperties();
+               props.setGroupedFields(this.keys);
+               return Collections.singletonList(props);
+       }
+       
+       @Override
+       public GlobalProperties computeGlobalProperties(GlobalProperties 
gProps) {
+               if (gProps.getUniqueFieldCombination() != null && 
gProps.getUniqueFieldCombination().size() > 0 &&
+                               gProps.getPartitioning() == 
PartitioningProperty.RANDOM_PARTITIONED)
+               {
+                       
gProps.setAnyPartitioning(gProps.getUniqueFieldCombination().iterator().next().toFieldList());
+               }
+               gProps.clearUniqueFieldCombinations();
+               return gProps;
+       }
+       
+       @Override
+       public LocalProperties computeLocalProperties(LocalProperties lProps) {
+               return lProps.clearUniqueFieldSets();
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/ReduceProperties.java
----------------------------------------------------------------------
diff --git 
a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/ReduceProperties.java
 
b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/ReduceProperties.java
new file mode 100644
index 0000000..5bb51f3
--- /dev/null
+++ 
b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/ReduceProperties.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.operators;
+
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.flink.api.common.functions.Partitioner;
+import org.apache.flink.api.common.operators.util.FieldSet;
+import org.apache.flink.optimizer.costs.Costs;
+import org.apache.flink.optimizer.dag.ReduceNode;
+import org.apache.flink.optimizer.dag.SingleInputNode;
+import org.apache.flink.optimizer.dataproperties.GlobalProperties;
+import org.apache.flink.optimizer.dataproperties.LocalProperties;
+import org.apache.flink.optimizer.dataproperties.PartitioningProperty;
+import org.apache.flink.optimizer.dataproperties.RequestedGlobalProperties;
+import org.apache.flink.optimizer.dataproperties.RequestedLocalProperties;
+import org.apache.flink.optimizer.plan.Channel;
+import org.apache.flink.optimizer.plan.SingleInputPlanNode;
+import org.apache.flink.runtime.io.network.DataExchangeMode;
+import org.apache.flink.runtime.operators.DriverStrategy;
+import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
+import org.apache.flink.runtime.operators.util.LocalStrategy;
+
+public final class ReduceProperties extends OperatorDescriptorSingle {
+       
+       private final Partitioner<?> customPartitioner;
+       
+       public ReduceProperties(FieldSet keys) {
+               this(keys, null);
+       }
+       
+       public ReduceProperties(FieldSet keys, Partitioner<?> 
customPartitioner) {
+               super(keys);
+               this.customPartitioner = customPartitioner;
+       }
+       
+       @Override
+       public DriverStrategy getStrategy() {
+               return DriverStrategy.SORTED_REDUCE;
+       }
+
+       @Override
+       public SingleInputPlanNode instantiate(Channel in, SingleInputNode 
node) {
+               if (in.getShipStrategy() == ShipStrategyType.FORWARD ||
+                               (node.getBroadcastConnections() != null && 
!node.getBroadcastConnections().isEmpty()))
+               {
+                       return new SingleInputPlanNode(node, "Reduce 
("+node.getOperator().getName()+")", in,
+                                                                               
        DriverStrategy.SORTED_REDUCE, this.keyList);
+               }
+               else {
+                       // non forward case. all local properties are killed 
anyways, so we can safely plug in a combiner
+                       Channel toCombiner = new Channel(in.getSource());
+                       toCombiner.setShipStrategy(ShipStrategyType.FORWARD, 
DataExchangeMode.PIPELINED);
+                       
+                       // create an input node for combine with same DOP as 
input node
+                       ReduceNode combinerNode = ((ReduceNode) 
node).getCombinerUtilityNode();
+                       
combinerNode.setDegreeOfParallelism(in.getSource().getParallelism());
+
+                       SingleInputPlanNode combiner = new 
SingleInputPlanNode(combinerNode,
+                                                               "Combine 
("+node.getOperator().getName()+")", toCombiner,
+                                                               
DriverStrategy.SORTED_PARTIAL_REDUCE, this.keyList);
+
+                       combiner.setCosts(new Costs(0, 0));
+                       
combiner.initProperties(toCombiner.getGlobalProperties(), 
toCombiner.getLocalProperties());
+                       
+                       Channel toReducer = new Channel(combiner);
+                       toReducer.setShipStrategy(in.getShipStrategy(), 
in.getShipStrategyKeys(),
+                                                                               
in.getShipStrategySortOrder(), in.getDataExchangeMode());
+                       toReducer.setLocalStrategy(LocalStrategy.SORT, 
in.getLocalStrategyKeys(), in.getLocalStrategySortOrder());
+
+                       return new SingleInputPlanNode(node, 
"Reduce("+node.getOperator().getName()+")", toReducer,
+                                                                               
        DriverStrategy.SORTED_REDUCE, this.keyList);
+               }
+       }
+
+       @Override
+       protected List<RequestedGlobalProperties> 
createPossibleGlobalProperties() {
+               RequestedGlobalProperties props = new 
RequestedGlobalProperties();
+               if (customPartitioner == null) {
+                       props.setAnyPartitioning(this.keys);
+               } else {
+                       props.setCustomPartitioned(this.keys, 
this.customPartitioner);
+               }
+               return Collections.singletonList(props);
+       }
+
+       @Override
+       protected List<RequestedLocalProperties> 
createPossibleLocalProperties() {
+               RequestedLocalProperties props = new RequestedLocalProperties();
+               props.setGroupedFields(this.keys);
+               return Collections.singletonList(props);
+       }
+
+       @Override
+       public GlobalProperties computeGlobalProperties(GlobalProperties 
gProps) {
+               if (gProps.getUniqueFieldCombination() != null && 
gProps.getUniqueFieldCombination().size() > 0 &&
+                               gProps.getPartitioning() == 
PartitioningProperty.RANDOM_PARTITIONED)
+               {
+                       
gProps.setAnyPartitioning(gProps.getUniqueFieldCombination().iterator().next().toFieldList());
+               }
+               gProps.clearUniqueFieldCombinations();
+               return gProps;
+       }
+
+       @Override
+       public LocalProperties computeLocalProperties(LocalProperties lProps) {
+               return lProps.clearUniqueFieldSets();
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/SolutionSetDeltaOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/SolutionSetDeltaOperator.java
 
b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/SolutionSetDeltaOperator.java
new file mode 100644
index 0000000..1dcd87d
--- /dev/null
+++ 
b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/SolutionSetDeltaOperator.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.optimizer.operators;
+
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.flink.api.common.operators.util.FieldList;
+import org.apache.flink.optimizer.dag.SingleInputNode;
+import org.apache.flink.optimizer.dataproperties.GlobalProperties;
+import org.apache.flink.optimizer.dataproperties.LocalProperties;
+import org.apache.flink.optimizer.dataproperties.RequestedGlobalProperties;
+import org.apache.flink.optimizer.dataproperties.RequestedLocalProperties;
+import org.apache.flink.optimizer.plan.Channel;
+import org.apache.flink.optimizer.plan.SingleInputPlanNode;
+import org.apache.flink.runtime.operators.DriverStrategy;
+
+/**
+ *
+ */
+public class SolutionSetDeltaOperator extends OperatorDescriptorSingle {
+
+       public SolutionSetDeltaOperator(FieldList partitioningFields) {
+               super(partitioningFields);
+       }
+       
+       @Override
+       public DriverStrategy getStrategy() {
+               return DriverStrategy.UNARY_NO_OP;
+       }
+
+       @Override
+       public SingleInputPlanNode instantiate(Channel in, SingleInputNode 
node) {
+               return new SingleInputPlanNode(node, "SolutionSet Delta", in, 
DriverStrategy.UNARY_NO_OP);
+       }
+
+       @Override
+       protected List<RequestedGlobalProperties> 
createPossibleGlobalProperties() {
+               RequestedGlobalProperties partProps = new 
RequestedGlobalProperties();
+               partProps.setHashPartitioned(this.keyList);
+               return Collections.singletonList(partProps);
+       }
+
+       @Override
+       protected List<RequestedLocalProperties> 
createPossibleLocalProperties() {
+               return Collections.singletonList(new 
RequestedLocalProperties());
+       }
+       
+       @Override
+       public GlobalProperties computeGlobalProperties(GlobalProperties 
gProps) {
+               return gProps;
+       }
+       
+       @Override
+       public LocalProperties computeLocalProperties(LocalProperties lProps) {
+               return lProps;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/SortMergeJoinDescriptor.java
----------------------------------------------------------------------
diff --git 
a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/SortMergeJoinDescriptor.java
 
b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/SortMergeJoinDescriptor.java
new file mode 100644
index 0000000..356836a
--- /dev/null
+++ 
b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/SortMergeJoinDescriptor.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.operators;
+
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.flink.api.common.operators.Ordering;
+import org.apache.flink.api.common.operators.util.FieldList;
+import org.apache.flink.optimizer.CompilerException;
+import org.apache.flink.optimizer.dag.TwoInputNode;
+import org.apache.flink.optimizer.dataproperties.LocalProperties;
+import org.apache.flink.optimizer.dataproperties.RequestedLocalProperties;
+import org.apache.flink.optimizer.plan.Channel;
+import org.apache.flink.optimizer.plan.DualInputPlanNode;
+import org.apache.flink.optimizer.util.Utils;
+import org.apache.flink.runtime.operators.DriverStrategy;
+
+/**
+ * 
+ */
+public class SortMergeJoinDescriptor extends AbstractJoinDescriptor {
+       
+       public SortMergeJoinDescriptor(FieldList keys1, FieldList keys2) {
+               super(keys1, keys2);
+       }
+       
+       public SortMergeJoinDescriptor(FieldList keys1, FieldList keys2,
+                       boolean broadcastFirstAllowed, boolean 
broadcastSecondAllowed, boolean repartitionAllowed)
+       {
+               super(keys1, keys2, broadcastFirstAllowed, 
broadcastSecondAllowed, repartitionAllowed);
+       }
+
+       @Override
+       public DriverStrategy getStrategy() {
+               return DriverStrategy.MERGE;
+       }
+
+       @Override
+       protected List<LocalPropertiesPair> createPossibleLocalProperties() {
+               RequestedLocalProperties sort1 = new 
RequestedLocalProperties(Utils.createOrdering(this.keys1));
+               RequestedLocalProperties sort2 = new 
RequestedLocalProperties(Utils.createOrdering(this.keys2));
+               return Collections.singletonList(new LocalPropertiesPair(sort1, 
sort2));
+       }
+
+       @Override
+       public boolean areCoFulfilled(RequestedLocalProperties requested1, 
RequestedLocalProperties requested2,
+                       LocalProperties produced1, LocalProperties produced2)
+       {
+               int numRelevantFields = this.keys1.size();
+               
+               Ordering prod1 = produced1.getOrdering();
+               Ordering prod2 = produced2.getOrdering();
+               
+               if (prod1 == null || prod2 == null) {
+                       throw new CompilerException("The given properties do 
not meet this operators requirements.");
+               }
+
+               // check that order of fields is equivalent
+               if (!checkEquivalentFieldPositionsInKeyFields(
+                               prod1.getInvolvedIndexes(), 
prod2.getInvolvedIndexes(), numRelevantFields)) {
+                       return false;
+               }
+
+               // check that both inputs have the same directions of order
+               for (int i = 0; i < numRelevantFields; i++) {
+                       if (prod1.getOrder(i) != prod2.getOrder(i)) {
+                               return false;
+                       }
+               }
+               return true;
+       }
+       
+       @Override
+       public DualInputPlanNode instantiate(Channel in1, Channel in2, 
TwoInputNode node) {
+               boolean[] inputOrders = 
in1.getLocalProperties().getOrdering().getFieldSortDirections();
+               
+               if (inputOrders == null || inputOrders.length < 
this.keys1.size()) {
+                       throw new CompilerException("BUG: The input strategy 
does not sufficiently describe the sort orders for a merge operator.");
+               } else if (inputOrders.length > this.keys1.size()) {
+                       boolean[] tmp = new boolean[this.keys1.size()];
+                       System.arraycopy(inputOrders, 0, tmp, 0, tmp.length);
+                       inputOrders = tmp;
+               }
+               
+               return new DualInputPlanNode(node, 
"Join("+node.getOperator().getName()+")", in1, in2, DriverStrategy.MERGE, 
this.keys1, this.keys2, inputOrders);
+       }
+
+       @Override
+       public LocalProperties computeLocalProperties(LocalProperties in1, 
LocalProperties in2) {
+               LocalProperties comb = LocalProperties.combine(in1, in2);
+               return comb.clearUniqueFieldSets();
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/UtilSinkJoinOpDescriptor.java
----------------------------------------------------------------------
diff --git 
a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/UtilSinkJoinOpDescriptor.java
 
b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/UtilSinkJoinOpDescriptor.java
new file mode 100644
index 0000000..c42cff2
--- /dev/null
+++ 
b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/UtilSinkJoinOpDescriptor.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.optimizer.operators;
+
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.flink.optimizer.CompilerException;
+import org.apache.flink.optimizer.dag.SinkJoiner;
+import org.apache.flink.optimizer.dag.TwoInputNode;
+import org.apache.flink.optimizer.dataproperties.GlobalProperties;
+import org.apache.flink.optimizer.dataproperties.LocalProperties;
+import org.apache.flink.optimizer.dataproperties.RequestedGlobalProperties;
+import org.apache.flink.optimizer.dataproperties.RequestedLocalProperties;
+import org.apache.flink.optimizer.plan.Channel;
+import org.apache.flink.optimizer.plan.DualInputPlanNode;
+import org.apache.flink.optimizer.plan.SinkJoinerPlanNode;
+import org.apache.flink.runtime.operators.DriverStrategy;
+
+/**
+ *
+ */
+public class UtilSinkJoinOpDescriptor extends OperatorDescriptorDual {
+       
+       @Override
+       public DriverStrategy getStrategy() {
+               return DriverStrategy.BINARY_NO_OP;
+       }
+       
+       @Override
+       protected List<GlobalPropertiesPair> createPossibleGlobalProperties() {
+               // all properties are possible
+               return Collections.singletonList(new GlobalPropertiesPair(
+                       new RequestedGlobalProperties(), new 
RequestedGlobalProperties()));
+       }
+
+       @Override
+       protected List<LocalPropertiesPair> createPossibleLocalProperties() {
+               // all properties are possible
+               return Collections.singletonList(new LocalPropertiesPair(
+                       new RequestedLocalProperties(), new 
RequestedLocalProperties()));
+       }
+       
+       @Override
+       public boolean areCompatible(RequestedGlobalProperties requested1, 
RequestedGlobalProperties requested2,
+                       GlobalProperties produced1, GlobalProperties produced2) 
{
+               return true;
+       }
+       
+       @Override
+       public boolean areCoFulfilled(RequestedLocalProperties requested1, 
RequestedLocalProperties requested2,
+                       LocalProperties produced1, LocalProperties produced2) {
+               return true;
+       }
+
+       @Override
+       public DualInputPlanNode instantiate(Channel in1, Channel in2, 
TwoInputNode node) {
+               if (node instanceof SinkJoiner) {
+                       return new SinkJoinerPlanNode((SinkJoiner) node, in1, 
in2);
+               } else {
+                       throw new CompilerException();
+               }
+       }
+
+       @Override
+       public LocalProperties computeLocalProperties(LocalProperties in1, 
LocalProperties in2) {
+               return new LocalProperties();
+       }
+
+       @Override
+       public GlobalProperties computeGlobalProperties(GlobalProperties in1, 
GlobalProperties in2) {
+               return GlobalProperties.combine(in1, in2);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/BinaryUnionPlanNode.java
----------------------------------------------------------------------
diff --git 
a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/BinaryUnionPlanNode.java
 
b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/BinaryUnionPlanNode.java
new file mode 100644
index 0000000..bf22fb3
--- /dev/null
+++ 
b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/BinaryUnionPlanNode.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.plan;
+
+import org.apache.flink.optimizer.dag.BinaryUnionNode;
+import org.apache.flink.runtime.operators.DriverStrategy;
+
+/**
+ * A special subclass for the union to make it identifiable.
+ */
+public class BinaryUnionPlanNode extends DualInputPlanNode {
+       
+       /**
+        * @param template
+        */
+       public BinaryUnionPlanNode(BinaryUnionNode template, Channel in1, 
Channel in2) {
+               super(template, "Union", in1, in2, DriverStrategy.UNION);
+       }
+       
+       public BinaryUnionPlanNode(BinaryUnionPlanNode toSwapFrom) {
+               super(toSwapFrom.getOptimizerNode(), "Union-With-Cached", 
toSwapFrom.getInput2(), toSwapFrom.getInput1(),
+                               DriverStrategy.UNION_WITH_CACHED);
+               
+               this.globalProps = toSwapFrom.globalProps;
+               this.localProps = toSwapFrom.localProps;
+               this.nodeCosts = toSwapFrom.nodeCosts;
+               this.cumulativeCosts = toSwapFrom.cumulativeCosts;
+               
+               setParallelism(toSwapFrom.getParallelism());
+       }
+       
+       public BinaryUnionNode getOptimizerNode() {
+               return (BinaryUnionNode) this.template;
+       }
+       
+       public boolean unionsStaticAndDynamicPath() {
+               return getInput1().isOnDynamicPath() != 
getInput2().isOnDynamicPath();
+       }
+       
+       @Override
+       public int getMemoryConsumerWeight() {
+               return 0;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/BulkIterationPlanNode.java
----------------------------------------------------------------------
diff --git 
a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/BulkIterationPlanNode.java
 
b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/BulkIterationPlanNode.java
new file mode 100644
index 0000000..e79e2f3
--- /dev/null
+++ 
b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/BulkIterationPlanNode.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.optimizer.plan;
+
+import static 
org.apache.flink.optimizer.plan.PlanNode.SourceAndDamReport.FOUND_SOURCE;
+import static 
org.apache.flink.optimizer.plan.PlanNode.SourceAndDamReport.FOUND_SOURCE_AND_DAM;
+
+import java.util.HashMap;
+
+import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
+import org.apache.flink.optimizer.CompilerException;
+import org.apache.flink.optimizer.costs.Costs;
+import org.apache.flink.optimizer.dag.BulkIterationNode;
+import org.apache.flink.optimizer.dag.OptimizerNode;
+import org.apache.flink.runtime.operators.DriverStrategy;
+import org.apache.flink.util.Visitor;
+
+public class BulkIterationPlanNode extends SingleInputPlanNode implements 
IterationPlanNode {
+       
+       private final BulkPartialSolutionPlanNode partialSolutionPlanNode;
+       
+       private final PlanNode rootOfStepFunction;
+       
+       private PlanNode rootOfTerminationCriterion;
+       
+       private TypeSerializerFactory<?> serializerForIterationChannel;
+       
+       // 
--------------------------------------------------------------------------------------------
+
+       public BulkIterationPlanNode(BulkIterationNode template, String 
nodeName, Channel input,
+                       BulkPartialSolutionPlanNode pspn, PlanNode 
rootOfStepFunction)
+       {
+               super(template, nodeName, input, DriverStrategy.NONE);
+               this.partialSolutionPlanNode = pspn;
+               this.rootOfStepFunction = rootOfStepFunction;
+
+               mergeBranchPlanMaps();
+       }
+       
+       public BulkIterationPlanNode(BulkIterationNode template, String 
nodeName, Channel input,
+                       BulkPartialSolutionPlanNode pspn, PlanNode 
rootOfStepFunction, PlanNode rootOfTerminationCriterion)
+       {
+               this(template, nodeName, input, pspn, rootOfStepFunction);
+               this.rootOfTerminationCriterion = rootOfTerminationCriterion;
+       }
+
+       // 
--------------------------------------------------------------------------------------------
+       
+       public BulkIterationNode getIterationNode() {
+               if (this.template instanceof BulkIterationNode) {
+                       return (BulkIterationNode) this.template;
+               } else {
+                       throw new RuntimeException();
+               }
+       }
+       
+       public BulkPartialSolutionPlanNode getPartialSolutionPlanNode() {
+               return this.partialSolutionPlanNode;
+       }
+       
+       public PlanNode getRootOfStepFunction() {
+               return this.rootOfStepFunction;
+       }
+       
+       public PlanNode getRootOfTerminationCriterion() {
+               return this.rootOfTerminationCriterion;
+       }
+       
+       // 
--------------------------------------------------------------------------------------------
+
+       
+       public TypeSerializerFactory<?> getSerializerForIterationChannel() {
+               return serializerForIterationChannel;
+       }
+       
+       public void setSerializerForIterationChannel(TypeSerializerFactory<?> 
serializerForIterationChannel) {
+               this.serializerForIterationChannel = 
serializerForIterationChannel;
+       }
+
+       public void setCosts(Costs nodeCosts) {
+               // add the costs from the step function
+               
nodeCosts.addCosts(this.rootOfStepFunction.getCumulativeCosts());
+               
+               // add the costs for the termination criterion, if it exists
+               // the costs are divided at branches, so we can simply add them 
up
+               if (rootOfTerminationCriterion != null) {
+                       
nodeCosts.addCosts(this.rootOfTerminationCriterion.getCumulativeCosts());
+               }
+               
+               super.setCosts(nodeCosts);
+       }
+       
+       public int getMemoryConsumerWeight() {
+               return 1;
+       }
+       
+
+       @Override
+       public SourceAndDamReport hasDamOnPathDownTo(PlanNode source) {
+               if (source == this) {
+                       return FOUND_SOURCE;
+               }
+               
+               SourceAndDamReport fromOutside = 
super.hasDamOnPathDownTo(source);
+
+               if (fromOutside == FOUND_SOURCE_AND_DAM) {
+                       return FOUND_SOURCE_AND_DAM;
+               }
+               else if (fromOutside == FOUND_SOURCE) {
+                       // we always have a dam in the back channel
+                       return FOUND_SOURCE_AND_DAM;
+               } else {
+                       // check the step function for dams
+                       return 
this.rootOfStepFunction.hasDamOnPathDownTo(source);
+               }
+       }
+
+       @Override
+       public void acceptForStepFunction(Visitor<PlanNode> visitor) {
+               this.rootOfStepFunction.accept(visitor);
+               
+               if(this.rootOfTerminationCriterion != null) {
+                       this.rootOfTerminationCriterion.accept(visitor);
+               }
+       }
+
+       private void mergeBranchPlanMaps() {
+               for (OptimizerNode.UnclosedBranchDescriptor desc: 
template.getOpenBranches()) {
+                       OptimizerNode brancher = desc.getBranchingNode();
+
+                       if (branchPlan == null) {
+                               branchPlan = new HashMap<OptimizerNode, 
PlanNode>(6);
+                       }
+                       
+                       if (!branchPlan.containsKey(brancher)) {
+                               PlanNode selectedCandidate = null;
+
+                               if (rootOfStepFunction.branchPlan != null) {
+                                       selectedCandidate = 
rootOfStepFunction.branchPlan.get(brancher);
+                               }
+
+                               if (selectedCandidate == null) {
+                                       throw new CompilerException(
+                                                       "Candidates for a node 
with open branches are missing information about the selected candidate ");
+                               }
+
+                               this.branchPlan.put(brancher, 
selectedCandidate);
+                       }
+               }
+       }
+}

Reply via email to