http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/test/java/org/apache/flink/optimizer/FeedbackPropertiesMatchTest.java
----------------------------------------------------------------------
diff --git 
a/flink-compiler/src/test/java/org/apache/flink/optimizer/FeedbackPropertiesMatchTest.java
 
b/flink-compiler/src/test/java/org/apache/flink/optimizer/FeedbackPropertiesMatchTest.java
deleted file mode 100644
index 8c19462..0000000
--- 
a/flink-compiler/src/test/java/org/apache/flink/optimizer/FeedbackPropertiesMatchTest.java
+++ /dev/null
@@ -1,1436 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.optimizer;
-
-import static 
org.apache.flink.optimizer.plan.PlanNode.FeedbackPropertiesMeetRequirementsReport.*;
-import static org.junit.Assert.*;
-
-import org.apache.flink.api.common.functions.FlatJoinFunction;
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.common.operators.BinaryOperatorInformation;
-import org.apache.flink.api.common.operators.GenericDataSourceBase;
-import org.apache.flink.api.common.operators.OperatorInformation;
-import org.apache.flink.api.common.operators.Order;
-import org.apache.flink.api.common.operators.Ordering;
-import org.apache.flink.api.common.operators.UnaryOperatorInformation;
-import org.apache.flink.api.common.operators.base.JoinOperatorBase;
-import org.apache.flink.api.common.operators.base.MapOperatorBase;
-import org.apache.flink.api.common.operators.util.FieldList;
-import org.apache.flink.api.common.operators.util.FieldSet;
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.optimizer.dag.DataSourceNode;
-import org.apache.flink.optimizer.dag.MapNode;
-import org.apache.flink.optimizer.dag.JoinNode;
-import org.apache.flink.optimizer.dataproperties.GlobalProperties;
-import org.apache.flink.optimizer.dataproperties.LocalProperties;
-import org.apache.flink.optimizer.dataproperties.RequestedGlobalProperties;
-import org.apache.flink.optimizer.dataproperties.RequestedLocalProperties;
-import org.apache.flink.optimizer.plan.Channel;
-import org.apache.flink.optimizer.plan.DualInputPlanNode;
-import org.apache.flink.optimizer.plan.SingleInputPlanNode;
-import org.apache.flink.optimizer.plan.SourcePlanNode;
-import 
org.apache.flink.optimizer.plan.PlanNode.FeedbackPropertiesMeetRequirementsReport;
-import org.apache.flink.optimizer.testfunctions.DummyFlatJoinFunction;
-import org.apache.flink.optimizer.testfunctions.IdentityMapper;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.runtime.io.network.DataExchangeMode;
-import org.apache.flink.runtime.operators.DriverStrategy;
-import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
-import org.apache.flink.runtime.operators.util.LocalStrategy;
-import org.junit.Test;
-import org.apache.flink.api.java.io.TextInputFormat;
-
-
-public class FeedbackPropertiesMatchTest {
-
-       @Test
-       public void testNoPartialSolutionFoundSingleInputOnly() {
-               try {
-                       SourcePlanNode target = new 
SourcePlanNode(getSourceNode(), "Source");
-                       
-                       SourcePlanNode otherTarget = new 
SourcePlanNode(getSourceNode(), "Source");
-                       
-                       Channel toMap1 = new Channel(target);
-                       toMap1.setShipStrategy(ShipStrategyType.FORWARD, 
DataExchangeMode.PIPELINED);
-                       toMap1.setLocalStrategy(LocalStrategy.NONE);
-                       SingleInputPlanNode map1 = new 
SingleInputPlanNode(getMapNode(), "Mapper 1", toMap1, DriverStrategy.MAP);
-                       
-                       Channel toMap2 = new Channel(map1);
-                       toMap2.setShipStrategy(ShipStrategyType.FORWARD, 
DataExchangeMode.PIPELINED);
-                       toMap2.setLocalStrategy(LocalStrategy.NONE);
-                       SingleInputPlanNode map2 = new 
SingleInputPlanNode(getMapNode(), "Mapper 2", toMap2, DriverStrategy.MAP);
-                       
-                       {
-                               GlobalProperties gp = new GlobalProperties();
-                               LocalProperties lp = new LocalProperties();
-                               
-                               FeedbackPropertiesMeetRequirementsReport report 
= map2.checkPartialSolutionPropertiesMet(otherTarget, gp, lp);
-                               assertTrue(report == NO_PARTIAL_SOLUTION);
-                       }
-               }
-               catch (Exception e) {
-                       e.printStackTrace();
-                       fail(e.getMessage());
-               }
-       }
-       
-       @Test
-       public void testSingleInputOperators() {
-               try {
-                       SourcePlanNode target = new 
SourcePlanNode(getSourceNode(), "Source");
-                       
-                       Channel toMap1 = new Channel(target);
-                       toMap1.setShipStrategy(ShipStrategyType.FORWARD, 
DataExchangeMode.PIPELINED);
-                       toMap1.setLocalStrategy(LocalStrategy.NONE);
-                       SingleInputPlanNode map1 = new 
SingleInputPlanNode(getMapNode(), "Mapper 1", toMap1, DriverStrategy.MAP);
-                       
-                       Channel toMap2 = new Channel(map1);
-                       toMap2.setShipStrategy(ShipStrategyType.FORWARD, 
DataExchangeMode.PIPELINED);
-                       toMap2.setLocalStrategy(LocalStrategy.NONE);
-                       SingleInputPlanNode map2 = new 
SingleInputPlanNode(getMapNode(), "Mapper 2", toMap2, DriverStrategy.MAP);
-                       
-                       // no feedback properties and none are ever required 
and present
-                       {
-                               GlobalProperties gp = new GlobalProperties();
-                               LocalProperties lp = new LocalProperties();
-                               
-                               FeedbackPropertiesMeetRequirementsReport report 
= map2.checkPartialSolutionPropertiesMet(target, gp, lp);
-                               assertTrue(report != null && report != 
NO_PARTIAL_SOLUTION && report != NOT_MET);
-                       }
-                       
-                       // some global feedback properties and none are ever 
required and present
-                       {
-                               GlobalProperties gp = new GlobalProperties();
-                               gp.setHashPartitioned(new FieldList(2, 5));
-                               LocalProperties lp = new LocalProperties();
-                               
-                               FeedbackPropertiesMeetRequirementsReport report 
= map2.checkPartialSolutionPropertiesMet(target, gp, lp);
-                               assertTrue(report != null && report != 
NO_PARTIAL_SOLUTION && report != NOT_MET);
-                       }
-                       
-                       // some local feedback properties and none are ever 
required and present
-                       {
-                               GlobalProperties gp = new GlobalProperties();
-                               LocalProperties lp = 
LocalProperties.forGrouping(new FieldList(1, 2));
-                               
-                               FeedbackPropertiesMeetRequirementsReport report 
= map2.checkPartialSolutionPropertiesMet(target, gp, lp);
-                               assertTrue(report != null && report != 
NO_PARTIAL_SOLUTION && report != NOT_MET);
-                       }
-                       
-                       // some global and local feedback properties and none 
are ever required and present
-                       {
-                               GlobalProperties gp = new GlobalProperties();
-                               gp.setHashPartitioned(new FieldList(2, 5));
-                               LocalProperties lp = 
LocalProperties.forGrouping(new FieldList(1, 2));
-                               
-                               FeedbackPropertiesMeetRequirementsReport report 
= map2.checkPartialSolutionPropertiesMet(target, gp, lp);
-                               assertTrue(report != null && report != 
NO_PARTIAL_SOLUTION && report != NOT_MET);
-                       }
-                       
-                       // --------------------------- requirements on channel 
1 -----------------------
-                       
-                       // some required global properties, which are matched 
exactly
-                       {
-                               GlobalProperties gp = new GlobalProperties();
-                               gp.setHashPartitioned(new FieldList(2, 5));
-                               LocalProperties lp = new LocalProperties();
-                               
-                               RequestedGlobalProperties reqGp = new 
RequestedGlobalProperties();
-                               reqGp.setHashPartitioned(new FieldList(2, 5));
-                               
-                               toMap1.setRequiredGlobalProps(reqGp);
-                               toMap1.setRequiredLocalProps(null);
-                               
-                               toMap2.setRequiredGlobalProps(null);
-                               toMap2.setRequiredLocalProps(null);
-                               
-                               FeedbackPropertiesMeetRequirementsReport report 
= map2.checkPartialSolutionPropertiesMet(target, gp, lp);
-                               assertTrue(report != null && report != 
NO_PARTIAL_SOLUTION && report != NOT_MET);
-                       }
-                       
-                       // some required local properties, which are matched 
exactly
-                       {
-                               GlobalProperties gp = new GlobalProperties();
-                               LocalProperties lp = 
LocalProperties.forGrouping(new FieldList(1, 2));
-                               
-                               RequestedLocalProperties reqLp = new 
RequestedLocalProperties();
-                               reqLp.setGroupedFields(new FieldList(1, 2));
-                               
-                               toMap1.setRequiredGlobalProps(null);
-                               toMap1.setRequiredLocalProps(reqLp);
-                               
-                               toMap2.setRequiredGlobalProps(null);
-                               toMap2.setRequiredLocalProps(null);
-                               
-                               FeedbackPropertiesMeetRequirementsReport report 
= map2.checkPartialSolutionPropertiesMet(target, gp, lp);
-                               assertTrue(report != null && report != 
NO_PARTIAL_SOLUTION && report != NOT_MET);
-                       }
-                       
-                       // some required global and local properties, which are 
matched exactly
-                       {
-                               GlobalProperties gp = new GlobalProperties();
-                               gp.setHashPartitioned(new FieldList(2, 5));
-                               LocalProperties lp = 
LocalProperties.forGrouping(new FieldList(1, 2));
-                               
-                               RequestedGlobalProperties reqGp = new 
RequestedGlobalProperties();
-                               reqGp.setHashPartitioned(new FieldList(2, 5));
-                               
-                               RequestedLocalProperties reqLp = new 
RequestedLocalProperties();
-                               reqLp.setGroupedFields(new FieldList(1, 2));
-                               
-                               toMap1.setRequiredGlobalProps(reqGp);
-                               toMap1.setRequiredLocalProps(reqLp);
-                               
-                               toMap2.setRequiredGlobalProps(null);
-                               toMap2.setRequiredLocalProps(null);
-                               
-                               FeedbackPropertiesMeetRequirementsReport report 
= map2.checkPartialSolutionPropertiesMet(target, gp, lp);
-                               assertTrue(report != null && report != 
NO_PARTIAL_SOLUTION && report != NOT_MET);
-                       }
-                       
-                       // some required global and local properties, which are 
over-fulfilled
-                       {
-                               GlobalProperties gp = new GlobalProperties();
-                               gp.setHashPartitioned(new FieldList(2));
-                               LocalProperties lp = 
LocalProperties.forGrouping(new FieldList(1, 2));
-                               
-                               RequestedGlobalProperties reqGp = new 
RequestedGlobalProperties();
-                               reqGp.setHashPartitioned(new FieldSet(2, 5));
-                               
-                               RequestedLocalProperties reqLp = new 
RequestedLocalProperties();
-                               reqLp.setGroupedFields(new FieldList(1));
-                               
-                               toMap1.setRequiredGlobalProps(reqGp);
-                               toMap1.setRequiredLocalProps(reqLp);
-                               
-                               toMap2.setRequiredGlobalProps(null);
-                               toMap2.setRequiredLocalProps(null);
-                               
-                               FeedbackPropertiesMeetRequirementsReport report 
= map2.checkPartialSolutionPropertiesMet(target, gp, lp);
-                               assertTrue(report != null && report != 
NO_PARTIAL_SOLUTION && report != NOT_MET);
-                       }
-                       
-                       // some required global properties that are not met
-                       {
-                               GlobalProperties gp = new GlobalProperties();
-                               gp.setHashPartitioned(new FieldList(2, 1));
-                               LocalProperties lp = new LocalProperties();
-                               
-                               RequestedGlobalProperties reqGp = new 
RequestedGlobalProperties();
-                               reqGp.setHashPartitioned(new FieldList(2, 5));
-                               
-                               toMap1.setRequiredGlobalProps(reqGp);
-                               toMap1.setRequiredLocalProps(null);
-                               
-                               toMap2.setRequiredGlobalProps(null);
-                               toMap2.setRequiredLocalProps(null);
-                               
-                               FeedbackPropertiesMeetRequirementsReport report 
= map2.checkPartialSolutionPropertiesMet(target, gp, lp);
-                               assertEquals(NOT_MET, report);
-                       }
-                       
-                       // some required local properties that are not met
-                       {
-                               GlobalProperties gp = new GlobalProperties();
-                               LocalProperties lp = 
LocalProperties.forGrouping(new FieldList(1));
-                               
-                               RequestedLocalProperties reqLp = new 
RequestedLocalProperties();
-                               reqLp.setGroupedFields(new FieldList(2, 1));
-                               
-                               toMap1.setRequiredGlobalProps(null);
-                               toMap1.setRequiredLocalProps(reqLp);
-                               
-                               toMap2.setRequiredGlobalProps(null);
-                               toMap2.setRequiredLocalProps(null);
-                               
-                               FeedbackPropertiesMeetRequirementsReport report 
= map2.checkPartialSolutionPropertiesMet(target, gp, lp);
-                               assertEquals(NOT_MET, report);
-                       }
-                       
-                       // some required global and local properties where the 
global properties are not met
-                       {
-                               GlobalProperties gp = new GlobalProperties();
-                               gp.setHashPartitioned(new FieldList(2, 1));
-                               LocalProperties lp = 
LocalProperties.forGrouping(new FieldList(1));
-                               
-                               RequestedGlobalProperties reqGp = new 
RequestedGlobalProperties();
-                               reqGp.setAnyPartitioning(new FieldList(2, 5));
-                               
-                               RequestedLocalProperties reqLp = new 
RequestedLocalProperties();
-                               reqLp.setGroupedFields(new FieldList(1));
-                               
-                               toMap1.setRequiredGlobalProps(reqGp);
-                               toMap1.setRequiredLocalProps(reqLp);
-                               
-                               toMap2.setRequiredGlobalProps(null);
-                               toMap2.setRequiredLocalProps(null);
-                               
-                               FeedbackPropertiesMeetRequirementsReport report 
= map2.checkPartialSolutionPropertiesMet(target, gp, lp);
-                               assertEquals(NOT_MET, report);
-                       }
-                       
-                       // some required global and local properties where the 
local properties are not met
-                       {
-                               GlobalProperties gp = new GlobalProperties();
-                               gp.setHashPartitioned(new FieldList(1));
-                               LocalProperties lp = 
LocalProperties.forGrouping(new FieldList(1));
-                               
-                               RequestedGlobalProperties reqGp = new 
RequestedGlobalProperties();
-                               reqGp.setAnyPartitioning(new FieldList(1));
-                               
-                               RequestedLocalProperties reqLp = new 
RequestedLocalProperties();
-                               reqLp.setGroupedFields(new FieldList(2));
-                               
-                               toMap1.setRequiredGlobalProps(reqGp);
-                               toMap1.setRequiredLocalProps(reqLp);
-                               
-                               toMap2.setRequiredGlobalProps(null);
-                               toMap2.setRequiredLocalProps(null);
-                               
-                               FeedbackPropertiesMeetRequirementsReport report 
= map2.checkPartialSolutionPropertiesMet(target, gp, lp);
-                               assertEquals(NOT_MET, report);
-                       }
-                       
-                       // --------------------------- requirements on channel 
2 -----------------------
-                       
-                       // some required global properties, which are matched 
exactly
-                       {
-                               GlobalProperties gp = new GlobalProperties();
-                               gp.setHashPartitioned(new FieldList(2, 5));
-                               LocalProperties lp = new LocalProperties();
-                               
-                               RequestedGlobalProperties reqGp = new 
RequestedGlobalProperties();
-                               reqGp.setHashPartitioned(new FieldList(2, 5));
-                               
-                               toMap1.setRequiredGlobalProps(null);
-                               toMap1.setRequiredLocalProps(null);
-                               
-                               toMap2.setRequiredGlobalProps(reqGp);
-                               toMap2.setRequiredLocalProps(null);
-                               
-                               FeedbackPropertiesMeetRequirementsReport report 
= map2.checkPartialSolutionPropertiesMet(target, gp, lp);
-                               assertTrue(report != null && report != 
NO_PARTIAL_SOLUTION && report != NOT_MET);
-                       }
-                       
-                       // some required local properties, which are matched 
exactly
-                       {
-                               GlobalProperties gp = new GlobalProperties();
-                               LocalProperties lp = 
LocalProperties.forGrouping(new FieldList(1, 2));
-                               
-                               RequestedLocalProperties reqLp = new 
RequestedLocalProperties();
-                               reqLp.setGroupedFields(new FieldList(1, 2));
-                               
-                               toMap1.setRequiredGlobalProps(null);
-                               toMap1.setRequiredLocalProps(null);
-                               
-                               toMap2.setRequiredGlobalProps(null);
-                               toMap2.setRequiredLocalProps(reqLp);
-                               
-                               FeedbackPropertiesMeetRequirementsReport report 
= map2.checkPartialSolutionPropertiesMet(target, gp, lp);
-                               assertTrue(report != null && report != 
NO_PARTIAL_SOLUTION && report != NOT_MET);
-                       }
-                       
-                       // some required global and local properties, which are 
matched exactly
-                       {
-                               GlobalProperties gp = new GlobalProperties();
-                               gp.setHashPartitioned(new FieldList(2, 5));
-                               LocalProperties lp = 
LocalProperties.forGrouping(new FieldList(1, 2));
-                               
-                               RequestedGlobalProperties reqGp = new 
RequestedGlobalProperties();
-                               reqGp.setHashPartitioned(new FieldList(2, 5));
-                               
-                               RequestedLocalProperties reqLp = new 
RequestedLocalProperties();
-                               reqLp.setGroupedFields(new FieldList(1, 2));
-                               
-                               toMap1.setRequiredGlobalProps(null);
-                               toMap1.setRequiredLocalProps(null);
-                               
-                               toMap2.setRequiredGlobalProps(reqGp);
-                               toMap2.setRequiredLocalProps(reqLp);
-                               
-                               FeedbackPropertiesMeetRequirementsReport report 
= map2.checkPartialSolutionPropertiesMet(target, gp, lp);
-                               assertTrue(report != null && report != 
NO_PARTIAL_SOLUTION && report != NOT_MET);
-                       }
-                       
-                       // some required global and local properties, which are 
over-fulfilled
-                       {
-                               GlobalProperties gp = new GlobalProperties();
-                               gp.setHashPartitioned(new FieldList(2));
-                               LocalProperties lp = 
LocalProperties.forGrouping(new FieldList(1, 2));
-                               
-                               RequestedGlobalProperties reqGp = new 
RequestedGlobalProperties();
-                               reqGp.setHashPartitioned(new FieldSet(2, 5));
-                               
-                               RequestedLocalProperties reqLp = new 
RequestedLocalProperties();
-                               reqLp.setGroupedFields(new FieldList(1));
-                               
-                               toMap1.setRequiredGlobalProps(null);
-                               toMap1.setRequiredLocalProps(null);
-                               
-                               toMap2.setRequiredGlobalProps(reqGp);
-                               toMap2.setRequiredLocalProps(reqLp);
-                               
-                               FeedbackPropertiesMeetRequirementsReport report 
= map2.checkPartialSolutionPropertiesMet(target, gp, lp);
-                               assertTrue(report != null && report != 
NO_PARTIAL_SOLUTION && report != NOT_MET);
-                       }
-                       
-                       // some required global properties that are not met
-                       {
-                               GlobalProperties gp = new GlobalProperties();
-                               gp.setHashPartitioned(new FieldList(2, 1));
-                               LocalProperties lp = new LocalProperties();
-                               
-                               RequestedGlobalProperties reqGp = new 
RequestedGlobalProperties();
-                               reqGp.setHashPartitioned(new FieldSet(2, 5));
-                               
-                               toMap1.setRequiredGlobalProps(null);
-                               toMap1.setRequiredLocalProps(null);
-                               
-                               toMap2.setRequiredGlobalProps(reqGp);
-                               toMap2.setRequiredLocalProps(null);
-                               
-                               FeedbackPropertiesMeetRequirementsReport report 
= map2.checkPartialSolutionPropertiesMet(target, gp, lp);
-                               assertEquals(NOT_MET, report);
-                       }
-                       
-                       // some required local properties that are not met
-                       {
-                               GlobalProperties gp = new GlobalProperties();
-                               LocalProperties lp = 
LocalProperties.forGrouping(new FieldList(1));
-                               
-                               RequestedLocalProperties reqLp = new 
RequestedLocalProperties();
-                               reqLp.setGroupedFields(new FieldList(2, 1));
-                               
-                               toMap1.setRequiredGlobalProps(null);
-                               toMap1.setRequiredLocalProps(null);
-                               
-                               toMap2.setRequiredGlobalProps(null);
-                               toMap2.setRequiredLocalProps(reqLp);
-                               
-                               FeedbackPropertiesMeetRequirementsReport report 
= map2.checkPartialSolutionPropertiesMet(target, gp, lp);
-                               assertEquals(NOT_MET, report);
-                       }
-                       
-                       // some required global and local properties where the 
global properties are not met
-                       {
-                               GlobalProperties gp = new GlobalProperties();
-                               gp.setHashPartitioned(new FieldList(2, 1));
-                               LocalProperties lp = 
LocalProperties.forGrouping(new FieldList(1));
-                               
-                               RequestedGlobalProperties reqGp = new 
RequestedGlobalProperties();
-                               reqGp.setAnyPartitioning(new FieldSet(2, 5));
-                               
-                               RequestedLocalProperties reqLp = new 
RequestedLocalProperties();
-                               reqLp.setGroupedFields(new FieldList(1));
-                               
-                               toMap1.setRequiredGlobalProps(null);
-                               toMap1.setRequiredLocalProps(null);
-                               
-                               toMap2.setRequiredGlobalProps(reqGp);
-                               toMap2.setRequiredLocalProps(reqLp);
-                               
-                               FeedbackPropertiesMeetRequirementsReport report 
= map2.checkPartialSolutionPropertiesMet(target, gp, lp);
-                               assertEquals(NOT_MET, report);
-                       }
-                       
-                       // some required global and local properties where the 
local properties are not met
-                       {
-                               GlobalProperties gp = new GlobalProperties();
-                               gp.setHashPartitioned(new FieldList(1));
-                               LocalProperties lp = 
LocalProperties.forGrouping(new FieldList(1));
-                               
-                               RequestedGlobalProperties reqGp = new 
RequestedGlobalProperties();
-                               reqGp.setAnyPartitioning(new FieldList(1));
-                               
-                               RequestedLocalProperties reqLp = new 
RequestedLocalProperties();
-                               reqLp.setGroupedFields(new FieldList(2));
-                               
-                               toMap1.setRequiredGlobalProps(null);
-                               toMap1.setRequiredLocalProps(null);
-                               
-                               toMap2.setRequiredGlobalProps(reqGp);
-                               toMap2.setRequiredLocalProps(reqLp);
-                               
-                               FeedbackPropertiesMeetRequirementsReport report 
= map2.checkPartialSolutionPropertiesMet(target, gp, lp);
-                               assertEquals(NOT_MET, report);
-                       }
-                       
-                       // ---------------------- requirements mixed on 1 and 2 
-----------------------
-                       
-                       // some required global properties at step one and some 
more at step 2
-                       {
-                               GlobalProperties gp = new GlobalProperties();
-                               gp.setHashPartitioned(new FieldList(1, 2));
-                               LocalProperties lp = LocalProperties.EMPTY;
-                               
-                               RequestedGlobalProperties reqGp1 = new 
RequestedGlobalProperties();
-                               reqGp1.setAnyPartitioning(new FieldList(1, 2));
-                               
-                               RequestedGlobalProperties reqGp2 = new 
RequestedGlobalProperties();
-                               reqGp2.setHashPartitioned(new FieldList(1, 2));
-                               
-                               toMap1.setRequiredGlobalProps(reqGp1);
-                               toMap1.setRequiredLocalProps(null);
-                               
-                               toMap2.setRequiredGlobalProps(reqGp2);
-                               toMap2.setRequiredLocalProps(null);
-                               
-                               FeedbackPropertiesMeetRequirementsReport report 
= map2.checkPartialSolutionPropertiesMet(target, gp, lp);
-                               assertTrue(report != null && report != 
NO_PARTIAL_SOLUTION && report != NOT_MET);
-                       }
-                       
-                       // some required local properties at step one and some 
more at step 2
-                       {
-                               GlobalProperties gp = new GlobalProperties();
-                               LocalProperties lp = 
LocalProperties.forOrdering(new Ordering(3, null, 
Order.ASCENDING).appendOrdering(1, null, Order.DESCENDING));
-                               
-                               RequestedLocalProperties reqLp1 = new 
RequestedLocalProperties();
-                               reqLp1.setGroupedFields(new FieldList(3, 1));
-                               
-                               RequestedLocalProperties reqLp2 = new 
RequestedLocalProperties();
-                               reqLp2.setOrdering(new Ordering(3, null, 
Order.ANY).appendOrdering(1, null, Order.ANY));
-                               
-                               toMap1.setRequiredGlobalProps(null);
-                               toMap1.setRequiredLocalProps(reqLp1);
-                               
-                               toMap2.setRequiredGlobalProps(null);
-                               toMap2.setRequiredLocalProps(reqLp2);
-                               
-                               FeedbackPropertiesMeetRequirementsReport report 
= map2.checkPartialSolutionPropertiesMet(target, gp, lp);
-                               assertTrue(report != null && report != 
NO_PARTIAL_SOLUTION && report != NOT_MET);
-                       }
-                       
-                       // some required global properties at step one and some 
local ones at step 2
-                       {
-                               GlobalProperties gp = new GlobalProperties();
-                               gp.setHashPartitioned(new FieldList(1, 2));
-                               LocalProperties lp = 
LocalProperties.forGrouping(new FieldList(2, 1));
-                               
-                               RequestedGlobalProperties reqGp = new 
RequestedGlobalProperties();
-                               reqGp.setAnyPartitioning(new FieldList(1, 2));
-                               
-                               RequestedLocalProperties reqLp = new 
RequestedLocalProperties();
-                               reqLp.setGroupedFields(new FieldList(2));
-                               
-                               toMap1.setRequiredGlobalProps(reqGp);
-                               toMap1.setRequiredLocalProps(null);
-                               
-                               toMap2.setRequiredGlobalProps(null);
-                               toMap2.setRequiredLocalProps(reqLp);
-                               
-                               FeedbackPropertiesMeetRequirementsReport report 
= map2.checkPartialSolutionPropertiesMet(target, gp, lp);
-                               assertTrue(report != null && report != 
NO_PARTIAL_SOLUTION && report != NOT_MET);
-                       }
-                       
-                       // some required local properties at step one and some 
global ones at step 2
-                       {
-                               GlobalProperties gp = new GlobalProperties();
-                               gp.setHashPartitioned(new FieldList(1, 2));
-                               LocalProperties lp = 
LocalProperties.forGrouping(new FieldList(2, 1));
-                               
-                               RequestedGlobalProperties reqGp = new 
RequestedGlobalProperties();
-                               reqGp.setAnyPartitioning(new FieldList(1, 2));
-                               
-                               RequestedLocalProperties reqLp = new 
RequestedLocalProperties();
-                               reqLp.setGroupedFields(new FieldList(2));
-                               
-                               toMap1.setRequiredGlobalProps(null);
-                               toMap1.setRequiredLocalProps(reqLp);
-                               
-                               toMap2.setRequiredGlobalProps(reqGp);
-                               toMap2.setRequiredLocalProps(null);
-                               
-                               FeedbackPropertiesMeetRequirementsReport report 
= map2.checkPartialSolutionPropertiesMet(target, gp, lp);
-                               assertTrue(report != null && report != 
NO_PARTIAL_SOLUTION && report != NOT_MET);
-                       }
-                       
-                       // some fulfilled global properties at step one and 
some non-fulfilled local ones at step 2
-                       {
-                               GlobalProperties gp = new GlobalProperties();
-                               gp.setHashPartitioned(new FieldList(1, 2));
-                               LocalProperties lp = 
LocalProperties.forGrouping(new FieldList(2, 1));
-                               
-                               RequestedGlobalProperties reqGp = new 
RequestedGlobalProperties();
-                               reqGp.setAnyPartitioning(new FieldList(1, 2));
-                               
-                               RequestedLocalProperties reqLp = new 
RequestedLocalProperties();
-                               reqLp.setGroupedFields(new FieldList(2, 3));
-                               
-                               toMap1.setRequiredGlobalProps(reqGp);
-                               toMap1.setRequiredLocalProps(null);
-                               
-                               toMap2.setRequiredGlobalProps(null);
-                               toMap2.setRequiredLocalProps(reqLp);
-                               
-                               FeedbackPropertiesMeetRequirementsReport report 
= map2.checkPartialSolutionPropertiesMet(target, gp, lp);
-                               assertEquals(NOT_MET, report);
-                       }
-                       
-                       // some fulfilled local properties at step one and some 
non-fulfilled global ones at step 2
-                       {
-                               GlobalProperties gp = new GlobalProperties();
-                               gp.setHashPartitioned(new FieldList(1, 2));
-                               LocalProperties lp = 
LocalProperties.forGrouping(new FieldList(2, 1));
-                               
-                               RequestedGlobalProperties reqGp = new 
RequestedGlobalProperties();
-                               reqGp.setAnyPartitioning(new FieldList(2, 3));
-                               
-                               RequestedLocalProperties reqLp = new 
RequestedLocalProperties();
-                               reqLp.setGroupedFields(new FieldList(2, 1));
-                               
-                               toMap1.setRequiredGlobalProps(null);
-                               toMap1.setRequiredLocalProps(reqLp);
-                               
-                               toMap2.setRequiredGlobalProps(reqGp);
-                               toMap2.setRequiredLocalProps(null);
-                               
-                               FeedbackPropertiesMeetRequirementsReport report 
= map2.checkPartialSolutionPropertiesMet(target, gp, lp);
-                               assertEquals(NOT_MET, report);
-                       }
-                       
-                       // some non-fulfilled global properties at step one and 
some fulfilled local ones at step 2
-                       {
-                               GlobalProperties gp = new GlobalProperties();
-                               gp.setHashPartitioned(new FieldList(1, 2));
-                               LocalProperties lp = 
LocalProperties.forGrouping(new FieldList(2, 1));
-                               
-                               RequestedGlobalProperties reqGp = new 
RequestedGlobalProperties();
-                               reqGp.setAnyPartitioning(new FieldList(2, 3));
-                               
-                               RequestedLocalProperties reqLp = new 
RequestedLocalProperties();
-                               reqLp.setGroupedFields(new FieldList(2, 1));
-                               
-                               toMap1.setRequiredGlobalProps(reqGp);
-                               toMap1.setRequiredLocalProps(null);
-                               
-                               toMap2.setRequiredGlobalProps(null);
-                               toMap2.setRequiredLocalProps(reqLp);
-                               
-                               FeedbackPropertiesMeetRequirementsReport report 
= map2.checkPartialSolutionPropertiesMet(target, gp, lp);
-                               assertEquals(NOT_MET, report);
-                       }
-                       
-                       // some non-fulfilled local properties at step one and 
some fulfilled global ones at step 2
-                       {
-                               GlobalProperties gp = new GlobalProperties();
-                               gp.setHashPartitioned(new FieldList(1, 2));
-                               LocalProperties lp = 
LocalProperties.forGrouping(new FieldList(2, 1));
-                               
-                               RequestedGlobalProperties reqGp = new 
RequestedGlobalProperties();
-                               reqGp.setAnyPartitioning(new FieldList(1, 2));
-                               
-                               RequestedLocalProperties reqLp = new 
RequestedLocalProperties();
-                               reqLp.setGroupedFields(new FieldList(2, 1, 3));
-                               
-                               toMap1.setRequiredGlobalProps(null);
-                               toMap1.setRequiredLocalProps(reqLp);
-                               
-                               toMap2.setRequiredGlobalProps(reqGp);
-                               toMap2.setRequiredLocalProps(null);
-                               
-                               FeedbackPropertiesMeetRequirementsReport report 
= map2.checkPartialSolutionPropertiesMet(target, gp, lp);
-                               assertEquals(NOT_MET, report);
-                       }
-               }
-               catch (Exception e) {
-                       e.printStackTrace();
-                       fail(e.getMessage());
-               }
-       }
-       
-       @Test
-       public void testSingleInputOperatorsWithReCreation() {
-               try {
-                       SourcePlanNode target = new 
SourcePlanNode(getSourceNode(), "Source");
-                       
-                       Channel toMap1 = new Channel(target);
-                       SingleInputPlanNode map1 = new 
SingleInputPlanNode(getMapNode(), "Mapper 1", toMap1, DriverStrategy.MAP);
-                       
-                       Channel toMap2 = new Channel(map1);
-                       SingleInputPlanNode map2 = new 
SingleInputPlanNode(getMapNode(), "Mapper 2", toMap2, DriverStrategy.MAP);
-                       
-                       // set ship strategy in first channel, so later non 
matching global properties do not matter
-                       {
-                               GlobalProperties gp = new GlobalProperties();
-                               gp.setHashPartitioned(new FieldList(1, 2));
-                               LocalProperties lp = LocalProperties.EMPTY;
-                               
-                               RequestedGlobalProperties reqGp = new 
RequestedGlobalProperties();
-                               reqGp.setAnyPartitioning(new FieldSet(2, 5));
-                               
-                               
toMap1.setShipStrategy(ShipStrategyType.PARTITION_HASH, new FieldList(2, 5), 
DataExchangeMode.PIPELINED);
-                               toMap1.setLocalStrategy(LocalStrategy.NONE);
-                               
-                               
toMap2.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED);
-                               toMap2.setLocalStrategy(LocalStrategy.NONE);
-                               
-                               
-                               toMap1.setRequiredGlobalProps(null);
-                               toMap1.setRequiredLocalProps(null);
-                               
-                               toMap2.setRequiredGlobalProps(reqGp);
-                               toMap2.setRequiredLocalProps(null);
-                               
-                               FeedbackPropertiesMeetRequirementsReport report 
= map2.checkPartialSolutionPropertiesMet(target, gp, lp);
-                               assertEquals(MET, report);
-                       }
-                       
-                       // set ship strategy in second channel, so previous non 
matching global properties void the match
-                       {
-                               GlobalProperties gp = new GlobalProperties();
-                               gp.setHashPartitioned(new FieldList(1, 2));
-                               LocalProperties lp = LocalProperties.EMPTY;
-                               
-                               RequestedGlobalProperties reqGp = new 
RequestedGlobalProperties();
-                               reqGp.setAnyPartitioning(new FieldSet(2, 5));
-                               
-                               
toMap1.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED);
-                               toMap1.setLocalStrategy(LocalStrategy.NONE);
-                               
-                               
toMap2.setShipStrategy(ShipStrategyType.PARTITION_HASH, new FieldList(2, 5), 
DataExchangeMode.PIPELINED);
-                               toMap2.setLocalStrategy(LocalStrategy.NONE);
-                               
-                               
-                               toMap1.setRequiredGlobalProps(reqGp);
-                               toMap1.setRequiredLocalProps(null);
-                               
-                               toMap2.setRequiredGlobalProps(null);
-                               toMap2.setRequiredLocalProps(null);
-                               
-                               FeedbackPropertiesMeetRequirementsReport report 
= map2.checkPartialSolutionPropertiesMet(target, gp, lp);
-                               assertEquals(NOT_MET, report);
-                       }
-                       
-                       // set local strategy in first channel, so later non 
matching local properties do not matter
-                       {
-                               GlobalProperties gp = new GlobalProperties();
-                               gp.setHashPartitioned(new FieldList(1, 2));
-                               LocalProperties lp = 
LocalProperties.forOrdering(new Ordering(3, null, 
Order.ASCENDING).appendOrdering(1, null, Order.DESCENDING));
-                               
-                               RequestedLocalProperties reqLp = new 
RequestedLocalProperties();
-                               reqLp.setGroupedFields(new FieldList(4, 1));
-                               
-                               
toMap1.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED);
-                               toMap1.setLocalStrategy(LocalStrategy.SORT, new 
FieldList(5, 7), new boolean[] {false, false});
-                               
-                               
toMap2.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED);
-                               toMap2.setLocalStrategy(LocalStrategy.NONE);
-                               
-                               toMap1.setRequiredGlobalProps(null);
-                               toMap1.setRequiredLocalProps(null);
-                               
-                               toMap2.setRequiredGlobalProps(null);
-                               toMap2.setRequiredLocalProps(reqLp);
-                               
-                               FeedbackPropertiesMeetRequirementsReport report 
= map2.checkPartialSolutionPropertiesMet(target, gp, lp);
-                               assertTrue(report != null && report != 
NO_PARTIAL_SOLUTION && report != NOT_MET);
-                       }
-                       
-                       // set local strategy in second channel, so previous 
non matching local properties void the match
-                       {
-                               GlobalProperties gp = new GlobalProperties();
-                               gp.setHashPartitioned(new FieldList(1, 2));
-                               LocalProperties lp = 
LocalProperties.forOrdering(new Ordering(3, null, 
Order.ASCENDING).appendOrdering(1, null, Order.DESCENDING));
-                               
-                               RequestedLocalProperties reqLp = new 
RequestedLocalProperties();
-                               reqLp.setGroupedFields(new FieldList(4, 1));
-                               
-                               
toMap1.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED);
-                               toMap1.setLocalStrategy(LocalStrategy.NONE);
-                               
-                               
toMap2.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED);
-                               toMap2.setLocalStrategy(LocalStrategy.SORT, new 
FieldList(5, 7), new boolean[] {false, false});
-                               
-                               
-                               toMap1.setRequiredGlobalProps(null);
-                               toMap1.setRequiredLocalProps(reqLp);
-                               
-                               toMap2.setRequiredGlobalProps(null);
-                               toMap2.setRequiredLocalProps(null);
-                               
-                               FeedbackPropertiesMeetRequirementsReport report 
= map2.checkPartialSolutionPropertiesMet(target, gp, lp);
-                               assertEquals(NOT_MET, report);
-                       }
-                       
-                       // create the properties on the same node as the 
requirement
-                       {
-                               GlobalProperties gp = new GlobalProperties();
-                               gp.setHashPartitioned(new FieldList(1, 2));
-                               LocalProperties lp = 
LocalProperties.forOrdering(new Ordering(3, null, 
Order.ASCENDING).appendOrdering(1, null, Order.DESCENDING));
-                               
-                               RequestedGlobalProperties reqGp = new 
RequestedGlobalProperties();
-                               reqGp.setAnyPartitioning(new FieldSet(5, 7));
-                               
-                               RequestedLocalProperties reqLp = new 
RequestedLocalProperties();
-                               reqLp.setGroupedFields(new FieldList(5, 7));
-                               
-                               
toMap1.setShipStrategy(ShipStrategyType.PARTITION_HASH, new FieldList(5, 7), 
DataExchangeMode.PIPELINED);
-                               toMap1.setLocalStrategy(LocalStrategy.SORT, new 
FieldList(5, 7), new boolean[] {false, false});
-                               
-                               
toMap2.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED);
-                               toMap2.setLocalStrategy(LocalStrategy.NONE);
-                               
-                               toMap1.setRequiredGlobalProps(reqGp);
-                               toMap1.setRequiredLocalProps(reqLp);
-                               
-                               toMap2.setRequiredGlobalProps(null);
-                               toMap2.setRequiredLocalProps(null);
-                               
-                               FeedbackPropertiesMeetRequirementsReport report 
= map2.checkPartialSolutionPropertiesMet(target, gp, lp);
-                               assertEquals(MET, report);
-                       }
-               }
-               catch (Exception e) {
-                       e.printStackTrace();
-                       fail(e.getMessage());
-               }
-       }
-       
-       @Test
-       public void testSingleInputOperatorsChainOfThree() {
-               try {
-                       SourcePlanNode target = new 
SourcePlanNode(getSourceNode(), "Source");
-                       
-                       Channel toMap1 = new Channel(target);
-                       SingleInputPlanNode map1 = new 
SingleInputPlanNode(getMapNode(), "Mapper 1", toMap1, DriverStrategy.MAP);
-                       
-                       Channel toMap2 = new Channel(map1);
-                       SingleInputPlanNode map2 = new 
SingleInputPlanNode(getMapNode(), "Mapper 2", toMap2, DriverStrategy.MAP);
-                       
-                       Channel toMap3 = new Channel(map2);
-                       SingleInputPlanNode map3 = new 
SingleInputPlanNode(getMapNode(), "Mapper 3", toMap3, DriverStrategy.MAP);
-                       
-                       // set local strategy in first channel, so later non 
matching local properties do not matter
-                       {
-                               GlobalProperties gp = new GlobalProperties();
-                               LocalProperties lp = 
LocalProperties.forOrdering(new Ordering(3, null, 
Order.ASCENDING).appendOrdering(1, null, Order.DESCENDING));
-                               
-                               RequestedLocalProperties reqLp = new 
RequestedLocalProperties();
-                               reqLp.setGroupedFields(new FieldList(4, 1));
-                               
-                               
toMap1.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED);
-                               toMap1.setLocalStrategy(LocalStrategy.SORT, new 
FieldList(5, 7), new boolean[] {false, false});
-                               
-                               
toMap2.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED);
-                               toMap2.setLocalStrategy(LocalStrategy.NONE);
-                               
-                               
toMap3.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED);
-                               toMap3.setLocalStrategy(LocalStrategy.NONE);
-                               
-                               toMap1.setRequiredGlobalProps(null);
-                               toMap1.setRequiredLocalProps(null);
-                               
-                               toMap2.setRequiredGlobalProps(null);
-                               toMap2.setRequiredLocalProps(null);
-                               
-                               toMap3.setRequiredGlobalProps(null);
-                               toMap3.setRequiredLocalProps(reqLp);
-                               
-                               FeedbackPropertiesMeetRequirementsReport report 
= map3.checkPartialSolutionPropertiesMet(target, gp, lp);
-                               assertTrue(report != null && report != 
NO_PARTIAL_SOLUTION && report != NOT_MET);
-                       }
-                       
-                       // set global strategy in first channel, so later non 
matching global properties do not matter
-                       {
-                               GlobalProperties gp = new GlobalProperties();
-                               gp.setHashPartitioned(new FieldList(5, 3));
-                               LocalProperties lp = LocalProperties.EMPTY;
-                               
-                               RequestedGlobalProperties reqGp = new 
RequestedGlobalProperties();
-                               reqGp.setAnyPartitioning(new FieldSet(2, 3));
-                               
-                               
toMap1.setShipStrategy(ShipStrategyType.PARTITION_HASH, new FieldList(1, 2), 
DataExchangeMode.PIPELINED);
-                               toMap1.setLocalStrategy(LocalStrategy.NONE);
-                               
-                               
toMap2.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED);
-                               toMap2.setLocalStrategy(LocalStrategy.NONE);
-                               
-                               
toMap3.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED);
-                               toMap3.setLocalStrategy(LocalStrategy.NONE);
-                               
-                               toMap1.setRequiredGlobalProps(null);
-                               toMap1.setRequiredLocalProps(null);
-                               
-                               toMap2.setRequiredGlobalProps(null);
-                               toMap2.setRequiredLocalProps(null);
-                               
-                               toMap3.setRequiredGlobalProps(reqGp);
-                               toMap3.setRequiredLocalProps(null);
-                               
-                               FeedbackPropertiesMeetRequirementsReport report 
= map3.checkPartialSolutionPropertiesMet(target, gp, lp);
-                               assertTrue(report != null && report != 
NO_PARTIAL_SOLUTION && report != NOT_MET);
-                       }
-               }
-               catch (Exception e) {
-                       e.printStackTrace();
-                       fail(e.getMessage());
-               }
-       }
-       
-       @Test
-       public void testNoPartialSolutionFoundTwoInputOperator() {
-               try {
-                       SourcePlanNode target = new 
SourcePlanNode(getSourceNode(), "Partial Solution");
-
-                       SourcePlanNode source1 = new 
SourcePlanNode(getSourceNode(), "Source 1");
-                       SourcePlanNode source2 = new 
SourcePlanNode(getSourceNode(), "Source 2");
-                       
-                       Channel toMap1 = new Channel(source1);
-                       toMap1.setShipStrategy(ShipStrategyType.FORWARD, 
DataExchangeMode.PIPELINED);
-                       toMap1.setLocalStrategy(LocalStrategy.NONE);
-                       SingleInputPlanNode map1 = new 
SingleInputPlanNode(getMapNode(), "Mapper 1", toMap1, DriverStrategy.MAP);
-                       
-                       Channel toMap2 = new Channel(source2);
-                       toMap2.setShipStrategy(ShipStrategyType.FORWARD, 
DataExchangeMode.PIPELINED);
-                       toMap2.setLocalStrategy(LocalStrategy.NONE);
-                       SingleInputPlanNode map2 = new 
SingleInputPlanNode(getMapNode(), "Mapper 2", toMap2, DriverStrategy.MAP);
-                       
-                       Channel toJoin1 = new Channel(map1);
-                       Channel toJoin2 = new Channel(map2);
-                       
-                       toJoin1.setShipStrategy(ShipStrategyType.FORWARD, 
DataExchangeMode.PIPELINED);
-                       toJoin1.setLocalStrategy(LocalStrategy.NONE);
-                       toJoin2.setShipStrategy(ShipStrategyType.FORWARD, 
DataExchangeMode.PIPELINED);
-                       toJoin2.setLocalStrategy(LocalStrategy.NONE);
-                       
-                       DualInputPlanNode join = new 
DualInputPlanNode(getJoinNode(), "Join", toJoin1, toJoin2, 
DriverStrategy.HYBRIDHASH_BUILD_FIRST);
-                       
-                       FeedbackPropertiesMeetRequirementsReport report = 
join.checkPartialSolutionPropertiesMet(target, new GlobalProperties(), new 
LocalProperties());
-                       assertEquals(NO_PARTIAL_SOLUTION, report);
-               }
-               catch (Exception e) {
-                       e.printStackTrace();
-                       fail(e.getMessage());
-               }
-       }
-       
-       @Test
-       public void testTwoOperatorsOneIndependent() {
-               try {
-                       SourcePlanNode target = new 
SourcePlanNode(getSourceNode(), "Partial Solution");
-                       SourcePlanNode source = new 
SourcePlanNode(getSourceNode(), "Other Source");
-                       
-                       Channel toMap1 = new Channel(target);
-                       toMap1.setShipStrategy(ShipStrategyType.FORWARD, 
DataExchangeMode.PIPELINED);
-                       toMap1.setLocalStrategy(LocalStrategy.NONE);
-                       SingleInputPlanNode map1 = new 
SingleInputPlanNode(getMapNode(), "Mapper 1", toMap1, DriverStrategy.MAP);
-                       
-                       Channel toMap2 = new Channel(source);
-                       toMap2.setShipStrategy(ShipStrategyType.FORWARD, 
DataExchangeMode.PIPELINED);
-                       toMap2.setLocalStrategy(LocalStrategy.NONE);
-                       SingleInputPlanNode map2 = new 
SingleInputPlanNode(getMapNode(), "Mapper 2", toMap2, DriverStrategy.MAP);
-                       
-                       Channel toJoin1 = new Channel(map1);
-                       Channel toJoin2 = new Channel(map2);
-                       
-                       DualInputPlanNode join = new 
DualInputPlanNode(getJoinNode(), "Join", toJoin1, toJoin2, 
DriverStrategy.HYBRIDHASH_BUILD_FIRST);
-                       
-                       Channel toAfterJoin = new Channel(join);
-                       toAfterJoin.setShipStrategy(ShipStrategyType.FORWARD, 
DataExchangeMode.PIPELINED);
-                       toAfterJoin.setLocalStrategy(LocalStrategy.NONE);
-                       SingleInputPlanNode afterJoin = new 
SingleInputPlanNode(getMapNode(), "After Join Mapper", toAfterJoin, 
DriverStrategy.MAP);
-                       
-                       // attach some properties to the non-relevant input
-                       {
-                               
toMap2.setShipStrategy(ShipStrategyType.BROADCAST, DataExchangeMode.PIPELINED);
-                               toMap2.setLocalStrategy(LocalStrategy.SORT, new 
FieldList(2, 7), new boolean[] {true, true});
-                               
-                               RequestedGlobalProperties joinGp = new 
RequestedGlobalProperties();
-                               joinGp.setFullyReplicated();
-                               
-                               RequestedLocalProperties joinLp = new 
RequestedLocalProperties();
-                               joinLp.setOrdering(new Ordering(2, null, 
Order.ASCENDING).appendOrdering(7, null, Order.ASCENDING));
-                               
-                               
toJoin2.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED);
-                               toJoin2.setLocalStrategy(LocalStrategy.NONE);
-                               toJoin2.setRequiredGlobalProps(joinGp);
-                               toJoin2.setRequiredLocalProps(joinLp);
-                       }
-                       
-                       // 
------------------------------------------------------------------------------------
-                       
-                       // no properties from the partial solution, no required 
properties
-                       {
-                               
toJoin1.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED);
-                               toJoin1.setLocalStrategy(LocalStrategy.NONE);
-                               
-                               GlobalProperties gp = new GlobalProperties();
-                               LocalProperties lp = LocalProperties.EMPTY;
-                               
-                               FeedbackPropertiesMeetRequirementsReport report 
= join.checkPartialSolutionPropertiesMet(target, gp, lp);
-                               assertTrue(report != null && report != 
NO_PARTIAL_SOLUTION && report != NOT_MET);
-                       }
-                       
-                       // some properties from the partial solution, no 
required properties
-                       {
-                               
toJoin1.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED);
-                               toJoin1.setLocalStrategy(LocalStrategy.NONE);
-                               
-                               GlobalProperties gp = new GlobalProperties();
-                               gp.setHashPartitioned(new FieldList(0));
-                               LocalProperties lp = 
LocalProperties.forGrouping(new FieldList(2, 1));
-                               
-                               FeedbackPropertiesMeetRequirementsReport report 
= join.checkPartialSolutionPropertiesMet(target, gp, lp);
-                               assertTrue(report != null && report != 
NO_PARTIAL_SOLUTION && report != NOT_MET);
-                       }
-                       
-                       
-                       // produced properties match relevant input
-                       {
-                               GlobalProperties gp = new GlobalProperties();
-                               gp.setHashPartitioned(new FieldList(0));
-                               LocalProperties lp = 
LocalProperties.forGrouping(new FieldList(2, 1));
-                               
-                               RequestedGlobalProperties rgp = new 
RequestedGlobalProperties();
-                               rgp.setHashPartitioned(new FieldList(0));
-                               
-                               RequestedLocalProperties rlp = new 
RequestedLocalProperties();
-                               rlp.setGroupedFields(new FieldList(2));
-                               
-                               toJoin1.setRequiredGlobalProps(rgp);
-                               toJoin1.setRequiredLocalProps(rlp);
-                               
-                               
toJoin1.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED);
-                               toJoin1.setLocalStrategy(LocalStrategy.NONE);
-                               
-                               FeedbackPropertiesMeetRequirementsReport report 
= join.checkPartialSolutionPropertiesMet(target, gp, lp);
-                               assertTrue(report != null && report != 
NO_PARTIAL_SOLUTION && report != NOT_MET);
-                       }
-                       
-                       // produced properties do not match relevant input
-                       {
-                               GlobalProperties gp = new GlobalProperties();
-                               gp.setHashPartitioned(new FieldList(0));
-                               LocalProperties lp = 
LocalProperties.forGrouping(new FieldList(2, 1));
-                               
-                               RequestedGlobalProperties rgp = new 
RequestedGlobalProperties();
-                               rgp.setHashPartitioned(new FieldList(0));
-                               
-                               RequestedLocalProperties rlp = new 
RequestedLocalProperties();
-                               rlp.setGroupedFields(new FieldList(1, 2, 3));
-                               
-                               toJoin1.setRequiredGlobalProps(rgp);
-                               toJoin1.setRequiredLocalProps(rlp);
-                               
-                               
toJoin1.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED);
-                               toJoin1.setLocalStrategy(LocalStrategy.NONE);
-                               
-                               FeedbackPropertiesMeetRequirementsReport report 
= join.checkPartialSolutionPropertiesMet(target, gp, lp);
-                               assertEquals(NOT_MET, report);
-                       }
-                       
-                       // produced properties overridden before join
-                       {
-                               GlobalProperties gp = new GlobalProperties();
-                               gp.setHashPartitioned(new FieldList(0));
-                               LocalProperties lp = 
LocalProperties.forGrouping(new FieldList(2, 1));
-                               
-                               RequestedGlobalProperties rgp = new 
RequestedGlobalProperties();
-                               rgp.setHashPartitioned(new FieldList(0));
-                               
-                               RequestedLocalProperties rlp = new 
RequestedLocalProperties();
-                               rlp.setGroupedFields(new FieldList(2, 1));
-                               
-                               toMap1.setRequiredGlobalProps(rgp);
-                               toMap1.setRequiredLocalProps(rlp);
-                               
-                               toJoin1.setRequiredGlobalProps(null);
-                               toJoin1.setRequiredLocalProps(null);
-                               
-                               
toJoin1.setShipStrategy(ShipStrategyType.PARTITION_HASH, new FieldList(2, 1), 
DataExchangeMode.PIPELINED);
-                               toJoin1.setLocalStrategy(LocalStrategy.SORT, 
new FieldList(7, 3), new boolean[] {true, false});
-                               
-                               FeedbackPropertiesMeetRequirementsReport report 
= join.checkPartialSolutionPropertiesMet(target, gp, lp);
-                               assertEquals(MET, report);
-                       }
-                       
-                       // produced properties before join match, after join 
match as well
-                       {
-                               GlobalProperties gp = new GlobalProperties();
-                               gp.setHashPartitioned(new FieldList(0));
-                               LocalProperties lp = 
LocalProperties.forGrouping(new FieldList(2, 1));
-                               
-                               RequestedGlobalProperties rgp = new 
RequestedGlobalProperties();
-                               rgp.setHashPartitioned(new FieldList(0));
-                               
-                               RequestedLocalProperties rlp = new 
RequestedLocalProperties();
-                               rlp.setGroupedFields(new FieldList(2, 1));
-                               
-                               toMap1.setRequiredGlobalProps(null);
-                               toMap1.setRequiredLocalProps(null);
-                               
-                               
toJoin1.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED);
-                               toJoin1.setLocalStrategy(LocalStrategy.NONE);
-                               
-                               toJoin1.setRequiredGlobalProps(rgp);
-                               toJoin1.setRequiredLocalProps(rlp);
-                       
-                               
toAfterJoin.setShipStrategy(ShipStrategyType.FORWARD, 
DataExchangeMode.PIPELINED);
-                               
toAfterJoin.setLocalStrategy(LocalStrategy.NONE);
-                               
-                               toAfterJoin.setRequiredGlobalProps(rgp);
-                               toAfterJoin.setRequiredLocalProps(rlp);
-                               
-                               FeedbackPropertiesMeetRequirementsReport report 
= join.checkPartialSolutionPropertiesMet(target, gp, lp);
-                               assertTrue(report != null && report != 
NO_PARTIAL_SOLUTION && report != NOT_MET);
-                       }
-                       
-                       // produced properties before join match, after join do 
not match
-                       {
-                               GlobalProperties gp = new GlobalProperties();
-                               gp.setHashPartitioned(new FieldList(0));
-                               LocalProperties lp = 
LocalProperties.forGrouping(new FieldList(2, 1));
-                               
-                               RequestedGlobalProperties rgp1 = new 
RequestedGlobalProperties();
-                               rgp1.setHashPartitioned(new FieldList(0));
-                               
-                               RequestedGlobalProperties rgp2 = new 
RequestedGlobalProperties();
-                               rgp2.setHashPartitioned(new FieldList(3));
-                               
-                               RequestedLocalProperties rlp1 = new 
RequestedLocalProperties();
-                               rlp1.setGroupedFields(new FieldList(2, 1));
-                               
-                               RequestedLocalProperties rlp2 = new 
RequestedLocalProperties();
-                               rlp2.setGroupedFields(new FieldList(3, 4));
-                               
-                               toJoin1.setRequiredGlobalProps(rgp1);
-                               toJoin1.setRequiredLocalProps(rlp1);
-                       
-                               
toAfterJoin.setShipStrategy(ShipStrategyType.FORWARD, 
DataExchangeMode.PIPELINED);
-                               
toAfterJoin.setLocalStrategy(LocalStrategy.NONE);
-                               
-                               toAfterJoin.setRequiredGlobalProps(rgp2);
-                               toAfterJoin.setRequiredLocalProps(rlp2);
-                               
-                               FeedbackPropertiesMeetRequirementsReport report 
= afterJoin.checkPartialSolutionPropertiesMet(target, gp, lp);
-                               assertEquals(NOT_MET, report);
-                       }
-                       
-                       // produced properties are overridden, does not matter 
that they do not match
-                       {
-                               GlobalProperties gp = new GlobalProperties();
-                               gp.setAnyPartitioning(new FieldList(0));
-                               LocalProperties lp = 
LocalProperties.forGrouping(new FieldList(2, 1));
-                               
-                               RequestedGlobalProperties rgp = new 
RequestedGlobalProperties();
-                               rgp.setHashPartitioned(new FieldList(1));
-                               
-                               RequestedLocalProperties rlp = new 
RequestedLocalProperties();
-                               rlp.setGroupedFields(new FieldList(1, 2, 3));
-                               
-                               toJoin1.setRequiredGlobalProps(null);
-                               toJoin1.setRequiredLocalProps(null);
-                               
-                               
toJoin1.setShipStrategy(ShipStrategyType.PARTITION_HASH, new FieldList(2, 1), 
DataExchangeMode.PIPELINED);
-                               toJoin1.setLocalStrategy(LocalStrategy.SORT, 
new FieldList(7, 3), new boolean[] {true, false});
-                               
-                               toAfterJoin.setRequiredGlobalProps(rgp);
-                               toAfterJoin.setRequiredLocalProps(rlp);
-                               
-                               FeedbackPropertiesMeetRequirementsReport report 
= afterJoin.checkPartialSolutionPropertiesMet(target, gp, lp);
-                               assertEquals(MET, report);
-                       }
-                       
-                       // local property overridden before join, local 
property mismatch after join not relevant
-                       {
-                               GlobalProperties gp = new GlobalProperties();
-                               gp.setAnyPartitioning(new FieldList(0));
-                               LocalProperties lp = 
LocalProperties.forGrouping(new FieldList(2, 1));
-                               
-                               RequestedLocalProperties rlp = new 
RequestedLocalProperties();
-                               rlp.setGroupedFields(new FieldList(1, 2, 3));
-                               
-                               toJoin1.setRequiredGlobalProps(null);
-                               toJoin1.setRequiredLocalProps(null);
-                               
-                               
toJoin1.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED);
-                               toJoin1.setLocalStrategy(LocalStrategy.SORT, 
new FieldList(7, 3), new boolean[] {true, false});
-                               
-                               toAfterJoin.setRequiredGlobalProps(null);
-                               toAfterJoin.setRequiredLocalProps(rlp);
-                               
-                               FeedbackPropertiesMeetRequirementsReport report 
= afterJoin.checkPartialSolutionPropertiesMet(target, gp, lp);
-                               assertTrue(report != null && report != 
NO_PARTIAL_SOLUTION && report != NOT_MET);
-                       }
-                       
-                       // local property overridden before join, global 
property mismatch after join void the match
-                       {
-                               GlobalProperties gp = new GlobalProperties();
-                               gp.setAnyPartitioning(new FieldList(0));
-                               LocalProperties lp = 
LocalProperties.forGrouping(new FieldList(2, 1));
-                               
-                               RequestedGlobalProperties rgp = new 
RequestedGlobalProperties();
-                               rgp.setHashPartitioned(new FieldList(1));
-                               
-                               RequestedLocalProperties rlp = new 
RequestedLocalProperties();
-                               rlp.setGroupedFields(new FieldList(1, 2, 3));
-                               
-                               toJoin1.setRequiredGlobalProps(null);
-                               toJoin1.setRequiredLocalProps(null);
-                               
-                               
toJoin1.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED);
-                               toJoin1.setLocalStrategy(LocalStrategy.SORT, 
new FieldList(7, 3), new boolean[] {true, false});
-                               
-                               toAfterJoin.setRequiredGlobalProps(rgp);
-                               toAfterJoin.setRequiredLocalProps(rlp);
-                               
-                               FeedbackPropertiesMeetRequirementsReport report 
= afterJoin.checkPartialSolutionPropertiesMet(target, gp, lp);
-                               assertEquals(NOT_MET, report);
-                       }
-               }
-               catch (Exception e) {
-                       e.printStackTrace();
-                       fail(e.getMessage());
-               }
-       }
-       
-       @Test
-       public void testTwoOperatorsBothDependent() {
-               try {
-                       SourcePlanNode target = new 
SourcePlanNode(getSourceNode(), "Partial Solution");
-                       
-                       Channel toMap1 = new Channel(target);
-                       toMap1.setShipStrategy(ShipStrategyType.FORWARD, 
DataExchangeMode.PIPELINED);
-                       toMap1.setLocalStrategy(LocalStrategy.NONE);
-                       SingleInputPlanNode map1 = new 
SingleInputPlanNode(getMapNode(), "Mapper 1", toMap1, DriverStrategy.MAP);
-                       
-                       Channel toMap2 = new Channel(target);
-                       toMap2.setShipStrategy(ShipStrategyType.FORWARD, 
DataExchangeMode.PIPELINED);
-                       toMap2.setLocalStrategy(LocalStrategy.NONE);
-                       SingleInputPlanNode map2 = new 
SingleInputPlanNode(getMapNode(), "Mapper 2", toMap2, DriverStrategy.MAP);
-                       
-                       Channel toJoin1 = new Channel(map1);
-                       toJoin1.setShipStrategy(ShipStrategyType.FORWARD, 
DataExchangeMode.PIPELINED);
-                       toJoin1.setLocalStrategy(LocalStrategy.NONE);
-                       
-                       Channel toJoin2 = new Channel(map2);
-                       toJoin2.setShipStrategy(ShipStrategyType.FORWARD, 
DataExchangeMode.PIPELINED);
-                       toJoin2.setLocalStrategy(LocalStrategy.NONE);
-                       
-                       DualInputPlanNode join = new 
DualInputPlanNode(getJoinNode(), "Join", toJoin1, toJoin2, 
DriverStrategy.HYBRIDHASH_BUILD_FIRST);
-                       
-                       Channel toAfterJoin = new Channel(join);
-                       toAfterJoin.setShipStrategy(ShipStrategyType.FORWARD, 
DataExchangeMode.PIPELINED);
-                       toAfterJoin.setLocalStrategy(LocalStrategy.NONE);
-                       SingleInputPlanNode afterJoin = new 
SingleInputPlanNode(getMapNode(), "After Join Mapper", toAfterJoin, 
DriverStrategy.MAP);
-                       
-                       // no properties from the partial solution, no required 
properties
-                       {
-                               GlobalProperties gp = new GlobalProperties();
-                               LocalProperties lp = LocalProperties.EMPTY;
-                               
-                               FeedbackPropertiesMeetRequirementsReport report 
= afterJoin.checkPartialSolutionPropertiesMet(target, gp, lp);
-                               assertTrue(report != null && report != 
NO_PARTIAL_SOLUTION && report != NOT_MET);
-                       }
-                       
-                       // some properties from the partial solution, no 
required properties
-                       {
-                               GlobalProperties gp = new GlobalProperties();
-                               gp.setHashPartitioned(new FieldList(0));
-                               LocalProperties lp = 
LocalProperties.forGrouping(new FieldList(2, 1));
-                               
-                               FeedbackPropertiesMeetRequirementsReport report 
= afterJoin.checkPartialSolutionPropertiesMet(target, gp, lp);
-                               assertTrue(report != null && report != 
NO_PARTIAL_SOLUTION && report != NOT_MET);
-                       }
-                       
-                       // test requirements on one input and met
-                       {
-                               GlobalProperties gp = new GlobalProperties();
-                               gp.setHashPartitioned(new FieldList(0));
-                               LocalProperties lp = 
LocalProperties.forGrouping(new FieldList(2, 1));
-                               
-                               RequestedGlobalProperties rgp = new 
RequestedGlobalProperties();
-                               rgp.setHashPartitioned(new FieldList(0));
-                               
-                               RequestedLocalProperties rlp = new 
RequestedLocalProperties();
-                               rlp.setGroupedFields(new FieldList(2, 1));
-                               
-                               toJoin1.setRequiredGlobalProps(rgp);
-                               toJoin1.setRequiredLocalProps(rlp);
-                               
-                               FeedbackPropertiesMeetRequirementsReport report 
= afterJoin.checkPartialSolutionPropertiesMet(target, gp, lp);
-                               assertTrue(report != null && report != 
NO_PARTIAL_SOLUTION && report != NOT_MET);
-                       }
-                       
-                       // test requirements on both input and met
-                       {
-                               GlobalProperties gp = new GlobalProperties();
-                               gp.setHashPartitioned(new FieldList(0));
-                               LocalProperties lp = 
LocalProperties.forGrouping(new FieldList(2, 1));
-                               
-                               RequestedGlobalProperties rgp = new 
RequestedGlobalProperties();
-                               rgp.setHashPartitioned(new FieldList(0));
-                               
-                               RequestedLocalProperties rlp = new 
RequestedLocalProperties();
-                               rlp.setGroupedFields(new FieldList(2, 1));
-                               
-                               toJoin1.setRequiredGlobalProps(rgp);
-                               toJoin1.setRequiredLocalProps(rlp);
-                               
-                               toJoin2.setRequiredGlobalProps(rgp);
-                               toJoin2.setRequiredLocalProps(rlp);
-                               
-                               FeedbackPropertiesMeetRequirementsReport report 
= afterJoin.checkPartialSolutionPropertiesMet(target, gp, lp);
-                               assertTrue(report != null && report != 
NO_PARTIAL_SOLUTION && report != NOT_MET);
-                       }
-                       
-                       // test requirements on both inputs, one not met
-                       {
-                               GlobalProperties gp = new GlobalProperties();
-                               gp.setHashPartitioned(new FieldList(0));
-                               LocalProperties lp = 
LocalProperties.forGrouping(new FieldList(2, 1));
-                               
-                               RequestedGlobalProperties rgp1 = new 
RequestedGlobalProperties();
-                               rgp1.setHashPartitioned(new FieldList(0));
-                               
-                               RequestedLocalProperties rlp1 = new 
RequestedLocalProperties();
-                               rlp1.setGroupedFields(new FieldList(2, 1));
-                               
-                               RequestedGlobalProperties rgp2 = new 
RequestedGlobalProperties();
-                               rgp2.setHashPartitioned(new FieldList(1));
-                               
-                               RequestedLocalProperties rlp2 = new 
RequestedLocalProperties();
-                               rlp2.setGroupedFields(new FieldList(0, 3));
-                               
-                               toJoin1.setRequiredGlobalProps(rgp1);
-                               toJoin1.setRequiredLocalProps(rlp1);
-                               
-                               toJoin2.setRequiredGlobalProps(rgp2);
-                               toJoin2.setRequiredLocalProps(rlp2);
-                               
-                               FeedbackPropertiesMeetRequirementsReport report 
= afterJoin.checkPartialSolutionPropertiesMet(target, gp, lp);
-                               assertEquals(NOT_MET, report);
-                       }
-                       
-                       // test override on both inputs, later requirement 
ignored
-                       {
-                               GlobalProperties gp = new GlobalProperties();
-                               gp.setHashPartitioned(new FieldList(0));
-                               LocalProperties lp = 
LocalProperties.forGrouping(new FieldList(2, 1));
-                               
-                               RequestedGlobalProperties rgp = new 
RequestedGlobalProperties();
-                               rgp.setHashPartitioned(new FieldList(1));
-                               
-                               RequestedLocalProperties rlp = new 
RequestedLocalProperties();
-                               rlp.setGroupedFields(new FieldList(0, 3));
-                               
-                               toJoin1.setRequiredGlobalProps(null);
-                               toJoin1.setRequiredLocalProps(null);
-                               
-                               toJoin2.setRequiredGlobalProps(null);
-                               toJoin2.setRequiredLocalProps(null);
-                               
-                               
toJoin1.setShipStrategy(ShipStrategyType.PARTITION_HASH, new FieldList(88), 
DataExchangeMode.PIPELINED);
-                               
toJoin2.setShipStrategy(ShipStrategyType.BROADCAST, DataExchangeMode.PIPELINED);
-                               
-                               toAfterJoin.setRequiredGlobalProps(rgp);
-                               toAfterJoin.setRequiredLocalProps(rlp);
-                               
-                               FeedbackPropertiesMeetRequirementsReport report 
= afterJoin.checkPartialSolutionPropertiesMet(target, gp, lp);
-                               assertEquals(MET, report);
-                       }
-                       
-                       // test override on one inputs, later requirement met
-                       {
-                               GlobalProperties gp = new GlobalProperties();
-                               gp.setHashPartitioned(new FieldList(0));
-                               LocalProperties lp = 
LocalProperties.forGrouping(new FieldList(2, 1));
-                               
-                               RequestedGlobalProperties rgp = new 
RequestedGlobalProperties();
-                               rgp.setHashPartitioned(new FieldList(0));
-                               
-                               RequestedLocalProperties rlp = new 
RequestedLocalProperties();
-                               rlp.setGroupedFields(new FieldList(2, 1));
-                               
-                               
toJoin1.setShipStrategy(ShipStrategyType.PARTITION_HASH, new FieldList(88), 
DataExchangeMode.PIPELINED);
-                               
toJoin2.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED);
-                               
-                               toAfterJoin.setRequiredGlobalProps(rgp);
-                               toAfterJoin.setRequiredLocalProps(rlp);
-                               
-                               FeedbackPropertiesMeetRequirementsReport report 
= afterJoin.checkPartialSolutionPropertiesMet(target, gp, lp);
-                               assertEquals(PENDING, report);
-                       }
-                       
-                       // test override on one input, later requirement not met
-                       {
-                               GlobalProperties gp = new GlobalProperties();
-                               gp.setHashPartitioned(new FieldList(0));
-                               LocalProperties lp = 
LocalProperties.forGrouping(new FieldList(2, 1));
-                               
-                               RequestedGlobalProperties rgp = new 
RequestedGlobalProperties();
-                               rgp.setHashPartitioned(new FieldList(3));
-                               
-                               RequestedLocalProperties rlp = new 
RequestedLocalProperties();
-                               rlp.setGroupedFields(new FieldList(77, 69));
-                               
-                               
toJoin1.setShipStrategy(ShipStrategyType.PARTITION_HASH, new FieldList(88), 
DataExchangeMode.PIPELINED);
-                               
toJoin2.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED);
-                               
-                               toAfterJoin.setRequiredGlobalProps(rgp);
-                               toAfterJoin.setRequiredLocalProps(rlp);
-                               
-                               FeedbackPropertiesMeetRequirementsReport report 
= afterJoin.checkPartialSolutionPropertiesMet(target, gp, lp);
-                               assertEquals(NOT_MET, report);
-                       }
-                       
-                       // test override on one input locally, later global 
requirement not met
-                       {
-                               GlobalProperties gp = new GlobalProperties();
-                               gp.setHashPartitioned(new FieldList(0));
-                               LocalProperties lp = 
LocalProperties.forGrouping(new FieldList(2, 1));
-                               
-                               RequestedGlobalProperties rgp = new 
RequestedGlobalProperties();
-                               rgp.setHashPartitioned(new FieldList(3));
-                               
-                               
-                               
toJoin1.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED);
-                               toJoin1.setLocalStrategy(LocalStrategy.SORT, 
new FieldList(3), new boolean[] { false });
-                               
-                               
toJoin2.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED);
-                               toJoin1.setLocalStrategy(LocalStrategy.NONE);
-                               
-                               toAfterJoin.setRequiredGlobalProps(rgp);
-                               toAfterJoin.setRequiredLocalProps(null);
-                               
-                               FeedbackPropertiesMeetRequirementsReport report 
= afterJoin.checkPartialSolutionPropertiesMet(target, gp, lp);
-                               assertEquals(NOT_MET, report);
-                       }
-               }
-               catch (Exception e) {
-                       e.printStackTrace();
-                       fail(e.getMessage());
-               }
-       }
-       
-       // 
--------------------------------------------------------------------------------------------
-       
-       private static DataSourceNode getSourceNode() {
-               return new DataSourceNode(new GenericDataSourceBase<String, 
TextInputFormat>(new TextInputFormat(new Path("/")), new 
OperatorInformation<String>(BasicTypeInfo.STRING_TYPE_INFO)));
-       }
-       
-       private static MapNode getMapNode() {
-               return new MapNode(new MapOperatorBase<String, String, 
MapFunction<String,String>>(new IdentityMapper<String>(), new 
UnaryOperatorInformation<String, String>(BasicTypeInfo.STRING_TYPE_INFO, 
BasicTypeInfo.STRING_TYPE_INFO), "map op"));
-       }
-       
-       private static JoinNode getJoinNode() {
-               return new JoinNode(new JoinOperatorBase<String, String, 
String, FlatJoinFunction<String, String, String>>(new 
DummyFlatJoinFunction<String>(), new BinaryOperatorInformation<String, String, 
String>(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO, 
BasicTypeInfo.STRING_TYPE_INFO), new int[] {1}, new int[] {2}, "join op"));
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/test/java/org/apache/flink/optimizer/GroupOrderTest.java
----------------------------------------------------------------------
diff --git 
a/flink-compiler/src/test/java/org/apache/flink/optimizer/GroupOrderTest.java 
b/flink-compiler/src/test/java/org/apache/flink/optimizer/GroupOrderTest.java
deleted file mode 100644
index 77d185d..0000000
--- 
a/flink-compiler/src/test/java/org/apache/flink/optimizer/GroupOrderTest.java
+++ /dev/null
@@ -1,167 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer;
-
-import static org.junit.Assert.fail;
-
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.api.common.operators.Order;
-import org.apache.flink.api.common.operators.Ordering;
-import org.apache.flink.api.common.operators.util.FieldList;
-import org.apache.flink.api.java.record.operators.CoGroupOperator;
-import org.apache.flink.api.java.record.operators.FileDataSink;
-import org.apache.flink.api.java.record.operators.FileDataSource;
-import org.apache.flink.api.java.record.operators.ReduceOperator;
-import org.apache.flink.optimizer.plan.Channel;
-import org.apache.flink.optimizer.plan.DualInputPlanNode;
-import org.apache.flink.optimizer.plan.OptimizedPlan;
-import org.apache.flink.optimizer.plan.SingleInputPlanNode;
-import org.apache.flink.optimizer.plan.SinkPlanNode;
-import org.apache.flink.optimizer.util.DummyCoGroupStub;
-import org.apache.flink.optimizer.util.DummyInputFormat;
-import org.apache.flink.optimizer.util.DummyOutputFormat;
-import org.apache.flink.optimizer.util.IdentityReduce;
-import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
-import org.apache.flink.runtime.operators.util.LocalStrategy;
-import org.apache.flink.types.DoubleValue;
-import org.apache.flink.types.IntValue;
-import org.apache.flink.types.LongValue;
-import org.apache.flink.types.StringValue;
-import org.junit.Assert;
-import org.junit.Test;
-
-/**
- * This test case has been created to validate that correct strategies are 
used if orders within groups are
- * requested.
- */
-@SuppressWarnings({"serial", "deprecation"})
-public class GroupOrderTest extends CompilerTestBase {
-
-       @Test
-       public void testReduceWithGroupOrder() {
-               // construct the plan
-               FileDataSource source = new FileDataSource(new 
DummyInputFormat(), IN_FILE, "Source");
-               
-               ReduceOperator reduce = ReduceOperator.builder(new 
IdentityReduce()).keyField(IntValue.class, 
2).name("Reduce").input(source).build();
-               Ordering groupOrder = new Ordering(5, StringValue.class, 
Order.DESCENDING);
-               reduce.setGroupOrder(groupOrder);
-               
-               FileDataSink sink = new FileDataSink(new DummyOutputFormat(), 
OUT_FILE, reduce, "Sink");
-               
-               
-               Plan plan = new Plan(sink, "Test Temp Task");
-               plan.setDefaultParallelism(DEFAULT_PARALLELISM);
-               
-               OptimizedPlan oPlan;
-               try {
-                       oPlan = compileNoStats(plan);
-               } catch(CompilerException ce) {
-                       ce.printStackTrace();
-                       fail("The pact compiler is unable to compile this plan 
correctly.");
-                       return; // silence the compiler
-               }
-               
-               OptimizerPlanNodeResolver resolver = 
getOptimizerPlanNodeResolver(oPlan);
-               SinkPlanNode sinkNode = resolver.getNode("Sink");
-               SingleInputPlanNode reducer = resolver.getNode("Reduce");
-               
-               // verify the strategies
-               Assert.assertEquals(ShipStrategyType.FORWARD, 
sinkNode.getInput().getShipStrategy());
-               Assert.assertEquals(ShipStrategyType.PARTITION_HASH, 
reducer.getInput().getShipStrategy());
-               
-               Channel c = reducer.getInput();
-               Assert.assertEquals(LocalStrategy.SORT, c.getLocalStrategy());
-               
-               FieldList ship = new FieldList(2);
-               FieldList local = new FieldList(2, 5);
-               Assert.assertEquals(ship, c.getShipStrategyKeys());
-               Assert.assertEquals(local, c.getLocalStrategyKeys());
-               Assert.assertTrue(c.getLocalStrategySortOrder()[0] == 
reducer.getSortOrders(0)[0]);
-               
-               // check that we indeed sort descending
-               Assert.assertTrue(c.getLocalStrategySortOrder()[1] == 
groupOrder.getFieldSortDirections()[0]);
-       }
-       
-       @Test
-       public void testCoGroupWithGroupOrder() {
-               // construct the plan
-               FileDataSource source1 = new FileDataSource(new 
DummyInputFormat(), IN_FILE, "Source1");
-               FileDataSource source2 = new FileDataSource(new 
DummyInputFormat(), IN_FILE, "Source2");
-               
-               CoGroupOperator coGroup = CoGroupOperator.builder(new 
DummyCoGroupStub(), IntValue.class, 3, 6)
-                               .keyField(LongValue.class, 0, 0)
-                               
.name("CoGroup").input1(source1).input2(source2).build();
-               
-               Ordering groupOrder1 = new Ordering(5, StringValue.class, 
Order.DESCENDING);
-               Ordering groupOrder2 = new Ordering(1, StringValue.class, 
Order.DESCENDING);
-               groupOrder2.appendOrdering(4, DoubleValue.class, 
Order.ASCENDING);
-               coGroup.setGroupOrderForInputOne(groupOrder1);
-               coGroup.setGroupOrderForInputTwo(groupOrder2);
-               
-               FileDataSink sink = new FileDataSink(new DummyOutputFormat(), 
OUT_FILE, coGroup, "Sink");
-               
-               Plan plan = new Plan(sink, "Reduce Group Order Test");
-               plan.setDefaultParallelism(DEFAULT_PARALLELISM);
-               
-               OptimizedPlan oPlan;
-               try {
-                       oPlan = compileNoStats(plan);
-               } catch(CompilerException ce) {
-                       ce.printStackTrace();
-                       fail("The pact compiler is unable to compile this plan 
correctly.");
-                       return; // silence the compiler
-               }
-               
-               OptimizerPlanNodeResolver resolver = 
getOptimizerPlanNodeResolver(oPlan);
-               SinkPlanNode sinkNode = resolver.getNode("Sink");
-               DualInputPlanNode coGroupNode = resolver.getNode("CoGroup");
-               
-               // verify the strategies
-               Assert.assertEquals(ShipStrategyType.FORWARD, 
sinkNode.getInput().getShipStrategy());
-               Assert.assertEquals(ShipStrategyType.PARTITION_HASH, 
coGroupNode.getInput1().getShipStrategy());
-               Assert.assertEquals(ShipStrategyType.PARTITION_HASH, 
coGroupNode.getInput2().getShipStrategy());
-               
-               Channel c1 = coGroupNode.getInput1();
-               Channel c2 = coGroupNode.getInput2();
-               
-               Assert.assertEquals(LocalStrategy.SORT, c1.getLocalStrategy());
-               Assert.assertEquals(LocalStrategy.SORT, c2.getLocalStrategy());
-               
-               FieldList ship1 = new FieldList(new int[] {3, 0});
-               FieldList ship2 = new FieldList(new int[] {6, 0});
-               
-               FieldList local1 = new FieldList(new int[] {3, 0, 5});
-               FieldList local2 = new FieldList(new int[] {6, 0, 1, 4});
-               
-               Assert.assertEquals(ship1, c1.getShipStrategyKeys());
-               Assert.assertEquals(ship2, c2.getShipStrategyKeys());
-               Assert.assertEquals(local1, c1.getLocalStrategyKeys());
-               Assert.assertEquals(local2, c2.getLocalStrategyKeys());
-               
-               Assert.assertTrue(c1.getLocalStrategySortOrder()[0] == 
coGroupNode.getSortOrders()[0]);
-               Assert.assertTrue(c1.getLocalStrategySortOrder()[1] == 
coGroupNode.getSortOrders()[1]);
-               Assert.assertTrue(c2.getLocalStrategySortOrder()[0] == 
coGroupNode.getSortOrders()[0]);
-               Assert.assertTrue(c2.getLocalStrategySortOrder()[1] == 
coGroupNode.getSortOrders()[1]);
-               
-               // check that the local group orderings are correct
-               Assert.assertTrue(c1.getLocalStrategySortOrder()[2] == 
groupOrder1.getFieldSortDirections()[0]);
-               Assert.assertTrue(c2.getLocalStrategySortOrder()[2] == 
groupOrder2.getFieldSortDirections()[0]);
-               Assert.assertTrue(c2.getLocalStrategySortOrder()[3] == 
groupOrder2.getFieldSortDirections()[1]);
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/test/java/org/apache/flink/optimizer/HardPlansCompilationTest.java
----------------------------------------------------------------------
diff --git 
a/flink-compiler/src/test/java/org/apache/flink/optimizer/HardPlansCompilationTest.java
 
b/flink-compiler/src/test/java/org/apache/flink/optimizer/HardPlansCompilationTest.java
deleted file mode 100644
index 6dadc19..0000000
--- 
a/flink-compiler/src/test/java/org/apache/flink/optimizer/HardPlansCompilationTest.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer;
-
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.api.java.record.operators.CrossOperator;
-import org.apache.flink.api.java.record.operators.FileDataSink;
-import org.apache.flink.api.java.record.operators.FileDataSource;
-import org.apache.flink.api.java.record.operators.MapOperator;
-import org.apache.flink.api.java.record.operators.ReduceOperator;
-import org.apache.flink.optimizer.plan.OptimizedPlan;
-import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
-import org.apache.flink.optimizer.util.DummyCrossStub;
-import org.apache.flink.optimizer.util.DummyInputFormat;
-import org.apache.flink.optimizer.util.DummyOutputFormat;
-import org.apache.flink.optimizer.util.IdentityMap;
-import org.apache.flink.optimizer.util.IdentityReduce;
-import org.apache.flink.types.IntValue;
-import org.junit.Test;
-
-/**
- * This class tests plans that once failed because of a bug:
- * <ul>
- *   <li> Ticket 158
- * </ul>
- */
-@SuppressWarnings({"serial", "deprecation"})
-public class HardPlansCompilationTest extends CompilerTestBase {
-       
-       /**
-        * Source -> Map -> Reduce -> Cross -> Reduce -> Cross -> Reduce ->
-        * |--------------------------/                  /
-        * |--------------------------------------------/
-        * 
-        * First cross has SameKeyFirst output contract
-        */
-       @Test
-       public void testTicket158() {
-               // construct the plan
-               FileDataSource source = new FileDataSource(new 
DummyInputFormat(), IN_FILE, "Source");
-               
-               MapOperator map = MapOperator.builder(new 
IdentityMap()).name("Map1").input(source).build();
-               
-               ReduceOperator reduce1 = ReduceOperator.builder(new 
IdentityReduce(), IntValue.class, 0).name("Reduce1").input(map).build();
-               
-               CrossOperator cross1 = CrossOperator.builder(new 
DummyCrossStub()).name("Cross1").input1(reduce1).input2(source).build();
-               
-               ReduceOperator reduce2 = ReduceOperator.builder(new 
IdentityReduce(), IntValue.class, 0).name("Reduce2").input(cross1).build();
-               
-               CrossOperator cross2 = CrossOperator.builder(new 
DummyCrossStub()).name("Cross2").input1(reduce2).input2(source).build();
-               
-               ReduceOperator reduce3 = ReduceOperator.builder(new 
IdentityReduce(), IntValue.class, 0).name("Reduce3").input(cross2).build();
-               
-               FileDataSink sink = new FileDataSink(new DummyOutputFormat(), 
OUT_FILE, "Sink");
-               sink.setInput(reduce3);
-               
-               Plan plan = new Plan(sink, "Test Temp Task");
-               plan.setDefaultParallelism(DEFAULT_PARALLELISM);
-               
-               OptimizedPlan oPlan = compileNoStats(plan);
-               JobGraphGenerator jobGen = new JobGraphGenerator();
-               jobGen.compileJobGraph(oPlan);
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/test/java/org/apache/flink/optimizer/IterationsCompilerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-compiler/src/test/java/org/apache/flink/optimizer/IterationsCompilerTest.java
 
b/flink-compiler/src/test/java/org/apache/flink/optimizer/IterationsCompilerTest.java
deleted file mode 100644
index ac4f820..0000000
--- 
a/flink-compiler/src/test/java/org/apache/flink/optimizer/IterationsCompilerTest.java
+++ /dev/null
@@ -1,409 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer;
-
-import static org.junit.Assert.*;
-
-import org.apache.flink.api.common.functions.RichFlatMapFunction;
-import org.junit.Test;
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.operators.DeltaIteration;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.operators.IterativeDataSet;
-import org.apache.flink.api.java.aggregation.Aggregations;
-import org.apache.flink.api.common.functions.FilterFunction;
-import org.apache.flink.api.common.functions.JoinFunction;
-import org.apache.flink.api.common.functions.RichGroupReduceFunction;
-import org.apache.flink.api.common.functions.RichJoinFunction;
-import org.apache.flink.api.common.functions.RichMapFunction;
-import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint;
-import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields;
-import org.apache.flink.api.java.tuple.Tuple1;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.optimizer.plan.BulkIterationPlanNode;
-import org.apache.flink.optimizer.plan.Channel;
-import org.apache.flink.optimizer.plan.OptimizedPlan;
-import org.apache.flink.optimizer.plan.WorksetIterationPlanNode;
-import org.apache.flink.optimizer.plandump.PlanJSONDumpGenerator;
-import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
-import org.apache.flink.optimizer.testfunctions.IdentityKeyExtractor;
-import org.apache.flink.optimizer.testfunctions.IdentityMapper;
-import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
-import org.apache.flink.util.Collector;
-
-
-@SuppressWarnings({"serial", "unchecked"})
-public class IterationsCompilerTest extends CompilerTestBase {
-
-       @Test
-       public void testSolutionSetDeltaDependsOnBroadcastVariable() {
-               try {
-                       ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-                       
-                       DataSet<Tuple2<Long, Long>> source =
-                                               env.generateSequence(1, 
1000).map(new DuplicateValueScalar<Long>());
-                       
-                       DataSet<Tuple2<Long, Long>> invariantInput =
-                                       env.generateSequence(1, 1000).map(new 
DuplicateValueScalar<Long>());
-                       
-                       // iteration from here
-                       DeltaIteration<Tuple2<Long, Long>, Tuple2<Long, Long>> 
iter = source.iterateDelta(source, 1000, 1);
-                       
-                       DataSet<Tuple2<Long, Long>> result =
-                               invariantInput
-                                       .map(new IdentityMapper<Tuple2<Long, 
Long>>()).withBroadcastSet(iter.getWorkset(), "bc data")
-                                       
.join(iter.getSolutionSet()).where(0).equalTo(1).projectFirst(1).projectSecond(1);
-                       
-                       iter.closeWith(result.map(new 
IdentityMapper<Tuple2<Long,Long>>()), result).print();
-                       
-                       OptimizedPlan p = 
compileNoStats(env.createProgramPlan());
-                       
-                       // check that the JSON generator accepts this plan
-                       new PlanJSONDumpGenerator().getOptimizerPlanAsJSON(p);
-                       
-                       // check that the JobGraphGenerator accepts the plan
-                       new JobGraphGenerator().compileJobGraph(p);
-               }
-               catch (Exception e) {
-                       e.printStackTrace();
-                       fail(e.getMessage());
-               }
-       }
-       
-       @Test
-       public void testTwoIterationsWithMapperInbetween() throws Exception {
-               try {
-                       ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-                       env.setDegreeOfParallelism(8);
-                       
-                       DataSet<Tuple2<Long, Long>> verticesWithInitialId = 
env.fromElements(new Tuple2<Long, Long>(1L, 2L));
-                       
-                       DataSet<Tuple2<Long, Long>> edges = 
env.fromElements(new Tuple2<Long, Long>(1L, 2L));
-                       
-                       DataSet<Tuple2<Long, Long>> bulkResult = 
doBulkIteration(verticesWithInitialId, edges);
-                       
-                       DataSet<Tuple2<Long, Long>> mappedBulk = 
bulkResult.map(new DummyMap());
-                       
-                       DataSet<Tuple2<Long, Long>> depResult = 
doDeltaIteration(mappedBulk, edges);
-                       
-                       depResult.print();
-                       
-                       Plan p = env.createProgramPlan();
-                       OptimizedPlan op = compileNoStats(p);
-                       
-                       assertEquals(1, op.getDataSinks().size());
-                       
assertTrue(op.getDataSinks().iterator().next().getInput().getSource() 
instanceof WorksetIterationPlanNode);
-                       
-                       WorksetIterationPlanNode wipn = 
(WorksetIterationPlanNode) 
op.getDataSinks().iterator().next().getInput().getSource();
-                       
-                       assertEquals(ShipStrategyType.PARTITION_HASH, 
wipn.getInput1().getShipStrategy());
-                       
assertTrue(wipn.getInput2().getTempMode().breaksPipeline());
-                       
-                       new JobGraphGenerator().compileJobGraph(op);
-               }
-               catch (Exception e) {
-                       System.err.println(e.getMessage());
-                       e.printStackTrace();
-                       fail(e.getMessage());
-               }
-       }
-       
-       @Test
-       public void testTwoIterationsDirectlyChained() throws Exception {
-               try {
-                       ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-                       env.setDegreeOfParallelism(8);
-                       
-                       DataSet<Tuple2<Long, Long>> verticesWithInitialId = 
env.fromElements(new Tuple2<Long, Long>(1L, 2L));
-                       
-                       DataSet<Tuple2<Long, Long>> edges = 
env.fromElements(new Tuple2<Long, Long>(1L, 2L));
-                       
-                       DataSet<Tuple2<Long, Long>> bulkResult = 
doBulkIteration(verticesWithInitialId, edges);
-                       
-                       DataSet<Tuple2<Long, Long>> depResult = 
doDeltaIteration(bulkResult, edges);
-                       
-                       depResult.print();
-                       
-                       Plan p = env.createProgramPlan();
-                       OptimizedPlan op = compileNoStats(p);
-                       
-                       assertEquals(1, op.getDataSinks().size());
-                       
assertTrue(op.getDataSinks().iterator().next().getInput().getSource() 
instanceof WorksetIterationPlanNode);
-                       
-                       WorksetIterationPlanNode wipn = 
(WorksetIterationPlanNode) 
op.getDataSinks().iterator().next().getInput().getSource();
-                       
-                       assertEquals(ShipStrategyType.PARTITION_HASH, 
wipn.getInput1().getShipStrategy());
-                       
assertTrue(wipn.getInput2().getTempMode().breaksPipeline());
-                       
-                       new JobGraphGenerator().compileJobGraph(op);
-               }
-               catch (Exception e) {
-                       System.err.println(e.getMessage());
-                       e.printStackTrace();
-                       fail(e.getMessage());
-               }
-       }
-       
-       @Test
-       public void testTwoWorksetIterationsDirectlyChained() throws Exception {
-               try {
-                       ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-                       env.setDegreeOfParallelism(8);
-                       
-                       DataSet<Tuple2<Long, Long>> verticesWithInitialId = 
env.fromElements(new Tuple2<Long, Long>(1L, 2L));
-                       
-                       DataSet<Tuple2<Long, Long>> edges = 
env.fromElements(new Tuple2<Long, Long>(1L, 2L));
-                       
-                       DataSet<Tuple2<Long, Long>> firstResult = 
doDeltaIteration(verticesWithInitialId, edges);
-                       
-                       DataSet<Tuple2<Long, Long>> secondResult = 
doDeltaIteration(firstResult, edges);
-                       
-                       secondResult.print();
-                       
-                       Plan p = env.createProgramPlan();
-                       OptimizedPlan op = compileNoStats(p);
-                       
-                       assertEquals(1, op.getDataSinks().size());
-                       
assertTrue(op.getDataSinks().iterator().next().getInput().getSource() 
instanceof WorksetIterationPlanNode);
-                       
-                       WorksetIterationPlanNode wipn = 
(WorksetIterationPlanNode) 
op.getDataSinks().iterator().next().getInput().getSource();
-                       
-                       assertEquals(ShipStrategyType.FORWARD, 
wipn.getInput1().getShipStrategy());
-                       
assertTrue(wipn.getInput2().getTempMode().breaksPipeline());
-                       
-                       new JobGraphGenerator().compileJobGraph(op);
-               }
-               catch (Exception e) {
-                       System.err.println(e.getMessage());
-                       e.printStackTrace();
-                       fail(e.getMessage());
-               }
-       }
-       
-       @Test
-       public void testIterationPushingWorkOut() throws Exception {
-               try {
-                       ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-                       env.setDegreeOfParallelism(8);
-                       
-                       DataSet<Tuple2<Long, Long>> input1 = 
env.readCsvFile("/some/file/path").types(Long.class).map(new DuplicateValue());
-                       
-                       DataSet<Tuple2<Long, Long>> input2 = 
env.readCsvFile("/some/file/path").types(Long.class, Long.class);
-                       
-                       doBulkIteration(input1, input2).print();
-                       
-                       Plan p = env.createProgramPlan();
-                       OptimizedPlan op = compileNoStats(p);
-                       
-                       assertEquals(1, op.getDataSinks().size());
-                       
assertTrue(op.getDataSinks().iterator().next().getInput().getSource() 
instanceof BulkIterationPlanNode);
-                       
-                       BulkIterationPlanNode bipn = (BulkIterationPlanNode) 
op.getDataSinks().iterator().next().getInput().getSource();
-                       
-                       // check that work has not! been pushed out, as the end 
of the step function does not produce the necessary properties
-                       for (Channel c : 
bipn.getPartialSolutionPlanNode().getOutgoingChannels()) {
-                               assertEquals(ShipStrategyType.PARTITION_HASH, 
c.getShipStrategy());
-                       }
-                       
-                       new JobGraphGenerator().compileJobGraph(op);
-               }
-               catch (Exception e) {
-                       System.err.println(e.getMessage());
-                       e.printStackTrace();
-                       fail(e.getMessage());
-               }
-       }
-       
-       @Test
-       public void testWorksetIterationPipelineBreakerPlacement() {
-               try {
-                       ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-                       env.setDegreeOfParallelism(8);
-                       
-                       // the workset (input two of the delta iteration) is 
the same as what is consumed be the successive join
-                       DataSet<Tuple2<Long, Long>> initialWorkset = 
env.readCsvFile("/some/file/path").types(Long.class).map(new DuplicateValue());
-                       
-                       DataSet<Tuple2<Long, Long>> initialSolutionSet = 
env.readCsvFile("/some/file/path").types(Long.class).map(new DuplicateValue());
-                       
-                       // trivial iteration, since we are interested in the 
inputs to the iteration
-                       DeltaIteration<Tuple2<Long, Long>, Tuple2<Long, Long>> 
iteration = initialSolutionSet.iterateDelta(initialWorkset, 100, 0);
-                       
-                       DataSet<Tuple2<Long, Long>> next = 
iteration.getWorkset().map(new IdentityMapper<Tuple2<Long,Long>>());
-                       
-                       DataSet<Tuple2<Long, Long>> result = 
iteration.closeWith(next, next);
-                       
-                       initialWorkset
-                               .join(result, JoinHint.REPARTITION_HASH_FIRST)
-                               .where(0).equalTo(0)
-                               .print();
-                       
-                       Plan p = env.createProgramPlan();
-                       compileNoStats(p);
-               }
-               catch (Exception e) {
-                       e.printStackTrace();
-                       fail(e.getMessage());
-               }
-       }
-       
-       @Test
-       public void testResetPartialSolution() {
-               try {
-                       ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-                       
-                       DataSet<Long> width = env.generateSequence(1, 10);
-                       DataSet<Long> update = env.generateSequence(1, 10);
-                       DataSet<Long> lastGradient = env.generateSequence(1, 
10);
-                       
-                       DataSet<Long> init = 
width.union(update).union(lastGradient);
-                       
-                       IterativeDataSet<Long> iteration = init.iterate(10);
-                       
-                       width = iteration.filter(new IdFilter<Long>());
-                       update = iteration.filter(new IdFilter<Long>());
-                       lastGradient = iteration.filter(new IdFilter<Long>());
-                       
-                       DataSet<Long> gradient = width.map(new 
IdentityMapper<Long>());
-                       DataSet<Long> term = gradient.join(lastGradient)
-                                                               .where(new 
IdentityKeyExtractor<Long>())
-                                                               .equalTo(new 
IdentityKeyExtractor<Long>())
-                                                               .with(new 
JoinFunction<Long, Long, Long>() {
-                                                                       public 
Long join(Long first, Long second) { return null; }
-                                                               });
-                       
-                       update = update.map(new RichMapFunction<Long, Long>() {
-                               public Long map(Long value) { return null; }
-                       }).withBroadcastSet(term, "some-name");
-                       
-                       DataSet<Long> result = 
iteration.closeWith(width.union(update).union(lastGradient));
-                       
-                       result.print();
-                       
-                       Plan p = env.createProgramPlan();
-                       OptimizedPlan op = compileNoStats(p);
-                       
-                       new JobGraphGenerator().compileJobGraph(op);
-               }
-               catch (Exception e) {
-                       e.printStackTrace();
-                       fail(e.getMessage());
-               }
-       }
-       
-       // 
--------------------------------------------------------------------------------------------
-       
-       public static DataSet<Tuple2<Long, Long>> 
doBulkIteration(DataSet<Tuple2<Long, Long>> vertices, DataSet<Tuple2<Long, 
Long>> edges) {
-               
-               // open a bulk iteration
-               IterativeDataSet<Tuple2<Long, Long>> iteration = 
vertices.iterate(20);
-               
-               DataSet<Tuple2<Long, Long>> changes = iteration
-                               .join(edges).where(0).equalTo(0).with(new 
Join222())
-                               .groupBy(0).aggregate(Aggregations.MIN, 1)
-                               .join(iteration).where(0).equalTo(0)
-                               .flatMap(new FlatMapJoin());
-               
-               // close the bulk iteration
-               return iteration.closeWith(changes);
-       }
-                               
-               
-       public static DataSet<Tuple2<Long, Long>> 
doDeltaIteration(DataSet<Tuple2<Long, Long>> vertices, DataSet<Tuple2<Long, 
Long>> edges) {
-
-               DeltaIteration<Tuple2<Long, Long>, Tuple2<Long, Long>> 
depIteration = vertices.iterateDelta(vertices, 100, 0);
-                               
-               DataSet<Tuple1<Long>> candidates = 
depIteration.getWorkset().join(edges).where(0).equalTo(0)
-                               .projectSecond(1);
-               
-               DataSet<Tuple1<Long>> grouped = 
candidates.groupBy(0).reduceGroup(new Reduce101());
-               
-               DataSet<Tuple2<Long, Long>> candidatesDependencies = 
-                               
grouped.join(edges).where(0).equalTo(1).projectSecond(0, 1);
-               
-               DataSet<Tuple2<Long, Long>> verticesWithNewComponents = 
-                               
candidatesDependencies.join(depIteration.getSolutionSet()).where(0).equalTo(0)
-                               .with(new Join222())
-                               .groupBy(0).aggregate(Aggregations.MIN, 1);
-               
-               DataSet<Tuple2<Long, Long>> updatedComponentId = 
-                               
verticesWithNewComponents.join(depIteration.getSolutionSet()).where(0).equalTo(0)
-                               .flatMap(new FlatMapJoin());
-               
-               DataSet<Tuple2<Long, Long>> depResult = 
depIteration.closeWith(updatedComponentId, updatedComponentId);
-               
-               return depResult;
-               
-       }
-       
-       // 
--------------------------------------------------------------------------------------------
-       
-       public static final class Join222 extends RichJoinFunction<Tuple2<Long, 
Long>, Tuple2<Long, Long>, Tuple2<Long, Long>> {
-
-               @Override
-               public Tuple2<Long, Long> join(Tuple2<Long, Long> 
vertexWithComponent, Tuple2<Long, Long> edge) {
-                       return null;
-               }
-       }
-       
-       public static final class FlatMapJoin extends 
RichFlatMapFunction<Tuple2<Tuple2<Long, Long>, Tuple2<Long, Long>>, 
Tuple2<Long, Long>> {
-               
-               @Override
-               public void flatMap(Tuple2<Tuple2<Long, Long>, Tuple2<Long, 
Long>> value, Collector<Tuple2<Long, Long>> out) {}
-       }
-       
-       public static final class DummyMap extends RichMapFunction<Tuple2<Long, 
Long>, Tuple2<Long, Long>> {
-
-               @Override
-               public Tuple2<Long, Long> map(Tuple2<Long, Long> value) throws 
Exception {
-                       return value;
-               }
-       }
-       
-       @ForwardedFields("0")
-       public static final class Reduce101 extends 
RichGroupReduceFunction<Tuple1<Long>, Tuple1<Long>> {
-               
-               @Override
-               public void reduce(Iterable<Tuple1<Long>> values, 
Collector<Tuple1<Long>> out) {}
-       }
-       
-       @ForwardedFields("0")
-       public static final class DuplicateValue extends 
RichMapFunction<Tuple1<Long>, Tuple2<Long, Long>> {
-
-               @Override
-               public Tuple2<Long, Long> map(Tuple1<Long> value) throws 
Exception {
-                       return new Tuple2<Long, Long>(value.f0, value.f0);
-               }
-       }
-       
-       public static final class DuplicateValueScalar<T> extends 
RichMapFunction<T, Tuple2<T, T>> {
-
-               @Override
-               public Tuple2<T, T> map(T value) {
-                       return new Tuple2<T, T>(value, value);
-               }
-       }
-       
-       public static final class IdFilter<T> implements FilterFunction<T> {
-               @Override
-               public boolean filter(T value) {
-                       return true;
-               }
-       }
-}

Reply via email to