http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/test/java/org/apache/flink/optimizer/FeedbackPropertiesMatchTest.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/test/java/org/apache/flink/optimizer/FeedbackPropertiesMatchTest.java b/flink-compiler/src/test/java/org/apache/flink/optimizer/FeedbackPropertiesMatchTest.java deleted file mode 100644 index 8c19462..0000000 --- a/flink-compiler/src/test/java/org/apache/flink/optimizer/FeedbackPropertiesMatchTest.java +++ /dev/null @@ -1,1436 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - - -package org.apache.flink.optimizer; - -import static org.apache.flink.optimizer.plan.PlanNode.FeedbackPropertiesMeetRequirementsReport.*; -import static org.junit.Assert.*; - -import org.apache.flink.api.common.functions.FlatJoinFunction; -import org.apache.flink.api.common.functions.MapFunction; -import org.apache.flink.api.common.operators.BinaryOperatorInformation; -import org.apache.flink.api.common.operators.GenericDataSourceBase; -import org.apache.flink.api.common.operators.OperatorInformation; -import org.apache.flink.api.common.operators.Order; -import org.apache.flink.api.common.operators.Ordering; -import org.apache.flink.api.common.operators.UnaryOperatorInformation; -import org.apache.flink.api.common.operators.base.JoinOperatorBase; -import org.apache.flink.api.common.operators.base.MapOperatorBase; -import org.apache.flink.api.common.operators.util.FieldList; -import org.apache.flink.api.common.operators.util.FieldSet; -import org.apache.flink.api.common.typeinfo.BasicTypeInfo; -import org.apache.flink.optimizer.dag.DataSourceNode; -import org.apache.flink.optimizer.dag.MapNode; -import org.apache.flink.optimizer.dag.JoinNode; -import org.apache.flink.optimizer.dataproperties.GlobalProperties; -import org.apache.flink.optimizer.dataproperties.LocalProperties; -import org.apache.flink.optimizer.dataproperties.RequestedGlobalProperties; -import org.apache.flink.optimizer.dataproperties.RequestedLocalProperties; -import org.apache.flink.optimizer.plan.Channel; -import org.apache.flink.optimizer.plan.DualInputPlanNode; -import org.apache.flink.optimizer.plan.SingleInputPlanNode; -import org.apache.flink.optimizer.plan.SourcePlanNode; -import org.apache.flink.optimizer.plan.PlanNode.FeedbackPropertiesMeetRequirementsReport; -import org.apache.flink.optimizer.testfunctions.DummyFlatJoinFunction; -import org.apache.flink.optimizer.testfunctions.IdentityMapper; -import org.apache.flink.core.fs.Path; -import 
org.apache.flink.runtime.io.network.DataExchangeMode; -import org.apache.flink.runtime.operators.DriverStrategy; -import org.apache.flink.runtime.operators.shipping.ShipStrategyType; -import org.apache.flink.runtime.operators.util.LocalStrategy; -import org.junit.Test; -import org.apache.flink.api.java.io.TextInputFormat; - - -public class FeedbackPropertiesMatchTest { - - @Test - public void testNoPartialSolutionFoundSingleInputOnly() { - try { - SourcePlanNode target = new SourcePlanNode(getSourceNode(), "Source"); - - SourcePlanNode otherTarget = new SourcePlanNode(getSourceNode(), "Source"); - - Channel toMap1 = new Channel(target); - toMap1.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED); - toMap1.setLocalStrategy(LocalStrategy.NONE); - SingleInputPlanNode map1 = new SingleInputPlanNode(getMapNode(), "Mapper 1", toMap1, DriverStrategy.MAP); - - Channel toMap2 = new Channel(map1); - toMap2.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED); - toMap2.setLocalStrategy(LocalStrategy.NONE); - SingleInputPlanNode map2 = new SingleInputPlanNode(getMapNode(), "Mapper 2", toMap2, DriverStrategy.MAP); - - { - GlobalProperties gp = new GlobalProperties(); - LocalProperties lp = new LocalProperties(); - - FeedbackPropertiesMeetRequirementsReport report = map2.checkPartialSolutionPropertiesMet(otherTarget, gp, lp); - assertTrue(report == NO_PARTIAL_SOLUTION); - } - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - } - - @Test - public void testSingleInputOperators() { - try { - SourcePlanNode target = new SourcePlanNode(getSourceNode(), "Source"); - - Channel toMap1 = new Channel(target); - toMap1.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED); - toMap1.setLocalStrategy(LocalStrategy.NONE); - SingleInputPlanNode map1 = new SingleInputPlanNode(getMapNode(), "Mapper 1", toMap1, DriverStrategy.MAP); - - Channel toMap2 = new Channel(map1); - 
toMap2.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED); - toMap2.setLocalStrategy(LocalStrategy.NONE); - SingleInputPlanNode map2 = new SingleInputPlanNode(getMapNode(), "Mapper 2", toMap2, DriverStrategy.MAP); - - // no feedback properties and none are ever required and present - { - GlobalProperties gp = new GlobalProperties(); - LocalProperties lp = new LocalProperties(); - - FeedbackPropertiesMeetRequirementsReport report = map2.checkPartialSolutionPropertiesMet(target, gp, lp); - assertTrue(report != null && report != NO_PARTIAL_SOLUTION && report != NOT_MET); - } - - // some global feedback properties and none are ever required and present - { - GlobalProperties gp = new GlobalProperties(); - gp.setHashPartitioned(new FieldList(2, 5)); - LocalProperties lp = new LocalProperties(); - - FeedbackPropertiesMeetRequirementsReport report = map2.checkPartialSolutionPropertiesMet(target, gp, lp); - assertTrue(report != null && report != NO_PARTIAL_SOLUTION && report != NOT_MET); - } - - // some local feedback properties and none are ever required and present - { - GlobalProperties gp = new GlobalProperties(); - LocalProperties lp = LocalProperties.forGrouping(new FieldList(1, 2)); - - FeedbackPropertiesMeetRequirementsReport report = map2.checkPartialSolutionPropertiesMet(target, gp, lp); - assertTrue(report != null && report != NO_PARTIAL_SOLUTION && report != NOT_MET); - } - - // some global and local feedback properties and none are ever required and present - { - GlobalProperties gp = new GlobalProperties(); - gp.setHashPartitioned(new FieldList(2, 5)); - LocalProperties lp = LocalProperties.forGrouping(new FieldList(1, 2)); - - FeedbackPropertiesMeetRequirementsReport report = map2.checkPartialSolutionPropertiesMet(target, gp, lp); - assertTrue(report != null && report != NO_PARTIAL_SOLUTION && report != NOT_MET); - } - - // --------------------------- requirements on channel 1 ----------------------- - - // some required global properties, 
which are matched exactly - { - GlobalProperties gp = new GlobalProperties(); - gp.setHashPartitioned(new FieldList(2, 5)); - LocalProperties lp = new LocalProperties(); - - RequestedGlobalProperties reqGp = new RequestedGlobalProperties(); - reqGp.setHashPartitioned(new FieldList(2, 5)); - - toMap1.setRequiredGlobalProps(reqGp); - toMap1.setRequiredLocalProps(null); - - toMap2.setRequiredGlobalProps(null); - toMap2.setRequiredLocalProps(null); - - FeedbackPropertiesMeetRequirementsReport report = map2.checkPartialSolutionPropertiesMet(target, gp, lp); - assertTrue(report != null && report != NO_PARTIAL_SOLUTION && report != NOT_MET); - } - - // some required local properties, which are matched exactly - { - GlobalProperties gp = new GlobalProperties(); - LocalProperties lp = LocalProperties.forGrouping(new FieldList(1, 2)); - - RequestedLocalProperties reqLp = new RequestedLocalProperties(); - reqLp.setGroupedFields(new FieldList(1, 2)); - - toMap1.setRequiredGlobalProps(null); - toMap1.setRequiredLocalProps(reqLp); - - toMap2.setRequiredGlobalProps(null); - toMap2.setRequiredLocalProps(null); - - FeedbackPropertiesMeetRequirementsReport report = map2.checkPartialSolutionPropertiesMet(target, gp, lp); - assertTrue(report != null && report != NO_PARTIAL_SOLUTION && report != NOT_MET); - } - - // some required global and local properties, which are matched exactly - { - GlobalProperties gp = new GlobalProperties(); - gp.setHashPartitioned(new FieldList(2, 5)); - LocalProperties lp = LocalProperties.forGrouping(new FieldList(1, 2)); - - RequestedGlobalProperties reqGp = new RequestedGlobalProperties(); - reqGp.setHashPartitioned(new FieldList(2, 5)); - - RequestedLocalProperties reqLp = new RequestedLocalProperties(); - reqLp.setGroupedFields(new FieldList(1, 2)); - - toMap1.setRequiredGlobalProps(reqGp); - toMap1.setRequiredLocalProps(reqLp); - - toMap2.setRequiredGlobalProps(null); - toMap2.setRequiredLocalProps(null); - - FeedbackPropertiesMeetRequirementsReport 
report = map2.checkPartialSolutionPropertiesMet(target, gp, lp); - assertTrue(report != null && report != NO_PARTIAL_SOLUTION && report != NOT_MET); - } - - // some required global and local properties, which are over-fulfilled - { - GlobalProperties gp = new GlobalProperties(); - gp.setHashPartitioned(new FieldList(2)); - LocalProperties lp = LocalProperties.forGrouping(new FieldList(1, 2)); - - RequestedGlobalProperties reqGp = new RequestedGlobalProperties(); - reqGp.setHashPartitioned(new FieldSet(2, 5)); - - RequestedLocalProperties reqLp = new RequestedLocalProperties(); - reqLp.setGroupedFields(new FieldList(1)); - - toMap1.setRequiredGlobalProps(reqGp); - toMap1.setRequiredLocalProps(reqLp); - - toMap2.setRequiredGlobalProps(null); - toMap2.setRequiredLocalProps(null); - - FeedbackPropertiesMeetRequirementsReport report = map2.checkPartialSolutionPropertiesMet(target, gp, lp); - assertTrue(report != null && report != NO_PARTIAL_SOLUTION && report != NOT_MET); - } - - // some required global properties that are not met - { - GlobalProperties gp = new GlobalProperties(); - gp.setHashPartitioned(new FieldList(2, 1)); - LocalProperties lp = new LocalProperties(); - - RequestedGlobalProperties reqGp = new RequestedGlobalProperties(); - reqGp.setHashPartitioned(new FieldList(2, 5)); - - toMap1.setRequiredGlobalProps(reqGp); - toMap1.setRequiredLocalProps(null); - - toMap2.setRequiredGlobalProps(null); - toMap2.setRequiredLocalProps(null); - - FeedbackPropertiesMeetRequirementsReport report = map2.checkPartialSolutionPropertiesMet(target, gp, lp); - assertEquals(NOT_MET, report); - } - - // some required local properties that are not met - { - GlobalProperties gp = new GlobalProperties(); - LocalProperties lp = LocalProperties.forGrouping(new FieldList(1)); - - RequestedLocalProperties reqLp = new RequestedLocalProperties(); - reqLp.setGroupedFields(new FieldList(2, 1)); - - toMap1.setRequiredGlobalProps(null); - toMap1.setRequiredLocalProps(reqLp); - - 
toMap2.setRequiredGlobalProps(null); - toMap2.setRequiredLocalProps(null); - - FeedbackPropertiesMeetRequirementsReport report = map2.checkPartialSolutionPropertiesMet(target, gp, lp); - assertEquals(NOT_MET, report); - } - - // some required global and local properties where the global properties are not met - { - GlobalProperties gp = new GlobalProperties(); - gp.setHashPartitioned(new FieldList(2, 1)); - LocalProperties lp = LocalProperties.forGrouping(new FieldList(1)); - - RequestedGlobalProperties reqGp = new RequestedGlobalProperties(); - reqGp.setAnyPartitioning(new FieldList(2, 5)); - - RequestedLocalProperties reqLp = new RequestedLocalProperties(); - reqLp.setGroupedFields(new FieldList(1)); - - toMap1.setRequiredGlobalProps(reqGp); - toMap1.setRequiredLocalProps(reqLp); - - toMap2.setRequiredGlobalProps(null); - toMap2.setRequiredLocalProps(null); - - FeedbackPropertiesMeetRequirementsReport report = map2.checkPartialSolutionPropertiesMet(target, gp, lp); - assertEquals(NOT_MET, report); - } - - // some required global and local properties where the local properties are not met - { - GlobalProperties gp = new GlobalProperties(); - gp.setHashPartitioned(new FieldList(1)); - LocalProperties lp = LocalProperties.forGrouping(new FieldList(1)); - - RequestedGlobalProperties reqGp = new RequestedGlobalProperties(); - reqGp.setAnyPartitioning(new FieldList(1)); - - RequestedLocalProperties reqLp = new RequestedLocalProperties(); - reqLp.setGroupedFields(new FieldList(2)); - - toMap1.setRequiredGlobalProps(reqGp); - toMap1.setRequiredLocalProps(reqLp); - - toMap2.setRequiredGlobalProps(null); - toMap2.setRequiredLocalProps(null); - - FeedbackPropertiesMeetRequirementsReport report = map2.checkPartialSolutionPropertiesMet(target, gp, lp); - assertEquals(NOT_MET, report); - } - - // --------------------------- requirements on channel 2 ----------------------- - - // some required global properties, which are matched exactly - { - GlobalProperties gp = new 
GlobalProperties(); - gp.setHashPartitioned(new FieldList(2, 5)); - LocalProperties lp = new LocalProperties(); - - RequestedGlobalProperties reqGp = new RequestedGlobalProperties(); - reqGp.setHashPartitioned(new FieldList(2, 5)); - - toMap1.setRequiredGlobalProps(null); - toMap1.setRequiredLocalProps(null); - - toMap2.setRequiredGlobalProps(reqGp); - toMap2.setRequiredLocalProps(null); - - FeedbackPropertiesMeetRequirementsReport report = map2.checkPartialSolutionPropertiesMet(target, gp, lp); - assertTrue(report != null && report != NO_PARTIAL_SOLUTION && report != NOT_MET); - } - - // some required local properties, which are matched exactly - { - GlobalProperties gp = new GlobalProperties(); - LocalProperties lp = LocalProperties.forGrouping(new FieldList(1, 2)); - - RequestedLocalProperties reqLp = new RequestedLocalProperties(); - reqLp.setGroupedFields(new FieldList(1, 2)); - - toMap1.setRequiredGlobalProps(null); - toMap1.setRequiredLocalProps(null); - - toMap2.setRequiredGlobalProps(null); - toMap2.setRequiredLocalProps(reqLp); - - FeedbackPropertiesMeetRequirementsReport report = map2.checkPartialSolutionPropertiesMet(target, gp, lp); - assertTrue(report != null && report != NO_PARTIAL_SOLUTION && report != NOT_MET); - } - - // some required global and local properties, which are matched exactly - { - GlobalProperties gp = new GlobalProperties(); - gp.setHashPartitioned(new FieldList(2, 5)); - LocalProperties lp = LocalProperties.forGrouping(new FieldList(1, 2)); - - RequestedGlobalProperties reqGp = new RequestedGlobalProperties(); - reqGp.setHashPartitioned(new FieldList(2, 5)); - - RequestedLocalProperties reqLp = new RequestedLocalProperties(); - reqLp.setGroupedFields(new FieldList(1, 2)); - - toMap1.setRequiredGlobalProps(null); - toMap1.setRequiredLocalProps(null); - - toMap2.setRequiredGlobalProps(reqGp); - toMap2.setRequiredLocalProps(reqLp); - - FeedbackPropertiesMeetRequirementsReport report = map2.checkPartialSolutionPropertiesMet(target, gp, 
lp); - assertTrue(report != null && report != NO_PARTIAL_SOLUTION && report != NOT_MET); - } - - // some required global and local properties, which are over-fulfilled - { - GlobalProperties gp = new GlobalProperties(); - gp.setHashPartitioned(new FieldList(2)); - LocalProperties lp = LocalProperties.forGrouping(new FieldList(1, 2)); - - RequestedGlobalProperties reqGp = new RequestedGlobalProperties(); - reqGp.setHashPartitioned(new FieldSet(2, 5)); - - RequestedLocalProperties reqLp = new RequestedLocalProperties(); - reqLp.setGroupedFields(new FieldList(1)); - - toMap1.setRequiredGlobalProps(null); - toMap1.setRequiredLocalProps(null); - - toMap2.setRequiredGlobalProps(reqGp); - toMap2.setRequiredLocalProps(reqLp); - - FeedbackPropertiesMeetRequirementsReport report = map2.checkPartialSolutionPropertiesMet(target, gp, lp); - assertTrue(report != null && report != NO_PARTIAL_SOLUTION && report != NOT_MET); - } - - // some required global properties that are not met - { - GlobalProperties gp = new GlobalProperties(); - gp.setHashPartitioned(new FieldList(2, 1)); - LocalProperties lp = new LocalProperties(); - - RequestedGlobalProperties reqGp = new RequestedGlobalProperties(); - reqGp.setHashPartitioned(new FieldSet(2, 5)); - - toMap1.setRequiredGlobalProps(null); - toMap1.setRequiredLocalProps(null); - - toMap2.setRequiredGlobalProps(reqGp); - toMap2.setRequiredLocalProps(null); - - FeedbackPropertiesMeetRequirementsReport report = map2.checkPartialSolutionPropertiesMet(target, gp, lp); - assertEquals(NOT_MET, report); - } - - // some required local properties that are not met - { - GlobalProperties gp = new GlobalProperties(); - LocalProperties lp = LocalProperties.forGrouping(new FieldList(1)); - - RequestedLocalProperties reqLp = new RequestedLocalProperties(); - reqLp.setGroupedFields(new FieldList(2, 1)); - - toMap1.setRequiredGlobalProps(null); - toMap1.setRequiredLocalProps(null); - - toMap2.setRequiredGlobalProps(null); - 
toMap2.setRequiredLocalProps(reqLp); - - FeedbackPropertiesMeetRequirementsReport report = map2.checkPartialSolutionPropertiesMet(target, gp, lp); - assertEquals(NOT_MET, report); - } - - // some required global and local properties where the global properties are not met - { - GlobalProperties gp = new GlobalProperties(); - gp.setHashPartitioned(new FieldList(2, 1)); - LocalProperties lp = LocalProperties.forGrouping(new FieldList(1)); - - RequestedGlobalProperties reqGp = new RequestedGlobalProperties(); - reqGp.setAnyPartitioning(new FieldSet(2, 5)); - - RequestedLocalProperties reqLp = new RequestedLocalProperties(); - reqLp.setGroupedFields(new FieldList(1)); - - toMap1.setRequiredGlobalProps(null); - toMap1.setRequiredLocalProps(null); - - toMap2.setRequiredGlobalProps(reqGp); - toMap2.setRequiredLocalProps(reqLp); - - FeedbackPropertiesMeetRequirementsReport report = map2.checkPartialSolutionPropertiesMet(target, gp, lp); - assertEquals(NOT_MET, report); - } - - // some required global and local properties where the local properties are not met - { - GlobalProperties gp = new GlobalProperties(); - gp.setHashPartitioned(new FieldList(1)); - LocalProperties lp = LocalProperties.forGrouping(new FieldList(1)); - - RequestedGlobalProperties reqGp = new RequestedGlobalProperties(); - reqGp.setAnyPartitioning(new FieldList(1)); - - RequestedLocalProperties reqLp = new RequestedLocalProperties(); - reqLp.setGroupedFields(new FieldList(2)); - - toMap1.setRequiredGlobalProps(null); - toMap1.setRequiredLocalProps(null); - - toMap2.setRequiredGlobalProps(reqGp); - toMap2.setRequiredLocalProps(reqLp); - - FeedbackPropertiesMeetRequirementsReport report = map2.checkPartialSolutionPropertiesMet(target, gp, lp); - assertEquals(NOT_MET, report); - } - - // ---------------------- requirements mixed on 1 and 2 ----------------------- - - // some required global properties at step one and some more at step 2 - { - GlobalProperties gp = new GlobalProperties(); - 
gp.setHashPartitioned(new FieldList(1, 2)); - LocalProperties lp = LocalProperties.EMPTY; - - RequestedGlobalProperties reqGp1 = new RequestedGlobalProperties(); - reqGp1.setAnyPartitioning(new FieldList(1, 2)); - - RequestedGlobalProperties reqGp2 = new RequestedGlobalProperties(); - reqGp2.setHashPartitioned(new FieldList(1, 2)); - - toMap1.setRequiredGlobalProps(reqGp1); - toMap1.setRequiredLocalProps(null); - - toMap2.setRequiredGlobalProps(reqGp2); - toMap2.setRequiredLocalProps(null); - - FeedbackPropertiesMeetRequirementsReport report = map2.checkPartialSolutionPropertiesMet(target, gp, lp); - assertTrue(report != null && report != NO_PARTIAL_SOLUTION && report != NOT_MET); - } - - // some required local properties at step one and some more at step 2 - { - GlobalProperties gp = new GlobalProperties(); - LocalProperties lp = LocalProperties.forOrdering(new Ordering(3, null, Order.ASCENDING).appendOrdering(1, null, Order.DESCENDING)); - - RequestedLocalProperties reqLp1 = new RequestedLocalProperties(); - reqLp1.setGroupedFields(new FieldList(3, 1)); - - RequestedLocalProperties reqLp2 = new RequestedLocalProperties(); - reqLp2.setOrdering(new Ordering(3, null, Order.ANY).appendOrdering(1, null, Order.ANY)); - - toMap1.setRequiredGlobalProps(null); - toMap1.setRequiredLocalProps(reqLp1); - - toMap2.setRequiredGlobalProps(null); - toMap2.setRequiredLocalProps(reqLp2); - - FeedbackPropertiesMeetRequirementsReport report = map2.checkPartialSolutionPropertiesMet(target, gp, lp); - assertTrue(report != null && report != NO_PARTIAL_SOLUTION && report != NOT_MET); - } - - // some required global properties at step one and some local ones at step 2 - { - GlobalProperties gp = new GlobalProperties(); - gp.setHashPartitioned(new FieldList(1, 2)); - LocalProperties lp = LocalProperties.forGrouping(new FieldList(2, 1)); - - RequestedGlobalProperties reqGp = new RequestedGlobalProperties(); - reqGp.setAnyPartitioning(new FieldList(1, 2)); - - RequestedLocalProperties reqLp 
= new RequestedLocalProperties(); - reqLp.setGroupedFields(new FieldList(2)); - - toMap1.setRequiredGlobalProps(reqGp); - toMap1.setRequiredLocalProps(null); - - toMap2.setRequiredGlobalProps(null); - toMap2.setRequiredLocalProps(reqLp); - - FeedbackPropertiesMeetRequirementsReport report = map2.checkPartialSolutionPropertiesMet(target, gp, lp); - assertTrue(report != null && report != NO_PARTIAL_SOLUTION && report != NOT_MET); - } - - // some required local properties at step one and some global ones at step 2 - { - GlobalProperties gp = new GlobalProperties(); - gp.setHashPartitioned(new FieldList(1, 2)); - LocalProperties lp = LocalProperties.forGrouping(new FieldList(2, 1)); - - RequestedGlobalProperties reqGp = new RequestedGlobalProperties(); - reqGp.setAnyPartitioning(new FieldList(1, 2)); - - RequestedLocalProperties reqLp = new RequestedLocalProperties(); - reqLp.setGroupedFields(new FieldList(2)); - - toMap1.setRequiredGlobalProps(null); - toMap1.setRequiredLocalProps(reqLp); - - toMap2.setRequiredGlobalProps(reqGp); - toMap2.setRequiredLocalProps(null); - - FeedbackPropertiesMeetRequirementsReport report = map2.checkPartialSolutionPropertiesMet(target, gp, lp); - assertTrue(report != null && report != NO_PARTIAL_SOLUTION && report != NOT_MET); - } - - // some fulfilled global properties at step one and some non-fulfilled local ones at step 2 - { - GlobalProperties gp = new GlobalProperties(); - gp.setHashPartitioned(new FieldList(1, 2)); - LocalProperties lp = LocalProperties.forGrouping(new FieldList(2, 1)); - - RequestedGlobalProperties reqGp = new RequestedGlobalProperties(); - reqGp.setAnyPartitioning(new FieldList(1, 2)); - - RequestedLocalProperties reqLp = new RequestedLocalProperties(); - reqLp.setGroupedFields(new FieldList(2, 3)); - - toMap1.setRequiredGlobalProps(reqGp); - toMap1.setRequiredLocalProps(null); - - toMap2.setRequiredGlobalProps(null); - toMap2.setRequiredLocalProps(reqLp); - - FeedbackPropertiesMeetRequirementsReport report = 
map2.checkPartialSolutionPropertiesMet(target, gp, lp); - assertEquals(NOT_MET, report); - } - - // some fulfilled local properties at step one and some non-fulfilled global ones at step 2 - { - GlobalProperties gp = new GlobalProperties(); - gp.setHashPartitioned(new FieldList(1, 2)); - LocalProperties lp = LocalProperties.forGrouping(new FieldList(2, 1)); - - RequestedGlobalProperties reqGp = new RequestedGlobalProperties(); - reqGp.setAnyPartitioning(new FieldList(2, 3)); - - RequestedLocalProperties reqLp = new RequestedLocalProperties(); - reqLp.setGroupedFields(new FieldList(2, 1)); - - toMap1.setRequiredGlobalProps(null); - toMap1.setRequiredLocalProps(reqLp); - - toMap2.setRequiredGlobalProps(reqGp); - toMap2.setRequiredLocalProps(null); - - FeedbackPropertiesMeetRequirementsReport report = map2.checkPartialSolutionPropertiesMet(target, gp, lp); - assertEquals(NOT_MET, report); - } - - // some non-fulfilled global properties at step one and some fulfilled local ones at step 2 - { - GlobalProperties gp = new GlobalProperties(); - gp.setHashPartitioned(new FieldList(1, 2)); - LocalProperties lp = LocalProperties.forGrouping(new FieldList(2, 1)); - - RequestedGlobalProperties reqGp = new RequestedGlobalProperties(); - reqGp.setAnyPartitioning(new FieldList(2, 3)); - - RequestedLocalProperties reqLp = new RequestedLocalProperties(); - reqLp.setGroupedFields(new FieldList(2, 1)); - - toMap1.setRequiredGlobalProps(reqGp); - toMap1.setRequiredLocalProps(null); - - toMap2.setRequiredGlobalProps(null); - toMap2.setRequiredLocalProps(reqLp); - - FeedbackPropertiesMeetRequirementsReport report = map2.checkPartialSolutionPropertiesMet(target, gp, lp); - assertEquals(NOT_MET, report); - } - - // some non-fulfilled local properties at step one and some fulfilled global ones at step 2 - { - GlobalProperties gp = new GlobalProperties(); - gp.setHashPartitioned(new FieldList(1, 2)); - LocalProperties lp = LocalProperties.forGrouping(new FieldList(2, 1)); - - 
RequestedGlobalProperties reqGp = new RequestedGlobalProperties(); - reqGp.setAnyPartitioning(new FieldList(1, 2)); - - RequestedLocalProperties reqLp = new RequestedLocalProperties(); - reqLp.setGroupedFields(new FieldList(2, 1, 3)); - - toMap1.setRequiredGlobalProps(null); - toMap1.setRequiredLocalProps(reqLp); - - toMap2.setRequiredGlobalProps(reqGp); - toMap2.setRequiredLocalProps(null); - - FeedbackPropertiesMeetRequirementsReport report = map2.checkPartialSolutionPropertiesMet(target, gp, lp); - assertEquals(NOT_MET, report); - } - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - } - - @Test - public void testSingleInputOperatorsWithReCreation() { - try { - SourcePlanNode target = new SourcePlanNode(getSourceNode(), "Source"); - - Channel toMap1 = new Channel(target); - SingleInputPlanNode map1 = new SingleInputPlanNode(getMapNode(), "Mapper 1", toMap1, DriverStrategy.MAP); - - Channel toMap2 = new Channel(map1); - SingleInputPlanNode map2 = new SingleInputPlanNode(getMapNode(), "Mapper 2", toMap2, DriverStrategy.MAP); - - // set ship strategy in first channel, so later non matching global properties do not matter - { - GlobalProperties gp = new GlobalProperties(); - gp.setHashPartitioned(new FieldList(1, 2)); - LocalProperties lp = LocalProperties.EMPTY; - - RequestedGlobalProperties reqGp = new RequestedGlobalProperties(); - reqGp.setAnyPartitioning(new FieldSet(2, 5)); - - toMap1.setShipStrategy(ShipStrategyType.PARTITION_HASH, new FieldList(2, 5), DataExchangeMode.PIPELINED); - toMap1.setLocalStrategy(LocalStrategy.NONE); - - toMap2.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED); - toMap2.setLocalStrategy(LocalStrategy.NONE); - - - toMap1.setRequiredGlobalProps(null); - toMap1.setRequiredLocalProps(null); - - toMap2.setRequiredGlobalProps(reqGp); - toMap2.setRequiredLocalProps(null); - - FeedbackPropertiesMeetRequirementsReport report = map2.checkPartialSolutionPropertiesMet(target, gp, lp); - 
assertEquals(MET, report); - } - - // set ship strategy in second channel, so previous non matching global properties void the match - { - GlobalProperties gp = new GlobalProperties(); - gp.setHashPartitioned(new FieldList(1, 2)); - LocalProperties lp = LocalProperties.EMPTY; - - RequestedGlobalProperties reqGp = new RequestedGlobalProperties(); - reqGp.setAnyPartitioning(new FieldSet(2, 5)); - - toMap1.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED); - toMap1.setLocalStrategy(LocalStrategy.NONE); - - toMap2.setShipStrategy(ShipStrategyType.PARTITION_HASH, new FieldList(2, 5), DataExchangeMode.PIPELINED); - toMap2.setLocalStrategy(LocalStrategy.NONE); - - - toMap1.setRequiredGlobalProps(reqGp); - toMap1.setRequiredLocalProps(null); - - toMap2.setRequiredGlobalProps(null); - toMap2.setRequiredLocalProps(null); - - FeedbackPropertiesMeetRequirementsReport report = map2.checkPartialSolutionPropertiesMet(target, gp, lp); - assertEquals(NOT_MET, report); - } - - // set local strategy in first channel, so later non matching local properties do not matter - { - GlobalProperties gp = new GlobalProperties(); - gp.setHashPartitioned(new FieldList(1, 2)); - LocalProperties lp = LocalProperties.forOrdering(new Ordering(3, null, Order.ASCENDING).appendOrdering(1, null, Order.DESCENDING)); - - RequestedLocalProperties reqLp = new RequestedLocalProperties(); - reqLp.setGroupedFields(new FieldList(4, 1)); - - toMap1.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED); - toMap1.setLocalStrategy(LocalStrategy.SORT, new FieldList(5, 7), new boolean[] {false, false}); - - toMap2.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED); - toMap2.setLocalStrategy(LocalStrategy.NONE); - - toMap1.setRequiredGlobalProps(null); - toMap1.setRequiredLocalProps(null); - - toMap2.setRequiredGlobalProps(null); - toMap2.setRequiredLocalProps(reqLp); - - FeedbackPropertiesMeetRequirementsReport report = 
map2.checkPartialSolutionPropertiesMet(target, gp, lp); - assertTrue(report != null && report != NO_PARTIAL_SOLUTION && report != NOT_MET); - } - - // set local strategy in second channel, so previous non matching local properties void the match - { - GlobalProperties gp = new GlobalProperties(); - gp.setHashPartitioned(new FieldList(1, 2)); - LocalProperties lp = LocalProperties.forOrdering(new Ordering(3, null, Order.ASCENDING).appendOrdering(1, null, Order.DESCENDING)); - - RequestedLocalProperties reqLp = new RequestedLocalProperties(); - reqLp.setGroupedFields(new FieldList(4, 1)); - - toMap1.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED); - toMap1.setLocalStrategy(LocalStrategy.NONE); - - toMap2.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED); - toMap2.setLocalStrategy(LocalStrategy.SORT, new FieldList(5, 7), new boolean[] {false, false}); - - - toMap1.setRequiredGlobalProps(null); - toMap1.setRequiredLocalProps(reqLp); - - toMap2.setRequiredGlobalProps(null); - toMap2.setRequiredLocalProps(null); - - FeedbackPropertiesMeetRequirementsReport report = map2.checkPartialSolutionPropertiesMet(target, gp, lp); - assertEquals(NOT_MET, report); - } - - // create the properties on the same node as the requirement - { - GlobalProperties gp = new GlobalProperties(); - gp.setHashPartitioned(new FieldList(1, 2)); - LocalProperties lp = LocalProperties.forOrdering(new Ordering(3, null, Order.ASCENDING).appendOrdering(1, null, Order.DESCENDING)); - - RequestedGlobalProperties reqGp = new RequestedGlobalProperties(); - reqGp.setAnyPartitioning(new FieldSet(5, 7)); - - RequestedLocalProperties reqLp = new RequestedLocalProperties(); - reqLp.setGroupedFields(new FieldList(5, 7)); - - toMap1.setShipStrategy(ShipStrategyType.PARTITION_HASH, new FieldList(5, 7), DataExchangeMode.PIPELINED); - toMap1.setLocalStrategy(LocalStrategy.SORT, new FieldList(5, 7), new boolean[] {false, false}); - - 
toMap2.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED); - toMap2.setLocalStrategy(LocalStrategy.NONE); - - toMap1.setRequiredGlobalProps(reqGp); - toMap1.setRequiredLocalProps(reqLp); - - toMap2.setRequiredGlobalProps(null); - toMap2.setRequiredLocalProps(null); - - FeedbackPropertiesMeetRequirementsReport report = map2.checkPartialSolutionPropertiesMet(target, gp, lp); - assertEquals(MET, report); - } - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - } - - @Test - public void testSingleInputOperatorsChainOfThree() { - try { - SourcePlanNode target = new SourcePlanNode(getSourceNode(), "Source"); - - Channel toMap1 = new Channel(target); - SingleInputPlanNode map1 = new SingleInputPlanNode(getMapNode(), "Mapper 1", toMap1, DriverStrategy.MAP); - - Channel toMap2 = new Channel(map1); - SingleInputPlanNode map2 = new SingleInputPlanNode(getMapNode(), "Mapper 2", toMap2, DriverStrategy.MAP); - - Channel toMap3 = new Channel(map2); - SingleInputPlanNode map3 = new SingleInputPlanNode(getMapNode(), "Mapper 3", toMap3, DriverStrategy.MAP); - - // set local strategy in first channel, so later non matching local properties do not matter - { - GlobalProperties gp = new GlobalProperties(); - LocalProperties lp = LocalProperties.forOrdering(new Ordering(3, null, Order.ASCENDING).appendOrdering(1, null, Order.DESCENDING)); - - RequestedLocalProperties reqLp = new RequestedLocalProperties(); - reqLp.setGroupedFields(new FieldList(4, 1)); - - toMap1.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED); - toMap1.setLocalStrategy(LocalStrategy.SORT, new FieldList(5, 7), new boolean[] {false, false}); - - toMap2.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED); - toMap2.setLocalStrategy(LocalStrategy.NONE); - - toMap3.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED); - toMap3.setLocalStrategy(LocalStrategy.NONE); - - toMap1.setRequiredGlobalProps(null); - 
toMap1.setRequiredLocalProps(null); - - toMap2.setRequiredGlobalProps(null); - toMap2.setRequiredLocalProps(null); - - toMap3.setRequiredGlobalProps(null); - toMap3.setRequiredLocalProps(reqLp); - - FeedbackPropertiesMeetRequirementsReport report = map3.checkPartialSolutionPropertiesMet(target, gp, lp); - assertTrue(report != null && report != NO_PARTIAL_SOLUTION && report != NOT_MET); - } - - // set global strategy in first channel, so later non matching global properties do not matter - { - GlobalProperties gp = new GlobalProperties(); - gp.setHashPartitioned(new FieldList(5, 3)); - LocalProperties lp = LocalProperties.EMPTY; - - RequestedGlobalProperties reqGp = new RequestedGlobalProperties(); - reqGp.setAnyPartitioning(new FieldSet(2, 3)); - - toMap1.setShipStrategy(ShipStrategyType.PARTITION_HASH, new FieldList(1, 2), DataExchangeMode.PIPELINED); - toMap1.setLocalStrategy(LocalStrategy.NONE); - - toMap2.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED); - toMap2.setLocalStrategy(LocalStrategy.NONE); - - toMap3.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED); - toMap3.setLocalStrategy(LocalStrategy.NONE); - - toMap1.setRequiredGlobalProps(null); - toMap1.setRequiredLocalProps(null); - - toMap2.setRequiredGlobalProps(null); - toMap2.setRequiredLocalProps(null); - - toMap3.setRequiredGlobalProps(reqGp); - toMap3.setRequiredLocalProps(null); - - FeedbackPropertiesMeetRequirementsReport report = map3.checkPartialSolutionPropertiesMet(target, gp, lp); - assertTrue(report != null && report != NO_PARTIAL_SOLUTION && report != NOT_MET); - } - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - } - - @Test - public void testNoPartialSolutionFoundTwoInputOperator() { - try { - SourcePlanNode target = new SourcePlanNode(getSourceNode(), "Partial Solution"); - - SourcePlanNode source1 = new SourcePlanNode(getSourceNode(), "Source 1"); - SourcePlanNode source2 = new SourcePlanNode(getSourceNode(), "Source 
2"); - - Channel toMap1 = new Channel(source1); - toMap1.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED); - toMap1.setLocalStrategy(LocalStrategy.NONE); - SingleInputPlanNode map1 = new SingleInputPlanNode(getMapNode(), "Mapper 1", toMap1, DriverStrategy.MAP); - - Channel toMap2 = new Channel(source2); - toMap2.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED); - toMap2.setLocalStrategy(LocalStrategy.NONE); - SingleInputPlanNode map2 = new SingleInputPlanNode(getMapNode(), "Mapper 2", toMap2, DriverStrategy.MAP); - - Channel toJoin1 = new Channel(map1); - Channel toJoin2 = new Channel(map2); - - toJoin1.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED); - toJoin1.setLocalStrategy(LocalStrategy.NONE); - toJoin2.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED); - toJoin2.setLocalStrategy(LocalStrategy.NONE); - - DualInputPlanNode join = new DualInputPlanNode(getJoinNode(), "Join", toJoin1, toJoin2, DriverStrategy.HYBRIDHASH_BUILD_FIRST); - - FeedbackPropertiesMeetRequirementsReport report = join.checkPartialSolutionPropertiesMet(target, new GlobalProperties(), new LocalProperties()); - assertEquals(NO_PARTIAL_SOLUTION, report); - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - } - - @Test - public void testTwoOperatorsOneIndependent() { - try { - SourcePlanNode target = new SourcePlanNode(getSourceNode(), "Partial Solution"); - SourcePlanNode source = new SourcePlanNode(getSourceNode(), "Other Source"); - - Channel toMap1 = new Channel(target); - toMap1.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED); - toMap1.setLocalStrategy(LocalStrategy.NONE); - SingleInputPlanNode map1 = new SingleInputPlanNode(getMapNode(), "Mapper 1", toMap1, DriverStrategy.MAP); - - Channel toMap2 = new Channel(source); - toMap2.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED); - toMap2.setLocalStrategy(LocalStrategy.NONE); - 
SingleInputPlanNode map2 = new SingleInputPlanNode(getMapNode(), "Mapper 2", toMap2, DriverStrategy.MAP); - - Channel toJoin1 = new Channel(map1); - Channel toJoin2 = new Channel(map2); - - DualInputPlanNode join = new DualInputPlanNode(getJoinNode(), "Join", toJoin1, toJoin2, DriverStrategy.HYBRIDHASH_BUILD_FIRST); - - Channel toAfterJoin = new Channel(join); - toAfterJoin.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED); - toAfterJoin.setLocalStrategy(LocalStrategy.NONE); - SingleInputPlanNode afterJoin = new SingleInputPlanNode(getMapNode(), "After Join Mapper", toAfterJoin, DriverStrategy.MAP); - - // attach some properties to the non-relevant input - { - toMap2.setShipStrategy(ShipStrategyType.BROADCAST, DataExchangeMode.PIPELINED); - toMap2.setLocalStrategy(LocalStrategy.SORT, new FieldList(2, 7), new boolean[] {true, true}); - - RequestedGlobalProperties joinGp = new RequestedGlobalProperties(); - joinGp.setFullyReplicated(); - - RequestedLocalProperties joinLp = new RequestedLocalProperties(); - joinLp.setOrdering(new Ordering(2, null, Order.ASCENDING).appendOrdering(7, null, Order.ASCENDING)); - - toJoin2.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED); - toJoin2.setLocalStrategy(LocalStrategy.NONE); - toJoin2.setRequiredGlobalProps(joinGp); - toJoin2.setRequiredLocalProps(joinLp); - } - - // ------------------------------------------------------------------------------------ - - // no properties from the partial solution, no required properties - { - toJoin1.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED); - toJoin1.setLocalStrategy(LocalStrategy.NONE); - - GlobalProperties gp = new GlobalProperties(); - LocalProperties lp = LocalProperties.EMPTY; - - FeedbackPropertiesMeetRequirementsReport report = join.checkPartialSolutionPropertiesMet(target, gp, lp); - assertTrue(report != null && report != NO_PARTIAL_SOLUTION && report != NOT_MET); - } - - // some properties from the partial solution, 
no required properties - { - toJoin1.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED); - toJoin1.setLocalStrategy(LocalStrategy.NONE); - - GlobalProperties gp = new GlobalProperties(); - gp.setHashPartitioned(new FieldList(0)); - LocalProperties lp = LocalProperties.forGrouping(new FieldList(2, 1)); - - FeedbackPropertiesMeetRequirementsReport report = join.checkPartialSolutionPropertiesMet(target, gp, lp); - assertTrue(report != null && report != NO_PARTIAL_SOLUTION && report != NOT_MET); - } - - - // produced properties match relevant input - { - GlobalProperties gp = new GlobalProperties(); - gp.setHashPartitioned(new FieldList(0)); - LocalProperties lp = LocalProperties.forGrouping(new FieldList(2, 1)); - - RequestedGlobalProperties rgp = new RequestedGlobalProperties(); - rgp.setHashPartitioned(new FieldList(0)); - - RequestedLocalProperties rlp = new RequestedLocalProperties(); - rlp.setGroupedFields(new FieldList(2)); - - toJoin1.setRequiredGlobalProps(rgp); - toJoin1.setRequiredLocalProps(rlp); - - toJoin1.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED); - toJoin1.setLocalStrategy(LocalStrategy.NONE); - - FeedbackPropertiesMeetRequirementsReport report = join.checkPartialSolutionPropertiesMet(target, gp, lp); - assertTrue(report != null && report != NO_PARTIAL_SOLUTION && report != NOT_MET); - } - - // produced properties do not match relevant input - { - GlobalProperties gp = new GlobalProperties(); - gp.setHashPartitioned(new FieldList(0)); - LocalProperties lp = LocalProperties.forGrouping(new FieldList(2, 1)); - - RequestedGlobalProperties rgp = new RequestedGlobalProperties(); - rgp.setHashPartitioned(new FieldList(0)); - - RequestedLocalProperties rlp = new RequestedLocalProperties(); - rlp.setGroupedFields(new FieldList(1, 2, 3)); - - toJoin1.setRequiredGlobalProps(rgp); - toJoin1.setRequiredLocalProps(rlp); - - toJoin1.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED); - 
toJoin1.setLocalStrategy(LocalStrategy.NONE); - - FeedbackPropertiesMeetRequirementsReport report = join.checkPartialSolutionPropertiesMet(target, gp, lp); - assertEquals(NOT_MET, report); - } - - // produced properties overridden before join - { - GlobalProperties gp = new GlobalProperties(); - gp.setHashPartitioned(new FieldList(0)); - LocalProperties lp = LocalProperties.forGrouping(new FieldList(2, 1)); - - RequestedGlobalProperties rgp = new RequestedGlobalProperties(); - rgp.setHashPartitioned(new FieldList(0)); - - RequestedLocalProperties rlp = new RequestedLocalProperties(); - rlp.setGroupedFields(new FieldList(2, 1)); - - toMap1.setRequiredGlobalProps(rgp); - toMap1.setRequiredLocalProps(rlp); - - toJoin1.setRequiredGlobalProps(null); - toJoin1.setRequiredLocalProps(null); - - toJoin1.setShipStrategy(ShipStrategyType.PARTITION_HASH, new FieldList(2, 1), DataExchangeMode.PIPELINED); - toJoin1.setLocalStrategy(LocalStrategy.SORT, new FieldList(7, 3), new boolean[] {true, false}); - - FeedbackPropertiesMeetRequirementsReport report = join.checkPartialSolutionPropertiesMet(target, gp, lp); - assertEquals(MET, report); - } - - // produced properties before join match, after join match as well - { - GlobalProperties gp = new GlobalProperties(); - gp.setHashPartitioned(new FieldList(0)); - LocalProperties lp = LocalProperties.forGrouping(new FieldList(2, 1)); - - RequestedGlobalProperties rgp = new RequestedGlobalProperties(); - rgp.setHashPartitioned(new FieldList(0)); - - RequestedLocalProperties rlp = new RequestedLocalProperties(); - rlp.setGroupedFields(new FieldList(2, 1)); - - toMap1.setRequiredGlobalProps(null); - toMap1.setRequiredLocalProps(null); - - toJoin1.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED); - toJoin1.setLocalStrategy(LocalStrategy.NONE); - - toJoin1.setRequiredGlobalProps(rgp); - toJoin1.setRequiredLocalProps(rlp); - - toAfterJoin.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED); - 
toAfterJoin.setLocalStrategy(LocalStrategy.NONE); - - toAfterJoin.setRequiredGlobalProps(rgp); - toAfterJoin.setRequiredLocalProps(rlp); - - // evaluate from the node after the join, so the requirements set on toAfterJoin are part of the check - FeedbackPropertiesMeetRequirementsReport report = afterJoin.checkPartialSolutionPropertiesMet(target, gp, lp); - assertTrue(report != null && report != NO_PARTIAL_SOLUTION && report != NOT_MET); - } - - // produced properties before join match, after join do not match - { - GlobalProperties gp = new GlobalProperties(); - gp.setHashPartitioned(new FieldList(0)); - LocalProperties lp = LocalProperties.forGrouping(new FieldList(2, 1)); - - RequestedGlobalProperties rgp1 = new RequestedGlobalProperties(); - rgp1.setHashPartitioned(new FieldList(0)); - - RequestedGlobalProperties rgp2 = new RequestedGlobalProperties(); - rgp2.setHashPartitioned(new FieldList(3)); - - RequestedLocalProperties rlp1 = new RequestedLocalProperties(); - rlp1.setGroupedFields(new FieldList(2, 1)); - - RequestedLocalProperties rlp2 = new RequestedLocalProperties(); - rlp2.setGroupedFields(new FieldList(3, 4)); - - toJoin1.setRequiredGlobalProps(rgp1); - toJoin1.setRequiredLocalProps(rlp1); - - toAfterJoin.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED); - toAfterJoin.setLocalStrategy(LocalStrategy.NONE); - - toAfterJoin.setRequiredGlobalProps(rgp2); - toAfterJoin.setRequiredLocalProps(rlp2); - - FeedbackPropertiesMeetRequirementsReport report = afterJoin.checkPartialSolutionPropertiesMet(target, gp, lp); - assertEquals(NOT_MET, report); - } - - // produced properties are overridden, does not matter that they do not match - { - GlobalProperties gp = new GlobalProperties(); - gp.setAnyPartitioning(new FieldList(0)); - LocalProperties lp = LocalProperties.forGrouping(new FieldList(2, 1)); - - RequestedGlobalProperties rgp = new RequestedGlobalProperties(); - rgp.setHashPartitioned(new FieldList(1)); - - RequestedLocalProperties rlp = new RequestedLocalProperties(); - rlp.setGroupedFields(new FieldList(1, 2, 3)); - - 
toJoin1.setRequiredGlobalProps(null); - toJoin1.setRequiredLocalProps(null); - - toJoin1.setShipStrategy(ShipStrategyType.PARTITION_HASH, new FieldList(2, 1), DataExchangeMode.PIPELINED); - toJoin1.setLocalStrategy(LocalStrategy.SORT, new FieldList(7, 3), new boolean[] {true, false}); - - toAfterJoin.setRequiredGlobalProps(rgp); - toAfterJoin.setRequiredLocalProps(rlp); - - FeedbackPropertiesMeetRequirementsReport report = afterJoin.checkPartialSolutionPropertiesMet(target, gp, lp); - assertEquals(MET, report); - } - - // local property overridden before join, local property mismatch after join not relevant - { - GlobalProperties gp = new GlobalProperties(); - gp.setAnyPartitioning(new FieldList(0)); - LocalProperties lp = LocalProperties.forGrouping(new FieldList(2, 1)); - - RequestedLocalProperties rlp = new RequestedLocalProperties(); - rlp.setGroupedFields(new FieldList(1, 2, 3)); - - toJoin1.setRequiredGlobalProps(null); - toJoin1.setRequiredLocalProps(null); - - toJoin1.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED); - toJoin1.setLocalStrategy(LocalStrategy.SORT, new FieldList(7, 3), new boolean[] {true, false}); - - toAfterJoin.setRequiredGlobalProps(null); - toAfterJoin.setRequiredLocalProps(rlp); - - FeedbackPropertiesMeetRequirementsReport report = afterJoin.checkPartialSolutionPropertiesMet(target, gp, lp); - assertTrue(report != null && report != NO_PARTIAL_SOLUTION && report != NOT_MET); - } - - // local property overridden before join, global property mismatch after join void the match - { - GlobalProperties gp = new GlobalProperties(); - gp.setAnyPartitioning(new FieldList(0)); - LocalProperties lp = LocalProperties.forGrouping(new FieldList(2, 1)); - - RequestedGlobalProperties rgp = new RequestedGlobalProperties(); - rgp.setHashPartitioned(new FieldList(1)); - - RequestedLocalProperties rlp = new RequestedLocalProperties(); - rlp.setGroupedFields(new FieldList(1, 2, 3)); - - toJoin1.setRequiredGlobalProps(null); - 
toJoin1.setRequiredLocalProps(null); - - toJoin1.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED); - toJoin1.setLocalStrategy(LocalStrategy.SORT, new FieldList(7, 3), new boolean[] {true, false}); - - toAfterJoin.setRequiredGlobalProps(rgp); - toAfterJoin.setRequiredLocalProps(rlp); - - FeedbackPropertiesMeetRequirementsReport report = afterJoin.checkPartialSolutionPropertiesMet(target, gp, lp); - assertEquals(NOT_MET, report); - } - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - } - - @Test - public void testTwoOperatorsBothDependent() { - try { - SourcePlanNode target = new SourcePlanNode(getSourceNode(), "Partial Solution"); - - Channel toMap1 = new Channel(target); - toMap1.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED); - toMap1.setLocalStrategy(LocalStrategy.NONE); - SingleInputPlanNode map1 = new SingleInputPlanNode(getMapNode(), "Mapper 1", toMap1, DriverStrategy.MAP); - - Channel toMap2 = new Channel(target); - toMap2.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED); - toMap2.setLocalStrategy(LocalStrategy.NONE); - SingleInputPlanNode map2 = new SingleInputPlanNode(getMapNode(), "Mapper 2", toMap2, DriverStrategy.MAP); - - Channel toJoin1 = new Channel(map1); - toJoin1.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED); - toJoin1.setLocalStrategy(LocalStrategy.NONE); - - Channel toJoin2 = new Channel(map2); - toJoin2.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED); - toJoin2.setLocalStrategy(LocalStrategy.NONE); - - DualInputPlanNode join = new DualInputPlanNode(getJoinNode(), "Join", toJoin1, toJoin2, DriverStrategy.HYBRIDHASH_BUILD_FIRST); - - Channel toAfterJoin = new Channel(join); - toAfterJoin.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED); - toAfterJoin.setLocalStrategy(LocalStrategy.NONE); - SingleInputPlanNode afterJoin = new SingleInputPlanNode(getMapNode(), "After Join Mapper", 
toAfterJoin, DriverStrategy.MAP); - - // no properties from the partial solution, no required properties - { - GlobalProperties gp = new GlobalProperties(); - LocalProperties lp = LocalProperties.EMPTY; - - FeedbackPropertiesMeetRequirementsReport report = afterJoin.checkPartialSolutionPropertiesMet(target, gp, lp); - assertTrue(report != null && report != NO_PARTIAL_SOLUTION && report != NOT_MET); - } - - // some properties from the partial solution, no required properties - { - GlobalProperties gp = new GlobalProperties(); - gp.setHashPartitioned(new FieldList(0)); - LocalProperties lp = LocalProperties.forGrouping(new FieldList(2, 1)); - - FeedbackPropertiesMeetRequirementsReport report = afterJoin.checkPartialSolutionPropertiesMet(target, gp, lp); - assertTrue(report != null && report != NO_PARTIAL_SOLUTION && report != NOT_MET); - } - - // test requirements on one input and met - { - GlobalProperties gp = new GlobalProperties(); - gp.setHashPartitioned(new FieldList(0)); - LocalProperties lp = LocalProperties.forGrouping(new FieldList(2, 1)); - - RequestedGlobalProperties rgp = new RequestedGlobalProperties(); - rgp.setHashPartitioned(new FieldList(0)); - - RequestedLocalProperties rlp = new RequestedLocalProperties(); - rlp.setGroupedFields(new FieldList(2, 1)); - - toJoin1.setRequiredGlobalProps(rgp); - toJoin1.setRequiredLocalProps(rlp); - - FeedbackPropertiesMeetRequirementsReport report = afterJoin.checkPartialSolutionPropertiesMet(target, gp, lp); - assertTrue(report != null && report != NO_PARTIAL_SOLUTION && report != NOT_MET); - } - - // test requirements on both input and met - { - GlobalProperties gp = new GlobalProperties(); - gp.setHashPartitioned(new FieldList(0)); - LocalProperties lp = LocalProperties.forGrouping(new FieldList(2, 1)); - - RequestedGlobalProperties rgp = new RequestedGlobalProperties(); - rgp.setHashPartitioned(new FieldList(0)); - - RequestedLocalProperties rlp = new RequestedLocalProperties(); - rlp.setGroupedFields(new 
FieldList(2, 1)); - - toJoin1.setRequiredGlobalProps(rgp); - toJoin1.setRequiredLocalProps(rlp); - - toJoin2.setRequiredGlobalProps(rgp); - toJoin2.setRequiredLocalProps(rlp); - - FeedbackPropertiesMeetRequirementsReport report = afterJoin.checkPartialSolutionPropertiesMet(target, gp, lp); - assertTrue(report != null && report != NO_PARTIAL_SOLUTION && report != NOT_MET); - } - - // test requirements on both inputs, one not met - { - GlobalProperties gp = new GlobalProperties(); - gp.setHashPartitioned(new FieldList(0)); - LocalProperties lp = LocalProperties.forGrouping(new FieldList(2, 1)); - - RequestedGlobalProperties rgp1 = new RequestedGlobalProperties(); - rgp1.setHashPartitioned(new FieldList(0)); - - RequestedLocalProperties rlp1 = new RequestedLocalProperties(); - rlp1.setGroupedFields(new FieldList(2, 1)); - - RequestedGlobalProperties rgp2 = new RequestedGlobalProperties(); - rgp2.setHashPartitioned(new FieldList(1)); - - RequestedLocalProperties rlp2 = new RequestedLocalProperties(); - rlp2.setGroupedFields(new FieldList(0, 3)); - - toJoin1.setRequiredGlobalProps(rgp1); - toJoin1.setRequiredLocalProps(rlp1); - - toJoin2.setRequiredGlobalProps(rgp2); - toJoin2.setRequiredLocalProps(rlp2); - - FeedbackPropertiesMeetRequirementsReport report = afterJoin.checkPartialSolutionPropertiesMet(target, gp, lp); - assertEquals(NOT_MET, report); - } - - // test override on both inputs, later requirement ignored - { - GlobalProperties gp = new GlobalProperties(); - gp.setHashPartitioned(new FieldList(0)); - LocalProperties lp = LocalProperties.forGrouping(new FieldList(2, 1)); - - RequestedGlobalProperties rgp = new RequestedGlobalProperties(); - rgp.setHashPartitioned(new FieldList(1)); - - RequestedLocalProperties rlp = new RequestedLocalProperties(); - rlp.setGroupedFields(new FieldList(0, 3)); - - toJoin1.setRequiredGlobalProps(null); - toJoin1.setRequiredLocalProps(null); - - toJoin2.setRequiredGlobalProps(null); - toJoin2.setRequiredLocalProps(null); - - 
toJoin1.setShipStrategy(ShipStrategyType.PARTITION_HASH, new FieldList(88), DataExchangeMode.PIPELINED); - toJoin2.setShipStrategy(ShipStrategyType.BROADCAST, DataExchangeMode.PIPELINED); - - toAfterJoin.setRequiredGlobalProps(rgp); - toAfterJoin.setRequiredLocalProps(rlp); - - FeedbackPropertiesMeetRequirementsReport report = afterJoin.checkPartialSolutionPropertiesMet(target, gp, lp); - assertEquals(MET, report); - } - - // test override on one inputs, later requirement met - { - GlobalProperties gp = new GlobalProperties(); - gp.setHashPartitioned(new FieldList(0)); - LocalProperties lp = LocalProperties.forGrouping(new FieldList(2, 1)); - - RequestedGlobalProperties rgp = new RequestedGlobalProperties(); - rgp.setHashPartitioned(new FieldList(0)); - - RequestedLocalProperties rlp = new RequestedLocalProperties(); - rlp.setGroupedFields(new FieldList(2, 1)); - - toJoin1.setShipStrategy(ShipStrategyType.PARTITION_HASH, new FieldList(88), DataExchangeMode.PIPELINED); - toJoin2.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED); - - toAfterJoin.setRequiredGlobalProps(rgp); - toAfterJoin.setRequiredLocalProps(rlp); - - FeedbackPropertiesMeetRequirementsReport report = afterJoin.checkPartialSolutionPropertiesMet(target, gp, lp); - assertEquals(PENDING, report); - } - - // test override on one input, later requirement not met - { - GlobalProperties gp = new GlobalProperties(); - gp.setHashPartitioned(new FieldList(0)); - LocalProperties lp = LocalProperties.forGrouping(new FieldList(2, 1)); - - RequestedGlobalProperties rgp = new RequestedGlobalProperties(); - rgp.setHashPartitioned(new FieldList(3)); - - RequestedLocalProperties rlp = new RequestedLocalProperties(); - rlp.setGroupedFields(new FieldList(77, 69)); - - toJoin1.setShipStrategy(ShipStrategyType.PARTITION_HASH, new FieldList(88), DataExchangeMode.PIPELINED); - toJoin2.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED); - - toAfterJoin.setRequiredGlobalProps(rgp); - 
toAfterJoin.setRequiredLocalProps(rlp); - - FeedbackPropertiesMeetRequirementsReport report = afterJoin.checkPartialSolutionPropertiesMet(target, gp, lp); - assertEquals(NOT_MET, report); - } - - // test override on one input locally, later global requirement not met - { - GlobalProperties gp = new GlobalProperties(); - gp.setHashPartitioned(new FieldList(0)); - LocalProperties lp = LocalProperties.forGrouping(new FieldList(2, 1)); - - RequestedGlobalProperties rgp = new RequestedGlobalProperties(); - rgp.setHashPartitioned(new FieldList(3)); - - - toJoin1.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED); - toJoin1.setLocalStrategy(LocalStrategy.SORT, new FieldList(3), new boolean[] { false }); - - toJoin2.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED); - // input two applies no local strategy; the SORT on input one must stay in place for this scenario - toJoin2.setLocalStrategy(LocalStrategy.NONE); - - toAfterJoin.setRequiredGlobalProps(rgp); - toAfterJoin.setRequiredLocalProps(null); - - FeedbackPropertiesMeetRequirementsReport report = afterJoin.checkPartialSolutionPropertiesMet(target, gp, lp); - assertEquals(NOT_MET, report); - } - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - } - - // -------------------------------------------------------------------------------------------- - - private static DataSourceNode getSourceNode() { - return new DataSourceNode(new GenericDataSourceBase<String, TextInputFormat>(new TextInputFormat(new Path("/")), new OperatorInformation<String>(BasicTypeInfo.STRING_TYPE_INFO))); - } - - private static MapNode getMapNode() { - return new MapNode(new MapOperatorBase<String, String, MapFunction<String,String>>(new IdentityMapper<String>(), new UnaryOperatorInformation<String, String>(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO), "map op")); - } - - private static JoinNode getJoinNode() { - return new JoinNode(new JoinOperatorBase<String, String, String, FlatJoinFunction<String, String, String>>(new DummyFlatJoinFunction<String>(), new 
BinaryOperatorInformation<String, String, String>(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO), new int[] {1}, new int[] {2}, "join op")); - } -}
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/test/java/org/apache/flink/optimizer/GroupOrderTest.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/test/java/org/apache/flink/optimizer/GroupOrderTest.java b/flink-compiler/src/test/java/org/apache/flink/optimizer/GroupOrderTest.java deleted file mode 100644 index 77d185d..0000000 --- a/flink-compiler/src/test/java/org/apache/flink/optimizer/GroupOrderTest.java +++ /dev/null @@ -1,167 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.optimizer; - -import static org.junit.Assert.fail; - -import org.apache.flink.api.common.Plan; -import org.apache.flink.api.common.operators.Order; -import org.apache.flink.api.common.operators.Ordering; -import org.apache.flink.api.common.operators.util.FieldList; -import org.apache.flink.api.java.record.operators.CoGroupOperator; -import org.apache.flink.api.java.record.operators.FileDataSink; -import org.apache.flink.api.java.record.operators.FileDataSource; -import org.apache.flink.api.java.record.operators.ReduceOperator; -import org.apache.flink.optimizer.plan.Channel; -import org.apache.flink.optimizer.plan.DualInputPlanNode; -import org.apache.flink.optimizer.plan.OptimizedPlan; -import org.apache.flink.optimizer.plan.SingleInputPlanNode; -import org.apache.flink.optimizer.plan.SinkPlanNode; -import org.apache.flink.optimizer.util.DummyCoGroupStub; -import org.apache.flink.optimizer.util.DummyInputFormat; -import org.apache.flink.optimizer.util.DummyOutputFormat; -import org.apache.flink.optimizer.util.IdentityReduce; -import org.apache.flink.runtime.operators.shipping.ShipStrategyType; -import org.apache.flink.runtime.operators.util.LocalStrategy; -import org.apache.flink.types.DoubleValue; -import org.apache.flink.types.IntValue; -import org.apache.flink.types.LongValue; -import org.apache.flink.types.StringValue; -import org.junit.Assert; -import org.junit.Test; - -/** - * This test case has been created to validate that correct strategies are used if orders within groups are - * requested. 
- */ -@SuppressWarnings({"serial", "deprecation"}) -public class GroupOrderTest extends CompilerTestBase { - - @Test - public void testReduceWithGroupOrder() { - // construct the plan - FileDataSource source = new FileDataSource(new DummyInputFormat(), IN_FILE, "Source"); - - ReduceOperator reduce = ReduceOperator.builder(new IdentityReduce()).keyField(IntValue.class, 2).name("Reduce").input(source).build(); - Ordering groupOrder = new Ordering(5, StringValue.class, Order.DESCENDING); - reduce.setGroupOrder(groupOrder); - - FileDataSink sink = new FileDataSink(new DummyOutputFormat(), OUT_FILE, reduce, "Sink"); - - - Plan plan = new Plan(sink, "Test Temp Task"); - plan.setDefaultParallelism(DEFAULT_PARALLELISM); - - OptimizedPlan oPlan; - try { - oPlan = compileNoStats(plan); - } catch(CompilerException ce) { - ce.printStackTrace(); - fail("The pact compiler is unable to compile this plan correctly."); - return; // silence the compiler - } - - OptimizerPlanNodeResolver resolver = getOptimizerPlanNodeResolver(oPlan); - SinkPlanNode sinkNode = resolver.getNode("Sink"); - SingleInputPlanNode reducer = resolver.getNode("Reduce"); - - // verify the strategies - Assert.assertEquals(ShipStrategyType.FORWARD, sinkNode.getInput().getShipStrategy()); - Assert.assertEquals(ShipStrategyType.PARTITION_HASH, reducer.getInput().getShipStrategy()); - - Channel c = reducer.getInput(); - Assert.assertEquals(LocalStrategy.SORT, c.getLocalStrategy()); - - FieldList ship = new FieldList(2); - FieldList local = new FieldList(2, 5); - Assert.assertEquals(ship, c.getShipStrategyKeys()); - Assert.assertEquals(local, c.getLocalStrategyKeys()); - Assert.assertTrue(c.getLocalStrategySortOrder()[0] == reducer.getSortOrders(0)[0]); - - // check that we indeed sort descending - Assert.assertTrue(c.getLocalStrategySortOrder()[1] == groupOrder.getFieldSortDirections()[0]); - } - - @Test - public void testCoGroupWithGroupOrder() { - // construct the plan - FileDataSource source1 = new 
FileDataSource(new DummyInputFormat(), IN_FILE, "Source1"); - FileDataSource source2 = new FileDataSource(new DummyInputFormat(), IN_FILE, "Source2"); - - CoGroupOperator coGroup = CoGroupOperator.builder(new DummyCoGroupStub(), IntValue.class, 3, 6) - .keyField(LongValue.class, 0, 0) - .name("CoGroup").input1(source1).input2(source2).build(); - - Ordering groupOrder1 = new Ordering(5, StringValue.class, Order.DESCENDING); - Ordering groupOrder2 = new Ordering(1, StringValue.class, Order.DESCENDING); - groupOrder2.appendOrdering(4, DoubleValue.class, Order.ASCENDING); - coGroup.setGroupOrderForInputOne(groupOrder1); - coGroup.setGroupOrderForInputTwo(groupOrder2); - - FileDataSink sink = new FileDataSink(new DummyOutputFormat(), OUT_FILE, coGroup, "Sink"); - - Plan plan = new Plan(sink, "Reduce Group Order Test"); - plan.setDefaultParallelism(DEFAULT_PARALLELISM); - - OptimizedPlan oPlan; - try { - oPlan = compileNoStats(plan); - } catch(CompilerException ce) { - ce.printStackTrace(); - fail("The pact compiler is unable to compile this plan correctly."); - return; // silence the compiler - } - - OptimizerPlanNodeResolver resolver = getOptimizerPlanNodeResolver(oPlan); - SinkPlanNode sinkNode = resolver.getNode("Sink"); - DualInputPlanNode coGroupNode = resolver.getNode("CoGroup"); - - // verify the strategies - Assert.assertEquals(ShipStrategyType.FORWARD, sinkNode.getInput().getShipStrategy()); - Assert.assertEquals(ShipStrategyType.PARTITION_HASH, coGroupNode.getInput1().getShipStrategy()); - Assert.assertEquals(ShipStrategyType.PARTITION_HASH, coGroupNode.getInput2().getShipStrategy()); - - Channel c1 = coGroupNode.getInput1(); - Channel c2 = coGroupNode.getInput2(); - - Assert.assertEquals(LocalStrategy.SORT, c1.getLocalStrategy()); - Assert.assertEquals(LocalStrategy.SORT, c2.getLocalStrategy()); - - FieldList ship1 = new FieldList(new int[] {3, 0}); - FieldList ship2 = new FieldList(new int[] {6, 0}); - - FieldList local1 = new FieldList(new int[] {3, 0, 5}); 
- FieldList local2 = new FieldList(new int[] {6, 0, 1, 4}); - - Assert.assertEquals(ship1, c1.getShipStrategyKeys()); - Assert.assertEquals(ship2, c2.getShipStrategyKeys()); - Assert.assertEquals(local1, c1.getLocalStrategyKeys()); - Assert.assertEquals(local2, c2.getLocalStrategyKeys()); - - Assert.assertTrue(c1.getLocalStrategySortOrder()[0] == coGroupNode.getSortOrders()[0]); - Assert.assertTrue(c1.getLocalStrategySortOrder()[1] == coGroupNode.getSortOrders()[1]); - Assert.assertTrue(c2.getLocalStrategySortOrder()[0] == coGroupNode.getSortOrders()[0]); - Assert.assertTrue(c2.getLocalStrategySortOrder()[1] == coGroupNode.getSortOrders()[1]); - - // check that the local group orderings are correct - Assert.assertTrue(c1.getLocalStrategySortOrder()[2] == groupOrder1.getFieldSortDirections()[0]); - Assert.assertTrue(c2.getLocalStrategySortOrder()[2] == groupOrder2.getFieldSortDirections()[0]); - Assert.assertTrue(c2.getLocalStrategySortOrder()[3] == groupOrder2.getFieldSortDirections()[1]); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/test/java/org/apache/flink/optimizer/HardPlansCompilationTest.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/test/java/org/apache/flink/optimizer/HardPlansCompilationTest.java b/flink-compiler/src/test/java/org/apache/flink/optimizer/HardPlansCompilationTest.java deleted file mode 100644 index 6dadc19..0000000 --- a/flink-compiler/src/test/java/org/apache/flink/optimizer/HardPlansCompilationTest.java +++ /dev/null @@ -1,80 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.optimizer; - -import org.apache.flink.api.common.Plan; -import org.apache.flink.api.java.record.operators.CrossOperator; -import org.apache.flink.api.java.record.operators.FileDataSink; -import org.apache.flink.api.java.record.operators.FileDataSource; -import org.apache.flink.api.java.record.operators.MapOperator; -import org.apache.flink.api.java.record.operators.ReduceOperator; -import org.apache.flink.optimizer.plan.OptimizedPlan; -import org.apache.flink.optimizer.plantranslate.JobGraphGenerator; -import org.apache.flink.optimizer.util.DummyCrossStub; -import org.apache.flink.optimizer.util.DummyInputFormat; -import org.apache.flink.optimizer.util.DummyOutputFormat; -import org.apache.flink.optimizer.util.IdentityMap; -import org.apache.flink.optimizer.util.IdentityReduce; -import org.apache.flink.types.IntValue; -import org.junit.Test; - -/** - * This class tests plans that once failed because of a bug: - * <ul> - * <li> Ticket 158 - * </ul> - */ -@SuppressWarnings({"serial", "deprecation"}) -public class HardPlansCompilationTest extends CompilerTestBase { - - /** - * Source -> Map -> Reduce -> Cross -> Reduce -> Cross -> Reduce -> - * |--------------------------/ / - * |--------------------------------------------/ - * - * First cross has SameKeyFirst output contract - */ - @Test - public void testTicket158() { - // construct the plan - FileDataSource source = new FileDataSource(new DummyInputFormat(), IN_FILE, "Source"); - - MapOperator map = MapOperator.builder(new 
IdentityMap()).name("Map1").input(source).build(); - - ReduceOperator reduce1 = ReduceOperator.builder(new IdentityReduce(), IntValue.class, 0).name("Reduce1").input(map).build(); - - CrossOperator cross1 = CrossOperator.builder(new DummyCrossStub()).name("Cross1").input1(reduce1).input2(source).build(); - - ReduceOperator reduce2 = ReduceOperator.builder(new IdentityReduce(), IntValue.class, 0).name("Reduce2").input(cross1).build(); - - CrossOperator cross2 = CrossOperator.builder(new DummyCrossStub()).name("Cross2").input1(reduce2).input2(source).build(); - - ReduceOperator reduce3 = ReduceOperator.builder(new IdentityReduce(), IntValue.class, 0).name("Reduce3").input(cross2).build(); - - FileDataSink sink = new FileDataSink(new DummyOutputFormat(), OUT_FILE, "Sink"); - sink.setInput(reduce3); - - Plan plan = new Plan(sink, "Test Temp Task"); - plan.setDefaultParallelism(DEFAULT_PARALLELISM); - - OptimizedPlan oPlan = compileNoStats(plan); - JobGraphGenerator jobGen = new JobGraphGenerator(); - jobGen.compileJobGraph(oPlan); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/test/java/org/apache/flink/optimizer/IterationsCompilerTest.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/test/java/org/apache/flink/optimizer/IterationsCompilerTest.java b/flink-compiler/src/test/java/org/apache/flink/optimizer/IterationsCompilerTest.java deleted file mode 100644 index ac4f820..0000000 --- a/flink-compiler/src/test/java/org/apache/flink/optimizer/IterationsCompilerTest.java +++ /dev/null @@ -1,409 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.optimizer; - -import static org.junit.Assert.*; - -import org.apache.flink.api.common.functions.RichFlatMapFunction; -import org.junit.Test; -import org.apache.flink.api.common.Plan; -import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.java.operators.DeltaIteration; -import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.api.java.operators.IterativeDataSet; -import org.apache.flink.api.java.aggregation.Aggregations; -import org.apache.flink.api.common.functions.FilterFunction; -import org.apache.flink.api.common.functions.JoinFunction; -import org.apache.flink.api.common.functions.RichGroupReduceFunction; -import org.apache.flink.api.common.functions.RichJoinFunction; -import org.apache.flink.api.common.functions.RichMapFunction; -import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint; -import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields; -import org.apache.flink.api.java.tuple.Tuple1; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.optimizer.plan.BulkIterationPlanNode; -import org.apache.flink.optimizer.plan.Channel; -import org.apache.flink.optimizer.plan.OptimizedPlan; -import org.apache.flink.optimizer.plan.WorksetIterationPlanNode; -import org.apache.flink.optimizer.plandump.PlanJSONDumpGenerator; -import 
org.apache.flink.optimizer.plantranslate.JobGraphGenerator; -import org.apache.flink.optimizer.testfunctions.IdentityKeyExtractor; -import org.apache.flink.optimizer.testfunctions.IdentityMapper; -import org.apache.flink.runtime.operators.shipping.ShipStrategyType; -import org.apache.flink.util.Collector; - - -@SuppressWarnings({"serial", "unchecked"}) -public class IterationsCompilerTest extends CompilerTestBase { - - @Test - public void testSolutionSetDeltaDependsOnBroadcastVariable() { - try { - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - DataSet<Tuple2<Long, Long>> source = - env.generateSequence(1, 1000).map(new DuplicateValueScalar<Long>()); - - DataSet<Tuple2<Long, Long>> invariantInput = - env.generateSequence(1, 1000).map(new DuplicateValueScalar<Long>()); - - // iteration from here - DeltaIteration<Tuple2<Long, Long>, Tuple2<Long, Long>> iter = source.iterateDelta(source, 1000, 1); - - DataSet<Tuple2<Long, Long>> result = - invariantInput - .map(new IdentityMapper<Tuple2<Long, Long>>()).withBroadcastSet(iter.getWorkset(), "bc data") - .join(iter.getSolutionSet()).where(0).equalTo(1).projectFirst(1).projectSecond(1); - - iter.closeWith(result.map(new IdentityMapper<Tuple2<Long,Long>>()), result).print(); - - OptimizedPlan p = compileNoStats(env.createProgramPlan()); - - // check that the JSON generator accepts this plan - new PlanJSONDumpGenerator().getOptimizerPlanAsJSON(p); - - // check that the JobGraphGenerator accepts the plan - new JobGraphGenerator().compileJobGraph(p); - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - } - - @Test - public void testTwoIterationsWithMapperInbetween() throws Exception { - try { - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - env.setDegreeOfParallelism(8); - - DataSet<Tuple2<Long, Long>> verticesWithInitialId = env.fromElements(new Tuple2<Long, Long>(1L, 2L)); - - DataSet<Tuple2<Long, Long>> edges = env.fromElements(new 
Tuple2<Long, Long>(1L, 2L)); - - DataSet<Tuple2<Long, Long>> bulkResult = doBulkIteration(verticesWithInitialId, edges); - - DataSet<Tuple2<Long, Long>> mappedBulk = bulkResult.map(new DummyMap()); - - DataSet<Tuple2<Long, Long>> depResult = doDeltaIteration(mappedBulk, edges); - - depResult.print(); - - Plan p = env.createProgramPlan(); - OptimizedPlan op = compileNoStats(p); - - assertEquals(1, op.getDataSinks().size()); - assertTrue(op.getDataSinks().iterator().next().getInput().getSource() instanceof WorksetIterationPlanNode); - - WorksetIterationPlanNode wipn = (WorksetIterationPlanNode) op.getDataSinks().iterator().next().getInput().getSource(); - - assertEquals(ShipStrategyType.PARTITION_HASH, wipn.getInput1().getShipStrategy()); - assertTrue(wipn.getInput2().getTempMode().breaksPipeline()); - - new JobGraphGenerator().compileJobGraph(op); - } - catch (Exception e) { - System.err.println(e.getMessage()); - e.printStackTrace(); - fail(e.getMessage()); - } - } - - @Test - public void testTwoIterationsDirectlyChained() throws Exception { - try { - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - env.setDegreeOfParallelism(8); - - DataSet<Tuple2<Long, Long>> verticesWithInitialId = env.fromElements(new Tuple2<Long, Long>(1L, 2L)); - - DataSet<Tuple2<Long, Long>> edges = env.fromElements(new Tuple2<Long, Long>(1L, 2L)); - - DataSet<Tuple2<Long, Long>> bulkResult = doBulkIteration(verticesWithInitialId, edges); - - DataSet<Tuple2<Long, Long>> depResult = doDeltaIteration(bulkResult, edges); - - depResult.print(); - - Plan p = env.createProgramPlan(); - OptimizedPlan op = compileNoStats(p); - - assertEquals(1, op.getDataSinks().size()); - assertTrue(op.getDataSinks().iterator().next().getInput().getSource() instanceof WorksetIterationPlanNode); - - WorksetIterationPlanNode wipn = (WorksetIterationPlanNode) op.getDataSinks().iterator().next().getInput().getSource(); - - assertEquals(ShipStrategyType.PARTITION_HASH, 
wipn.getInput1().getShipStrategy()); - assertTrue(wipn.getInput2().getTempMode().breaksPipeline()); - - new JobGraphGenerator().compileJobGraph(op); - } - catch (Exception e) { - System.err.println(e.getMessage()); - e.printStackTrace(); - fail(e.getMessage()); - } - } - - @Test - public void testTwoWorksetIterationsDirectlyChained() throws Exception { - try { - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - env.setDegreeOfParallelism(8); - - DataSet<Tuple2<Long, Long>> verticesWithInitialId = env.fromElements(new Tuple2<Long, Long>(1L, 2L)); - - DataSet<Tuple2<Long, Long>> edges = env.fromElements(new Tuple2<Long, Long>(1L, 2L)); - - DataSet<Tuple2<Long, Long>> firstResult = doDeltaIteration(verticesWithInitialId, edges); - - DataSet<Tuple2<Long, Long>> secondResult = doDeltaIteration(firstResult, edges); - - secondResult.print(); - - Plan p = env.createProgramPlan(); - OptimizedPlan op = compileNoStats(p); - - assertEquals(1, op.getDataSinks().size()); - assertTrue(op.getDataSinks().iterator().next().getInput().getSource() instanceof WorksetIterationPlanNode); - - WorksetIterationPlanNode wipn = (WorksetIterationPlanNode) op.getDataSinks().iterator().next().getInput().getSource(); - - assertEquals(ShipStrategyType.FORWARD, wipn.getInput1().getShipStrategy()); - assertTrue(wipn.getInput2().getTempMode().breaksPipeline()); - - new JobGraphGenerator().compileJobGraph(op); - } - catch (Exception e) { - System.err.println(e.getMessage()); - e.printStackTrace(); - fail(e.getMessage()); - } - } - - @Test - public void testIterationPushingWorkOut() throws Exception { - try { - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - env.setDegreeOfParallelism(8); - - DataSet<Tuple2<Long, Long>> input1 = env.readCsvFile("/some/file/path").types(Long.class).map(new DuplicateValue()); - - DataSet<Tuple2<Long, Long>> input2 = env.readCsvFile("/some/file/path").types(Long.class, Long.class); - - doBulkIteration(input1, 
input2).print(); - - Plan p = env.createProgramPlan(); - OptimizedPlan op = compileNoStats(p); - - assertEquals(1, op.getDataSinks().size()); - assertTrue(op.getDataSinks().iterator().next().getInput().getSource() instanceof BulkIterationPlanNode); - - BulkIterationPlanNode bipn = (BulkIterationPlanNode) op.getDataSinks().iterator().next().getInput().getSource(); - - // check that work has not! been pushed out, as the end of the step function does not produce the necessary properties - for (Channel c : bipn.getPartialSolutionPlanNode().getOutgoingChannels()) { - assertEquals(ShipStrategyType.PARTITION_HASH, c.getShipStrategy()); - } - - new JobGraphGenerator().compileJobGraph(op); - } - catch (Exception e) { - System.err.println(e.getMessage()); - e.printStackTrace(); - fail(e.getMessage()); - } - } - - @Test - public void testWorksetIterationPipelineBreakerPlacement() { - try { - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - env.setDegreeOfParallelism(8); - - // the workset (input two of the delta iteration) is the same as what is consumed be the successive join - DataSet<Tuple2<Long, Long>> initialWorkset = env.readCsvFile("/some/file/path").types(Long.class).map(new DuplicateValue()); - - DataSet<Tuple2<Long, Long>> initialSolutionSet = env.readCsvFile("/some/file/path").types(Long.class).map(new DuplicateValue()); - - // trivial iteration, since we are interested in the inputs to the iteration - DeltaIteration<Tuple2<Long, Long>, Tuple2<Long, Long>> iteration = initialSolutionSet.iterateDelta(initialWorkset, 100, 0); - - DataSet<Tuple2<Long, Long>> next = iteration.getWorkset().map(new IdentityMapper<Tuple2<Long,Long>>()); - - DataSet<Tuple2<Long, Long>> result = iteration.closeWith(next, next); - - initialWorkset - .join(result, JoinHint.REPARTITION_HASH_FIRST) - .where(0).equalTo(0) - .print(); - - Plan p = env.createProgramPlan(); - compileNoStats(p); - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - 
} - } - - @Test - public void testResetPartialSolution() { - try { - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - DataSet<Long> width = env.generateSequence(1, 10); - DataSet<Long> update = env.generateSequence(1, 10); - DataSet<Long> lastGradient = env.generateSequence(1, 10); - - DataSet<Long> init = width.union(update).union(lastGradient); - - IterativeDataSet<Long> iteration = init.iterate(10); - - width = iteration.filter(new IdFilter<Long>()); - update = iteration.filter(new IdFilter<Long>()); - lastGradient = iteration.filter(new IdFilter<Long>()); - - DataSet<Long> gradient = width.map(new IdentityMapper<Long>()); - DataSet<Long> term = gradient.join(lastGradient) - .where(new IdentityKeyExtractor<Long>()) - .equalTo(new IdentityKeyExtractor<Long>()) - .with(new JoinFunction<Long, Long, Long>() { - public Long join(Long first, Long second) { return null; } - }); - - update = update.map(new RichMapFunction<Long, Long>() { - public Long map(Long value) { return null; } - }).withBroadcastSet(term, "some-name"); - - DataSet<Long> result = iteration.closeWith(width.union(update).union(lastGradient)); - - result.print(); - - Plan p = env.createProgramPlan(); - OptimizedPlan op = compileNoStats(p); - - new JobGraphGenerator().compileJobGraph(op); - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - } - - // -------------------------------------------------------------------------------------------- - - public static DataSet<Tuple2<Long, Long>> doBulkIteration(DataSet<Tuple2<Long, Long>> vertices, DataSet<Tuple2<Long, Long>> edges) { - - // open a bulk iteration - IterativeDataSet<Tuple2<Long, Long>> iteration = vertices.iterate(20); - - DataSet<Tuple2<Long, Long>> changes = iteration - .join(edges).where(0).equalTo(0).with(new Join222()) - .groupBy(0).aggregate(Aggregations.MIN, 1) - .join(iteration).where(0).equalTo(0) - .flatMap(new FlatMapJoin()); - - // close the bulk iteration - return 
iteration.closeWith(changes); - } - - - public static DataSet<Tuple2<Long, Long>> doDeltaIteration(DataSet<Tuple2<Long, Long>> vertices, DataSet<Tuple2<Long, Long>> edges) { - - DeltaIteration<Tuple2<Long, Long>, Tuple2<Long, Long>> depIteration = vertices.iterateDelta(vertices, 100, 0); - - DataSet<Tuple1<Long>> candidates = depIteration.getWorkset().join(edges).where(0).equalTo(0) - .projectSecond(1); - - DataSet<Tuple1<Long>> grouped = candidates.groupBy(0).reduceGroup(new Reduce101()); - - DataSet<Tuple2<Long, Long>> candidatesDependencies = - grouped.join(edges).where(0).equalTo(1).projectSecond(0, 1); - - DataSet<Tuple2<Long, Long>> verticesWithNewComponents = - candidatesDependencies.join(depIteration.getSolutionSet()).where(0).equalTo(0) - .with(new Join222()) - .groupBy(0).aggregate(Aggregations.MIN, 1); - - DataSet<Tuple2<Long, Long>> updatedComponentId = - verticesWithNewComponents.join(depIteration.getSolutionSet()).where(0).equalTo(0) - .flatMap(new FlatMapJoin()); - - DataSet<Tuple2<Long, Long>> depResult = depIteration.closeWith(updatedComponentId, updatedComponentId); - - return depResult; - - } - - // -------------------------------------------------------------------------------------------- - - public static final class Join222 extends RichJoinFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>> { - - @Override - public Tuple2<Long, Long> join(Tuple2<Long, Long> vertexWithComponent, Tuple2<Long, Long> edge) { - return null; - } - } - - public static final class FlatMapJoin extends RichFlatMapFunction<Tuple2<Tuple2<Long, Long>, Tuple2<Long, Long>>, Tuple2<Long, Long>> { - - @Override - public void flatMap(Tuple2<Tuple2<Long, Long>, Tuple2<Long, Long>> value, Collector<Tuple2<Long, Long>> out) {} - } - - public static final class DummyMap extends RichMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> { - - @Override - public Tuple2<Long, Long> map(Tuple2<Long, Long> value) throws Exception { - return value; - } - } - - 
@ForwardedFields("0") - public static final class Reduce101 extends RichGroupReduceFunction<Tuple1<Long>, Tuple1<Long>> { - - @Override - public void reduce(Iterable<Tuple1<Long>> values, Collector<Tuple1<Long>> out) {} - } - - @ForwardedFields("0") - public static final class DuplicateValue extends RichMapFunction<Tuple1<Long>, Tuple2<Long, Long>> { - - @Override - public Tuple2<Long, Long> map(Tuple1<Long> value) throws Exception { - return new Tuple2<Long, Long>(value.f0, value.f0); - } - } - - public static final class DuplicateValueScalar<T> extends RichMapFunction<T, Tuple2<T, T>> { - - @Override - public Tuple2<T, T> map(T value) { - return new Tuple2<T, T>(value, value); - } - } - - public static final class IdFilter<T> implements FilterFunction<T> { - @Override - public boolean filter(T value) { - return true; - } - } -}