http://git-wip-us.apache.org/repos/asf/flink/blob/de8e066c/flink-compiler/src/test/java/org/apache/flink/compiler/SemanticPropOptimizerTest.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/SemanticPropOptimizerTest.java b/flink-compiler/src/test/java/org/apache/flink/compiler/SemanticPropOptimizerTest.java deleted file mode 100644 index 49e5d85..0000000 --- a/flink-compiler/src/test/java/org/apache/flink/compiler/SemanticPropOptimizerTest.java +++ /dev/null @@ -1,941 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.compiler; - -import org.apache.flink.api.common.functions.JoinFunction; -import org.apache.flink.api.common.functions.MapFunction; -import org.apache.flink.api.common.functions.ReduceFunction; -import org.apache.flink.api.common.operators.DualInputSemanticProperties; -import org.apache.flink.api.common.operators.Order; -import org.apache.flink.api.common.operators.Ordering; -import org.apache.flink.api.common.operators.SingleInputSemanticProperties; -import org.apache.flink.api.common.operators.base.JoinOperatorBase; -import org.apache.flink.api.common.operators.base.MapOperatorBase; -import org.apache.flink.api.common.operators.base.ReduceOperatorBase; -import org.apache.flink.api.common.operators.util.FieldList; -import org.apache.flink.api.common.operators.util.FieldSet; -import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.java.ExecutionEnvironment; - -import org.apache.flink.api.java.functions.SemanticPropUtil; -import org.apache.flink.api.java.operators.translation.JavaPlan; -import org.apache.flink.api.java.tuple.Tuple3; -import org.apache.flink.api.java.tuple.Tuple8; -import org.apache.flink.api.java.typeutils.BasicTypeInfo; -import org.apache.flink.api.java.typeutils.TupleTypeInfo; -import org.apache.flink.compiler.dataproperties.GlobalProperties; -import org.apache.flink.compiler.dataproperties.LocalProperties; -import org.apache.flink.compiler.dataproperties.PartitioningProperty; -import org.apache.flink.compiler.dataproperties.RequestedGlobalProperties; -import org.apache.flink.compiler.dataproperties.RequestedLocalProperties; -import org.apache.flink.compiler.plan.Channel; -import org.apache.flink.compiler.plan.DualInputPlanNode; -import org.apache.flink.compiler.plan.OptimizedPlan; -import org.apache.flink.compiler.plan.PlanNode; -import org.apache.flink.compiler.plan.SingleInputPlanNode; -import org.apache.flink.runtime.operators.shipping.ShipStrategyType; -import org.apache.flink.util.Visitor; -import org.junit.Assert; -import org.junit.Test; - -import java.util.Set; - -public class SemanticPropOptimizerTest extends CompilerTestBase { - - private TupleTypeInfo<Tuple8<Integer, Integer, Integer, Integer, Integer, Integer, Integer, Integer>> tupleInfo = - new TupleTypeInfo<Tuple8<Integer, Integer, Integer, Integer, Integer, Integer, Integer, Integer>>( - BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, - BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO - ); - - public static class SimpleReducer implements ReduceFunction<Tuple3<Integer, Integer, Integer>> { - @Override - public Tuple3<Integer, Integer, Integer> reduce(Tuple3<Integer, Integer, Integer> value1, Tuple3<Integer, Integer, Integer> value2) throws Exception { - return null; - } - } - - public static class SimpleMap implements MapFunction<Tuple3<Integer, Integer, Integer>, Tuple3<Integer, Integer, Integer>> { - @Override - public Tuple3<Integer, Integer, Integer> map(Tuple3<Integer, Integer, Integer> value) throws Exception { - return null; - } - } - - @Test - public void forwardFieldsTestMapReduce() { - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - DataSet<Tuple3<Integer, Integer, Integer>> set = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class); - set = set.map(new SimpleMap()).withConstantSet("*") - .groupBy(0) - .reduce(new SimpleReducer()).withConstantSet("0->1") - .map(new SimpleMap()).withConstantSet("*") - .groupBy(1) - .reduce(new SimpleReducer()).withConstantSet("*"); - - set.print(); - JavaPlan plan = env.createProgramPlan(); - OptimizedPlan oPlan = compileWithStats(plan); - - oPlan.accept(new Visitor<PlanNode>() { - @Override - public boolean preVisit(PlanNode visitable) { - if (visitable instanceof SingleInputPlanNode && visitable.getPactContract() instanceof ReduceOperatorBase) { - for (Channel input: visitable.getInputs()) { - GlobalProperties gprops = visitable.getGlobalProperties(); - LocalProperties lprops = visitable.getLocalProperties(); - - Assert.assertTrue("Reduce should just forward the input if it is already partitioned", - input.getShipStrategy() == ShipStrategyType.FORWARD); - Assert.assertTrue("Wrong GlobalProperties on Reducer", - gprops.isPartitionedOnFields(new FieldSet(1))); - Assert.assertTrue("Wrong GlobalProperties on Reducer", - gprops.getPartitioning() == PartitioningProperty.HASH_PARTITIONED); - Assert.assertTrue("Wrong LocalProperties on Reducer", - lprops.getGroupedFields().contains(1)); - } - } - if (visitable instanceof SingleInputPlanNode && visitable.getPactContract() instanceof MapOperatorBase) { - for (Channel input: visitable.getInputs()) { - GlobalProperties gprops = visitable.getGlobalProperties(); - LocalProperties lprops = visitable.getLocalProperties(); - - Assert.assertTrue("Map should just forward the input if it is already partitioned", - input.getShipStrategy() == ShipStrategyType.FORWARD); - Assert.assertTrue("Wrong GlobalProperties on Mapper", - gprops.isPartitionedOnFields(new FieldSet(1))); - Assert.assertTrue("Wrong GlobalProperties on Mapper", - gprops.getPartitioning() == PartitioningProperty.HASH_PARTITIONED); - Assert.assertTrue("Wrong LocalProperties on Mapper", - lprops.getGroupedFields().contains(1)); - } - return false; - } - return true; - } - - @Override - public void postVisit(PlanNode visitable) { - - } - }); - } - - @Test - public void localPropertiesFilterNothingPreservedTest() { - FieldList grouping = new FieldList().addFields(0, 3, 5, 7); - - String[] constantSet = {"0->2", "3->1", "5->5"}; - SingleInputSemanticProperties sprops = SemanticPropUtil.getSemanticPropsSingleFromString(constantSet, null, null, tupleInfo, tupleInfo); - - LocalProperties lprops = LocalProperties.forGrouping(grouping); - - LocalProperties result = lprops.filterBySemanticProperties(sprops, 0); - - Assert.assertTrue(result.getOrdering() == null); - Assert.assertTrue(result.getGroupedFields() == null); - Assert.assertTrue(result.getUniqueFields() == null); - } - - @Test - public void localPropertiesFilterWrongInputTest() { - FieldList grouping = new FieldList().addFields(0, 3, 5, 7); - - String[] constantSet = {"0->2", "3->1", "5->5"}; - SingleInputSemanticProperties sprops = SemanticPropUtil.getSemanticPropsSingleFromString(constantSet, null, null, tupleInfo, tupleInfo); - - LocalProperties lprops = LocalProperties.forGrouping(grouping); - - try { - LocalProperties result = lprops.filterBySemanticProperties(sprops, 1); - } catch (Exception e) { - return; - } - Assert.fail(); - } - - @Test - public void localPropertiesFilterGroupingPreservedTest() { - FieldList grouping = new FieldList().addFields(0, 3, 5, 7); - - String[] constantSet = {"0->2", "3->1", "5->5", "7->0,4"}; - SingleInputSemanticProperties sprops = SemanticPropUtil.getSemanticPropsSingleFromString(constantSet, null, null, tupleInfo, tupleInfo); - - LocalProperties lprops = LocalProperties.forGrouping(grouping); - - LocalProperties result = lprops.filterBySemanticProperties(sprops, 0); - - Assert.assertTrue(result.getOrdering() == null); - - Assert.assertTrue(result.getGroupedFields().size() == 5); - Assert.assertTrue(result.getGroupedFields().contains(2)); - Assert.assertTrue(result.getGroupedFields().contains(1)); - Assert.assertTrue(result.getGroupedFields().contains(5)); - Assert.assertTrue(result.getGroupedFields().contains(0)); - Assert.assertTrue(result.getGroupedFields().contains(4)); - - Assert.assertTrue(result.getUniqueFields() == null); - } - - @Test - public void localPropertiesFilterGroupingPreservedTest2() { - FieldList grouping = new FieldList().addFields(0, 3, 5, 7); - - String[] constantSet = {"0->0", "3->3", "5->5", "7->7"}; - SingleInputSemanticProperties sprops = SemanticPropUtil.getSemanticPropsSingleFromString(constantSet, null, null, tupleInfo, tupleInfo); - - LocalProperties lprops = LocalProperties.forGrouping(grouping); - - LocalProperties result = lprops.filterBySemanticProperties(sprops, 0); - - Assert.assertTrue(result.getOrdering() == null); - - Assert.assertTrue(result.getGroupedFields().size() == 4); - Assert.assertTrue(result.getGroupedFields().contains(0)); - Assert.assertTrue(result.getGroupedFields().contains(3)); - Assert.assertTrue(result.getGroupedFields().contains(5)); - Assert.assertTrue(result.getGroupedFields().contains(7)); - - Assert.assertTrue(result.getUniqueFields() == null); - } - - @Test - public void localPropertiesFilterGroupingNothingPreservedTest() { - FieldList grouping = new FieldList().addFields(0, 3, 5, 7); - - String[] constantSet = {"0->2", "3->1", "7->0,4"}; - SingleInputSemanticProperties sprops = SemanticPropUtil.getSemanticPropsSingleFromString(constantSet, null, null, tupleInfo, tupleInfo); - - LocalProperties lprops = LocalProperties.forGrouping(grouping); - - LocalProperties result = lprops.filterBySemanticProperties(sprops, 0); - - Assert.assertTrue(result.getGroupedFields() == null); - } - - @Test - public void localPropertiesFilterOrderingPreservedTest() { - Ordering ordering = new Ordering(); - ordering.appendOrdering(0, null, Order.ASCENDING); - ordering.appendOrdering(1, null, Order.ASCENDING); - ordering.appendOrdering(2, null, Order.ASCENDING); - - String[] constantSet = {"0->2", "1->3", "2->0"}; - SingleInputSemanticProperties sprops = SemanticPropUtil.getSemanticPropsSingleFromString(constantSet, null, null, tupleInfo, tupleInfo); - - LocalProperties lprops = LocalProperties.forOrdering(ordering); - - LocalProperties result = lprops.filterBySemanticProperties(sprops, 0); - - FieldList involved = result.getOrdering().getInvolvedIndexes(); - Order[] orders = result.getOrdering().getFieldOrders(); - - - Assert.assertTrue(involved.size() == 3); - Assert.assertTrue(involved.contains(2)); - Assert.assertTrue(involved.contains(3)); - Assert.assertTrue(involved.contains(0)); - Assert.assertTrue(orders[0] == Order.ASCENDING); - Assert.assertTrue(orders[1] == Order.ASCENDING); - Assert.assertTrue(orders[2] == Order.ASCENDING); - } - - @Test - public void localPropertiesFilterOrderingNothingPreservedTest() { - Ordering ordering = new Ordering(); - ordering.appendOrdering(0, null, Order.ASCENDING); - ordering.appendOrdering(1, null, Order.ASCENDING); - ordering.appendOrdering(2, null, Order.ASCENDING); - - String[] constantSet = {"0->2", "2->0"}; - SingleInputSemanticProperties sprops = SemanticPropUtil.getSemanticPropsSingleFromString(constantSet, null, null, tupleInfo, tupleInfo); - - LocalProperties lprops = LocalProperties.forOrdering(ordering); - - LocalProperties result = lprops.filterBySemanticProperties(sprops, 0); - - Assert.assertTrue(result.getOrdering() == null); - } - - - - @Test - public void localPropertiesFilterUniqueFieldsPreservedTest() { - String[] constantSet = {"0->1", "1->2", "3->4", "2->3", "6->0"}; - SingleInputSemanticProperties sprops = SemanticPropUtil.getSemanticPropsSingleFromString(constantSet, null, null, tupleInfo, tupleInfo); - - - FieldSet set1 = new FieldSet(0, 1, 2, 3); - FieldSet set2 = new FieldSet(4, 5, 6, 7); - FieldSet set3 = new FieldSet(0, 1, 6, 2); - LocalProperties lprops = new LocalProperties(); - lprops = lprops.addUniqueFields(set1).addUniqueFields(set2).addUniqueFields(set3); - - - LocalProperties result = lprops.filterBySemanticProperties(sprops, 0); - Set<FieldSet> unique = result.getUniqueFields(); - FieldSet expected1 = new FieldSet(1, 2, 3, 4); - FieldSet expected2 = new FieldSet(0, 1, 2, 3); - - - Assert.assertTrue(unique.size() == 2); - Assert.assertTrue(unique.contains(expected1)); - Assert.assertTrue(unique.contains(expected2)); - } - - @Test - public void requestedLocalPropertiesFilterNothingPreservedTest() { - FieldList grouping = new FieldList().addFields(0, 3, 5, 7); - - String[] constantSet = {"0->2", "3->1", "5->5"}; - SingleInputSemanticProperties sprops = SemanticPropUtil.getSemanticPropsSingleFromString(constantSet, null, null, tupleInfo, tupleInfo); - - RequestedLocalProperties lprops = new RequestedLocalProperties(); - lprops.setGroupedFields(grouping); - - RequestedLocalProperties result = lprops.filterBySemanticProperties(sprops, 0); - - Assert.assertTrue(result == null); - } - - @Test - public void requestedLocalPropertiesFilterWrongInputTest() { - FieldList grouping = new FieldList().addFields(0, 3, 5, 7); - - String[] constantSet = {"0->2", "3->1", "5->5"}; - SingleInputSemanticProperties sprops = SemanticPropUtil.getSemanticPropsSingleFromString(constantSet, null, null, tupleInfo, tupleInfo); - - RequestedLocalProperties lprops = new RequestedLocalProperties(); - lprops.setGroupedFields(grouping); - - try { - RequestedLocalProperties result = lprops.filterBySemanticProperties(sprops, 1); - } catch (Exception e) { - return; - } - Assert.fail(); - } - - @Test - public void requestedLocalPropertiesFilterGroupingPreservedTest() { - FieldList grouping = new FieldList().addFields(0, 3, 5, 7); - - String[] constantSet = {"6->7", "1->5", "4->3", "7->0"}; - SingleInputSemanticProperties sprops = SemanticPropUtil.getSemanticPropsSingleFromString(constantSet, null, null, tupleInfo, tupleInfo); - - RequestedLocalProperties lprops = new RequestedLocalProperties(); - lprops.setGroupedFields(grouping); - - RequestedLocalProperties result = lprops.filterBySemanticProperties(sprops, 0); - - - Assert.assertTrue(result.getGroupedFields().size() == 4); - Assert.assertTrue(result.getGroupedFields().contains(7)); - Assert.assertTrue(result.getGroupedFields().contains(4)); - Assert.assertTrue(result.getGroupedFields().contains(1)); - Assert.assertTrue(result.getGroupedFields().contains(6)); - } - - @Test - public void requestedLocalPropertiesFilterGroupingPreservedTest2() { - FieldList grouping = new FieldList().addFields(0, 3, 5, 7); - - String[] constantSet = {"0->0", "3->3", "5->5", "7->7"}; - SingleInputSemanticProperties sprops = SemanticPropUtil.getSemanticPropsSingleFromString(constantSet, null, null, tupleInfo, tupleInfo); - - RequestedLocalProperties lprops = new RequestedLocalProperties(); - lprops.setGroupedFields(grouping); - - RequestedLocalProperties result = lprops.filterBySemanticProperties(sprops, 0); - - - Assert.assertTrue(result.getGroupedFields().size() == 4); - Assert.assertTrue(result.getGroupedFields().contains(0)); - Assert.assertTrue(result.getGroupedFields().contains(3)); - Assert.assertTrue(result.getGroupedFields().contains(5)); - Assert.assertTrue(result.getGroupedFields().contains(7)); - } - - @Test - public void requestedLocalPropertiesFilterOrderingPreservedTest() { - Ordering ordering = new Ordering(); - ordering.appendOrdering(2, null, Order.ASCENDING); - ordering.appendOrdering(3, null, Order.ASCENDING); - ordering.appendOrdering(0, null, Order.ASCENDING); - - String[] constantSet = {"6->2", "2->3", "5->0"}; - SingleInputSemanticProperties sprops = SemanticPropUtil.getSemanticPropsSingleFromString(constantSet, null, null, tupleInfo, tupleInfo); - - RequestedLocalProperties lprops = new RequestedLocalProperties(); - lprops.setOrdering(ordering); - - RequestedLocalProperties result = lprops.filterBySemanticProperties(sprops, 0); - - FieldList involved = result.getOrdering().getInvolvedIndexes(); - Order[] orders = result.getOrdering().getFieldOrders(); - - Assert.assertTrue(involved.size() == 3); - Assert.assertTrue(involved.contains(6)); - Assert.assertTrue(involved.contains(2)); - Assert.assertTrue(involved.contains(5)); - Assert.assertTrue(orders[0] == Order.ASCENDING); - Assert.assertTrue(orders[1] == Order.ASCENDING); - Assert.assertTrue(orders[2] == Order.ASCENDING); - } - - @Test - public void requestedLocalPropertiesFilterOrderingPreservedTest2() { - Ordering ordering = new Ordering(); - ordering.appendOrdering(0, null, Order.ASCENDING); - ordering.appendOrdering(3, null, Order.ASCENDING); - ordering.appendOrdering(6, null, Order.ASCENDING); - - String[] constantSet = {"0->0", "3->3", "6->6"}; - SingleInputSemanticProperties sprops = SemanticPropUtil.getSemanticPropsSingleFromString(constantSet, null, null, tupleInfo, tupleInfo); - - RequestedLocalProperties lprops = new RequestedLocalProperties(); - lprops.setOrdering(ordering); - - RequestedLocalProperties result = lprops.filterBySemanticProperties(sprops, 0); - - FieldList involved = result.getOrdering().getInvolvedIndexes(); - Order[] orders = result.getOrdering().getFieldOrders(); - - Assert.assertTrue(involved.size() == 3); - Assert.assertTrue(involved.contains(0)); - Assert.assertTrue(involved.contains(3)); - Assert.assertTrue(involved.contains(6)); - Assert.assertTrue(orders[0] == Order.ASCENDING); - Assert.assertTrue(orders[1] == Order.ASCENDING); - Assert.assertTrue(orders[2] == Order.ASCENDING); - } - - @Test - public void requestedLocalPropertiesFilterOrderingNothingPreservedTest() { - Ordering ordering = new Ordering(); - ordering.appendOrdering(2, null, Order.ASCENDING); - ordering.appendOrdering(3, null, Order.ASCENDING); - ordering.appendOrdering(0, null, Order.ASCENDING); - - String[] constantSet = {"6->2", "5->0"}; - SingleInputSemanticProperties sprops = SemanticPropUtil.getSemanticPropsSingleFromString(constantSet, null, null, tupleInfo, tupleInfo); - - RequestedLocalProperties lprops = new RequestedLocalProperties(); - lprops.setOrdering(ordering); - - RequestedLocalProperties result = lprops.filterBySemanticProperties(sprops, 0); - - Assert.assertTrue(result == null); - } - - @Test - public void requestedLocalPropertiesDualFilterOrderingPreservedTest() { - Ordering ordering = new Ordering(); - ordering.appendOrdering(2, null, Order.ASCENDING); - ordering.appendOrdering(3, null, Order.ASCENDING); - ordering.appendOrdering(0, null, Order.ASCENDING); - - String[] constantSet = {"6->2", "2->3", "5->0"}; - DualInputSemanticProperties dprops = new DualInputSemanticProperties(); - SemanticPropUtil.getSemanticPropsDualFromString(dprops, constantSet, constantSet, null, null, null, null, tupleInfo, tupleInfo, tupleInfo); - - RequestedLocalProperties lprops1 = new RequestedLocalProperties(); - lprops1.setOrdering(ordering); - - RequestedLocalProperties lprops2 = new RequestedLocalProperties(); - lprops2.setOrdering(ordering); - - RequestedLocalProperties result1 = lprops1.filterBySemanticProperties(dprops, 0); - RequestedLocalProperties result2 = lprops2.filterBySemanticProperties(dprops, 1); - - FieldSet involved1 = result1.getOrdering().getInvolvedIndexes(); - FieldSet involved2 = result2.getOrdering().getInvolvedIndexes(); - Order[] orders1 = result1.getOrdering().getFieldOrders(); - Order[] orders2 = result2.getOrdering().getFieldOrders(); - - - Assert.assertTrue(involved1.size() == 3); - Assert.assertTrue(involved1.contains(6)); - Assert.assertTrue(involved1.contains(2)); - Assert.assertTrue(involved1.contains(5)); - Assert.assertTrue(orders1[0] == Order.ASCENDING); - Assert.assertTrue(orders1[1] == Order.ASCENDING); - Assert.assertTrue(orders1[2] == Order.ASCENDING); - - Assert.assertTrue(involved2.size() == 3); - Assert.assertTrue(involved2.contains(6)); - Assert.assertTrue(involved2.contains(2)); - Assert.assertTrue(involved2.contains(5)); - Assert.assertTrue(orders2[0] == Order.ASCENDING); - Assert.assertTrue(orders2[1] == Order.ASCENDING); - Assert.assertTrue(orders2[2] == Order.ASCENDING); - } - - @Test - public void globalPropertiesFilterNothingPreservedTest() { - String[] constantSet = {"0->2", "3->1", "5->5"}; - SingleInputSemanticProperties sprops = SemanticPropUtil.getSemanticPropsSingleFromString(constantSet, null, null, tupleInfo, tupleInfo); - - GlobalProperties gprops = new GlobalProperties(); - GlobalProperties result = gprops.filterBySemanticProperties(sprops, 0); - - Assert.assertTrue(result.getOrdering() == null); - Assert.assertTrue(result.getPartitioningFields() == null); - } - - @Test - public void globalPropertiesFilterWrongInputTest() { - String[] constantSet = {"0->2", "3->1", "5->5"}; - SingleInputSemanticProperties sprops = SemanticPropUtil.getSemanticPropsSingleFromString(constantSet, null, null, tupleInfo, tupleInfo); - - GlobalProperties gprops = new GlobalProperties(); - gprops.setHashPartitioned(new FieldList(0, 1, 3, 4, 6)); - - try { - GlobalProperties result = gprops.filterBySemanticProperties(sprops, 1); - } catch (Exception e) { - return; - } - Assert.fail(); - } - - @Test - public void globalPropertiesFilterPartitioningPreservedTest() { - GlobalProperties gprops = new GlobalProperties(); - gprops.setHashPartitioned(new FieldList(0, 1, 4)); - - String[] constantSet = {"0->2", "1->4", "4->7"}; - SingleInputSemanticProperties sprops = SemanticPropUtil.getSemanticPropsSingleFromString(constantSet, null, null, tupleInfo, tupleInfo); - - - GlobalProperties result = gprops.filterBySemanticProperties(sprops, 0); - - FieldList list = result.getPartitioningFields(); - Assert.assertTrue(list.size() == 3); - Assert.assertTrue(list.contains(2)); - Assert.assertTrue(list.contains(4)); - Assert.assertTrue(list.contains(7)); - Assert.assertTrue(result.getPartitioning() == PartitioningProperty.HASH_PARTITIONED); - } - - @Test - public void globalPropertiesFilterPartitioningPreservedTest2() { - GlobalProperties gprops = new GlobalProperties(); - gprops.setHashPartitioned(new FieldList(0, 1, 4)); - - String[] constantSet = {"0->2, 6", "1->4, 5", "4->7"}; - SingleInputSemanticProperties sprops = SemanticPropUtil.getSemanticPropsSingleFromString(constantSet, null, null, tupleInfo, tupleInfo); - - - GlobalProperties result = gprops.filterBySemanticProperties(sprops, 0); - - FieldList list = result.getPartitioningFields(); - Assert.assertTrue(list.size() == 5); - Assert.assertTrue(list.contains(2)); - Assert.assertTrue(list.contains(4)); - Assert.assertTrue(list.contains(6)); - Assert.assertTrue(list.contains(5)); - Assert.assertTrue(list.contains(7)); - Assert.assertTrue(result.getPartitioning() == PartitioningProperty.HASH_PARTITIONED); - } - - @Test - public void globalPropertiesFilterPartitioningPreservedTest3() { - GlobalProperties gprops = new GlobalProperties(); - gprops.setHashPartitioned(new FieldList(0, 1, 4)); - - String[] constantSet = {"0->0", "1->1", "4->4"}; - SingleInputSemanticProperties sprops = SemanticPropUtil.getSemanticPropsSingleFromString(constantSet, null, null, tupleInfo, tupleInfo); - - - GlobalProperties result = gprops.filterBySemanticProperties(sprops, 0); - - FieldList list = result.getPartitioningFields(); - Assert.assertTrue(list.size() == 3); - Assert.assertTrue(list.contains(0)); - Assert.assertTrue(list.contains(1)); - Assert.assertTrue(list.contains(4)); - Assert.assertTrue(result.getPartitioning() == PartitioningProperty.HASH_PARTITIONED); - } - - @Test - public void globalPropertiesFilterOrderingPreservedTest() { - Ordering ordering = new Ordering(); - ordering.appendOrdering(0, null, Order.ASCENDING); - ordering.appendOrdering(1, null, Order.ASCENDING); - ordering.appendOrdering(2, null, Order.ASCENDING); - - String[] constantSet = {"0->2", "1->3", "2->0"}; - SingleInputSemanticProperties sprops = SemanticPropUtil.getSemanticPropsSingleFromString(constantSet, null, null, tupleInfo, tupleInfo); - - GlobalProperties gprops = new GlobalProperties(); - gprops.setOrdering(ordering); - - GlobalProperties result = gprops.filterBySemanticProperties(sprops, 0); - - FieldList involved = result.getOrdering().getInvolvedIndexes(); - Order[] orders = result.getOrdering().getFieldOrders(); - - - Assert.assertTrue(involved.size() == 3); - Assert.assertTrue(involved.contains(2)); - Assert.assertTrue(involved.contains(3)); - Assert.assertTrue(involved.contains(0)); - Assert.assertTrue(orders[0] == Order.ASCENDING); - Assert.assertTrue(orders[1] == Order.ASCENDING); - Assert.assertTrue(orders[2] == Order.ASCENDING); - } - - @Test - public void globalPropertiesFilterOrderingPreservedTest2() { - Ordering ordering = new Ordering(); - ordering.appendOrdering(0, null, Order.ASCENDING); - ordering.appendOrdering(1, null, Order.ASCENDING); - ordering.appendOrdering(2, null, Order.ASCENDING); - - String[] constantSet = {"0->2,4", "1->3", "2->0"}; - SingleInputSemanticProperties sprops = SemanticPropUtil.getSemanticPropsSingleFromString(constantSet, null, null, tupleInfo, tupleInfo); - - GlobalProperties gprops = new GlobalProperties(); - gprops.setOrdering(ordering); - - GlobalProperties result = gprops.filterBySemanticProperties(sprops, 0); - - FieldList involved = result.getOrdering().getInvolvedIndexes(); - Order[] orders = result.getOrdering().getFieldOrders(); - - - Assert.assertTrue(involved.size() == 4); - Assert.assertTrue(involved.contains(2)); - Assert.assertTrue(involved.contains(3)); - Assert.assertTrue(involved.contains(4)); - Assert.assertTrue(involved.contains(0)); - Assert.assertTrue(orders[0] == Order.ASCENDING); - Assert.assertTrue(orders[1] == Order.ASCENDING); - Assert.assertTrue(orders[2] == Order.ASCENDING); - Assert.assertTrue(orders[3] == Order.ASCENDING); - } - - @Test - public void globalPropertiesFilterOrderingPreservedTest3() { - Ordering ordering = new Ordering(); - ordering.appendOrdering(0, null, Order.ASCENDING); - ordering.appendOrdering(1, null, Order.ASCENDING); - ordering.appendOrdering(2, null, Order.ASCENDING); - - String[] constantSet = {"0->0", "1->1", "2->2"}; - SingleInputSemanticProperties sprops = SemanticPropUtil.getSemanticPropsSingleFromString(constantSet, null, null, tupleInfo, tupleInfo); - - GlobalProperties gprops = new GlobalProperties(); - gprops.setOrdering(ordering); - - GlobalProperties result = gprops.filterBySemanticProperties(sprops, 0); - - FieldList involved = result.getOrdering().getInvolvedIndexes(); - Order[] orders = result.getOrdering().getFieldOrders(); - - - Assert.assertTrue(involved.size() == 3); - Assert.assertTrue(involved.contains(0)); - Assert.assertTrue(involved.contains(1)); - Assert.assertTrue(involved.contains(2)); - Assert.assertTrue(orders[0] == Order.ASCENDING); - Assert.assertTrue(orders[1] == Order.ASCENDING); - Assert.assertTrue(orders[2] == Order.ASCENDING); - } - - @Test - public void globalPropertiesFilterOrderingNothingPreservedTest() { - Ordering ordering = new Ordering(); - ordering.appendOrdering(0, null, Order.ASCENDING); - ordering.appendOrdering(1, null, Order.ASCENDING); - ordering.appendOrdering(2, null, Order.ASCENDING); - - String[] constantSet = {"0->2", "2->0"}; - SingleInputSemanticProperties sprops = SemanticPropUtil.getSemanticPropsSingleFromString(constantSet, null, null, tupleInfo, tupleInfo); - - GlobalProperties gprops = new GlobalProperties(); - gprops.setOrdering(ordering); - - GlobalProperties result = gprops.filterBySemanticProperties(sprops, 0); - - Assert.assertTrue(result.getOrdering() == null); - } - - @Test - public void globalPropertiesFilterUniqueFieldsPreservedTest() { - String[] constantSet = {"0->1", "1->2", "3->4", "2->3", "6->0"}; - SingleInputSemanticProperties sprops = SemanticPropUtil.getSemanticPropsSingleFromString(constantSet, null, null, tupleInfo, tupleInfo); - - - FieldSet set1 = new FieldSet(0, 1, 2, 3); - FieldSet set2 = new FieldSet(4, 5, 6, 7); - FieldSet set3 = new FieldSet(0, 1, 6, 2); - GlobalProperties gprops = new GlobalProperties(); - gprops.addUniqueFieldCombination(set1); - gprops.addUniqueFieldCombination(set2); - gprops.addUniqueFieldCombination(set3); - - - GlobalProperties result = gprops.filterBySemanticProperties(sprops, 0); - Set<FieldSet> unique = result.getUniqueFieldCombination(); - FieldSet expected1 = new FieldSet(1, 2, 3, 4); - FieldSet expected2 = new FieldSet(0, 1, 2, 3); - - - Assert.assertTrue(unique.size() == 2); - Assert.assertTrue(unique.contains(expected1)); - Assert.assertTrue(unique.contains(expected2)); - } - - @Test - public void requestedGlobalPropertiesFilterNothingPreservedTest() { - String[] constantSet = {"0->2", "3->1", "5->5"}; - SingleInputSemanticProperties sprops = SemanticPropUtil.getSemanticPropsSingleFromString(constantSet, null, null, tupleInfo, tupleInfo); - - RequestedGlobalProperties gprops = new RequestedGlobalProperties(); - RequestedGlobalProperties result = gprops.filterBySemanticProperties(sprops, 0); - - Assert.assertTrue(result == null); - } - - @Test - public void requestedGlobalPropertiesDualFilterEverythingPreservedTest() { - String[] constantSet1 = {"0->1,2", "3->0", "2->4"}; - String[] constantSet2 = {"0->3", "4->6", "6->7"}; - - - DualInputSemanticProperties dprops = new DualInputSemanticProperties(); - SemanticPropUtil.getSemanticPropsDualFromString(dprops, constantSet1, constantSet2, null, null, null, null, tupleInfo, tupleInfo, tupleInfo); - - RequestedGlobalProperties gprops1 = new RequestedGlobalProperties(); - RequestedGlobalProperties gprops2 = new RequestedGlobalProperties(); - - gprops1.setHashPartitioned(new FieldSet(2, 0, 4)); - gprops2.setHashPartitioned(new FieldSet(3, 6, 7)); - - gprops1 = gprops1.filterBySemanticProperties(dprops, 0); - gprops2 = gprops2.filterBySemanticProperties(dprops, 1); - - Assert.assertTrue(gprops1.getPartitioning() == PartitioningProperty.HASH_PARTITIONED); - Assert.assertTrue(gprops1.getPartitionedFields().size() == 3); - Assert.assertTrue(gprops1.getPartitionedFields().contains(0)); - Assert.assertTrue(gprops1.getPartitionedFields().contains(3)); - Assert.assertTrue(gprops1.getPartitionedFields().contains(2)); - - Assert.assertTrue(gprops2.getPartitioning() == PartitioningProperty.HASH_PARTITIONED); - Assert.assertTrue(gprops2.getPartitionedFields().size() == 3); - Assert.assertTrue(gprops2.getPartitionedFields().contains(0)); - Assert.assertTrue(gprops2.getPartitionedFields().contains(4)); - Assert.assertTrue(gprops2.getPartitionedFields().contains(6)); - } - - @Test - public void requestedGlobalPropertiesDualFilterNothingPreservedTest() { - String[] constantSet1 = {"0->1,2", "3->0", "2->4"}; - String[] constantSet2 = {"0->3", "4->6", "6->7"}; - - - DualInputSemanticProperties dprops = new DualInputSemanticProperties(); - SemanticPropUtil.getSemanticPropsDualFromString(dprops, constantSet1, constantSet2, null, null, null, null, tupleInfo, tupleInfo, tupleInfo); - - RequestedGlobalProperties gprops1 = new RequestedGlobalProperties(); - RequestedGlobalProperties gprops2 = new RequestedGlobalProperties(); - - gprops1.setHashPartitioned(new FieldSet(6, 7)); - gprops2.setHashPartitioned(new FieldSet(3, 6, 7)); - - gprops1 = gprops1.filterBySemanticProperties(dprops, 0); - gprops2 = gprops2.filterBySemanticProperties(dprops, 1); - - Assert.assertTrue(gprops1 == null); - - Assert.assertTrue(gprops2.getPartitioning() == PartitioningProperty.HASH_PARTITIONED); - Assert.assertTrue(gprops2.getPartitionedFields().size() == 3); - Assert.assertTrue(gprops2.getPartitionedFields().contains(0)); - Assert.assertTrue(gprops2.getPartitionedFields().contains(4)); - Assert.assertTrue(gprops2.getPartitionedFields().contains(6)); - } - - @Test - public void requestedGlobalPropertiesFilterWrongInputTest() { - String[] constantSet = {"0->2", "3->1", "5->5"}; - SingleInputSemanticProperties sprops = SemanticPropUtil.getSemanticPropsSingleFromString(constantSet, null, null, tupleInfo, tupleInfo); - - RequestedGlobalProperties gprops = new RequestedGlobalProperties(); - gprops.setHashPartitioned(new FieldList(0, 1, 3, 4, 6)); - - try { - RequestedGlobalProperties result = gprops.filterBySemanticProperties(sprops, 1); - } catch (Exception e) { - return; - } - Assert.fail(); - } - - @Test - public void requestedGlobalPropertiesFilterPartitioningPreservedTest() { - RequestedGlobalProperties gprops = new RequestedGlobalProperties(); - gprops.setHashPartitioned(new FieldList(0, 1, 4)); - - String[] constantSet = {"3->0", "5->1", "2->4"}; - SingleInputSemanticProperties sprops = SemanticPropUtil.getSemanticPropsSingleFromString(constantSet, null, null, tupleInfo, tupleInfo); - - - RequestedGlobalProperties result = gprops.filterBySemanticProperties(sprops, 0); - - FieldSet set = result.getPartitionedFields(); - Assert.assertTrue(set.size() == 3); - Assert.assertTrue(set.contains(3)); - Assert.assertTrue(set.contains(5)); - Assert.assertTrue(set.contains(2)); - Assert.assertTrue(result.getPartitioning() == PartitioningProperty.HASH_PARTITIONED); - } - - @Test - public void requestedGlobalPropertiesFilterPartitioningPreservedTest2() { - RequestedGlobalProperties gprops = new RequestedGlobalProperties(); - gprops.setHashPartitioned(new FieldList(3, 5, 2)); - - String[] constantSet = {"3->3", "5->5", "2->2"}; - SingleInputSemanticProperties sprops = SemanticPropUtil.getSemanticPropsSingleFromString(constantSet, null, null, tupleInfo, tupleInfo); - - - RequestedGlobalProperties result = gprops.filterBySemanticProperties(sprops, 0); - - FieldSet set = result.getPartitionedFields(); - Assert.assertTrue(set.size() == 3); - Assert.assertTrue(set.contains(3)); - Assert.assertTrue(set.contains(5)); - Assert.assertTrue(set.contains(2)); - Assert.assertTrue(result.getPartitioning() == PartitioningProperty.HASH_PARTITIONED); - } - - @Test - public void semanticPropertySourceFieldTest() { - SingleInputSemanticProperties props = new SingleInputSemanticProperties(); - props.addForwardedField(2, 4); - props.addForwardedField(3, 8); - props.addForwardedField(4, 10); - props.addForwardedField(4, 11); - - Assert.assertTrue(props.getSourceField(0, 4).size() == 1); - Assert.assertTrue(props.getSourceField(0, 4).contains(2)); - Assert.assertTrue(props.getSourceField(0, 8).size() == 1); - Assert.assertTrue(props.getSourceField(0, 8).contains(3)); - Assert.assertTrue(props.getSourceField(0, 10).size() == 1); - Assert.assertTrue(props.getSourceField(0, 10).contains(4)); - Assert.assertTrue(props.getSourceField(0, 11).size() == 1); - Assert.assertTrue(props.getSourceField(0, 11).contains(4)); - } - - @Test - public void requestedGlobalPropertiesFilterOrderingPreservedTest() { - Ordering ordering = new Ordering(); - ordering.appendOrdering(2, null, Order.ASCENDING); - ordering.appendOrdering(3, null, Order.ASCENDING); - ordering.appendOrdering(0, null, Order.ASCENDING); - - String[] constantSet = {"6->2", "2->3", "5->0"}; - SingleInputSemanticProperties sprops = SemanticPropUtil.getSemanticPropsSingleFromString(constantSet, null, null, tupleInfo, tupleInfo); - - RequestedGlobalProperties gprops = new RequestedGlobalProperties(); - gprops.setOrdering(ordering); - - RequestedGlobalProperties result = gprops.filterBySemanticProperties(sprops, 0); - - FieldList involved = result.getOrdering().getInvolvedIndexes(); - Order[] orders = result.getOrdering().getFieldOrders(); - - - Assert.assertTrue(involved.size() == 3); - Assert.assertTrue(involved.contains(6)); - Assert.assertTrue(involved.contains(2)); - Assert.assertTrue(involved.contains(5)); - Assert.assertTrue(orders[0] == Order.ASCENDING); - Assert.assertTrue(orders[1] == Order.ASCENDING); - Assert.assertTrue(orders[2] == Order.ASCENDING); - } - - @Test - public void requestedGlobalPropertiesFilterOrderingNothingPreservedTest() { - Ordering ordering = new Ordering(); - ordering.appendOrdering(2, null, Order.ASCENDING); - ordering.appendOrdering(3, null, Order.ASCENDING); - ordering.appendOrdering(0, null, Order.ASCENDING); - - String[] constantSet = {"6->2", "5->0"}; - SingleInputSemanticProperties sprops = SemanticPropUtil.getSemanticPropsSingleFromString(constantSet, null, null, tupleInfo, tupleInfo); - - RequestedGlobalProperties gprops = new RequestedGlobalProperties(); - gprops.setOrdering(ordering); - - RequestedGlobalProperties result = gprops.filterBySemanticProperties(sprops, 0); - - Assert.assertTrue(result == null); - } - - @Test - public void forwardFieldsTestJoin() { - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - DataSet<Tuple3<Integer, Integer, Integer>> in1 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class); - DataSet<Tuple3<Integer, Integer, Integer>> in2 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class); - in1 = in1.map(new SimpleMap()).withConstantSet("*") - .groupBy(0) - .reduce(new SimpleReducer()).withConstantSet("0->1"); - in2 = in2.map(new SimpleMap()).withConstantSet("*") - .groupBy(1) - .reduce(new SimpleReducer()).withConstantSet("1->2"); - DataSet<Tuple3<Integer, Integer, Integer>> out = in1.join(in2).where(1).equalTo(2).with(new JoinFunction<Tuple3<Integer, Integer, Integer>, Tuple3<Integer, Integer, Integer>, Tuple3<Integer, Integer, Integer>>() { - @Override - public Tuple3<Integer, Integer, Integer> join(Tuple3<Integer, Integer, Integer> first, Tuple3<Integer, Integer, Integer> second) throws Exception { - return null; - } - }); - - out.print(); - JavaPlan plan = env.createProgramPlan(); - OptimizedPlan oPlan = compileWithStats(plan); - - oPlan.accept(new Visitor<PlanNode>() { - @Override - public boolean preVisit(PlanNode visitable) { - if (visitable instanceof DualInputPlanNode && visitable.getPactContract() instanceof JoinOperatorBase) { - DualInputPlanNode node = ((DualInputPlanNode) visitable); - - final Channel inConn1 = node.getInput1(); - final Channel inConn2 = node.getInput2(); - - Assert.assertTrue("Join should just forward the input if it is already partitioned", - inConn1.getShipStrategy() == ShipStrategyType.FORWARD); - Assert.assertTrue("Join should just forward the input if it is already partitioned", - inConn2.getShipStrategy() == ShipStrategyType.FORWARD); - return false; - } - return true; - } - - @Override - public void postVisit(PlanNode visitable) { - - } - }); - } -} -
http://git-wip-us.apache.org/repos/asf/flink/blob/de8e066c/flink-compiler/src/test/java/org/apache/flink/compiler/SemanticPropertiesAPIToPlanTest.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/SemanticPropertiesAPIToPlanTest.java b/flink-compiler/src/test/java/org/apache/flink/compiler/SemanticPropertiesAPIToPlanTest.java new file mode 100644 index 0000000..f9ff491 --- /dev/null +++ b/flink-compiler/src/test/java/org/apache/flink/compiler/SemanticPropertiesAPIToPlanTest.java @@ -0,0 +1,182 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.compiler; + +import org.apache.flink.api.common.functions.JoinFunction; +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.common.functions.ReduceFunction; +import org.apache.flink.api.common.operators.base.JoinOperatorBase; +import org.apache.flink.api.common.operators.base.MapOperatorBase; +import org.apache.flink.api.common.operators.base.ReduceOperatorBase; +import org.apache.flink.api.common.operators.util.FieldSet; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; + +import org.apache.flink.api.java.operators.translation.JavaPlan; +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.api.java.tuple.Tuple8; +import org.apache.flink.api.java.typeutils.TupleTypeInfo; +import org.apache.flink.compiler.dataproperties.GlobalProperties; +import org.apache.flink.compiler.dataproperties.LocalProperties; +import org.apache.flink.compiler.dataproperties.PartitioningProperty; +import org.apache.flink.compiler.plan.Channel; +import org.apache.flink.compiler.plan.DualInputPlanNode; +import org.apache.flink.compiler.plan.OptimizedPlan; +import org.apache.flink.compiler.plan.PlanNode; +import org.apache.flink.compiler.plan.SingleInputPlanNode; +import org.apache.flink.runtime.operators.shipping.ShipStrategyType; +import org.apache.flink.util.Visitor; +import org.junit.Assert; +import org.junit.Test; + +public class SemanticPropertiesAPIToPlanTest extends CompilerTestBase { + + private TupleTypeInfo<Tuple8<Integer, Integer, Integer, Integer, Integer, Integer, Integer, Integer>> tupleInfo = + new TupleTypeInfo<Tuple8<Integer, Integer, Integer, Integer, Integer, Integer, Integer, Integer>>( + BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, + BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO + ); + + @Test + public void forwardFieldsTestMapReduce() { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + DataSet<Tuple3<Integer, Integer, Integer>> set = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class); + set = set.map(new MockMapper()).withForwardedFields("*") + .groupBy(0) + .reduce(new MockReducer()).withForwardedFields("f0->f1") + .map(new MockMapper()).withForwardedFields("*") + .groupBy(1) + .reduce(new MockReducer()).withForwardedFields("*"); + + set.print(); + JavaPlan plan = env.createProgramPlan(); + OptimizedPlan oPlan = compileWithStats(plan); + + oPlan.accept(new Visitor<PlanNode>() { + @Override + public boolean preVisit(PlanNode visitable) { + if (visitable instanceof SingleInputPlanNode && visitable.getPactContract() instanceof ReduceOperatorBase) { + for (Channel input: visitable.getInputs()) { + GlobalProperties gprops = visitable.getGlobalProperties(); + LocalProperties lprops = visitable.getLocalProperties(); + + Assert.assertTrue("Reduce should just forward the input if it is already partitioned", + input.getShipStrategy() == ShipStrategyType.FORWARD); + Assert.assertTrue("Wrong GlobalProperties on Reducer", + gprops.isPartitionedOnFields(new FieldSet(1))); + Assert.assertTrue("Wrong GlobalProperties on Reducer", + gprops.getPartitioning() == PartitioningProperty.HASH_PARTITIONED); + Assert.assertTrue("Wrong LocalProperties on Reducer", + lprops.getGroupedFields().contains(1)); + } + } + if (visitable instanceof SingleInputPlanNode && visitable.getPactContract() instanceof MapOperatorBase) { + for (Channel input: visitable.getInputs()) { + GlobalProperties gprops = visitable.getGlobalProperties(); + LocalProperties lprops = visitable.getLocalProperties(); + + Assert.assertTrue("Map should just forward the input if it is already partitioned", + input.getShipStrategy() == ShipStrategyType.FORWARD); + Assert.assertTrue("Wrong GlobalProperties on Mapper", + gprops.isPartitionedOnFields(new FieldSet(1))); + Assert.assertTrue("Wrong GlobalProperties on Mapper", + gprops.getPartitioning() == PartitioningProperty.HASH_PARTITIONED); + Assert.assertTrue("Wrong LocalProperties on Mapper", + lprops.getGroupedFields().contains(1)); + } + return false; + } + return true; + } + + @Override + public void postVisit(PlanNode visitable) { + + } + }); + } + + @Test + public void forwardFieldsTestJoin() { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + DataSet<Tuple3<Integer, Integer, Integer>> in1 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class); + DataSet<Tuple3<Integer, Integer, Integer>> in2 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class); + in1 = in1.map(new MockMapper()).withForwardedFields("*") + .groupBy(0) + .reduce(new MockReducer()).withForwardedFields("f0->f1"); + in2 = in2.map(new MockMapper()).withForwardedFields("*") + .groupBy(1) + .reduce(new MockReducer()).withForwardedFields("f1->f2"); + DataSet<Tuple3<Integer, Integer, Integer>> out = in1.join(in2).where(1).equalTo(2).with(new MockJoin()); + + out.print(); + JavaPlan plan = env.createProgramPlan(); + OptimizedPlan oPlan = compileWithStats(plan); + + oPlan.accept(new Visitor<PlanNode>() { + @Override + public boolean preVisit(PlanNode visitable) { + if (visitable instanceof DualInputPlanNode && visitable.getPactContract() instanceof JoinOperatorBase) { + DualInputPlanNode node = ((DualInputPlanNode) visitable); + + final Channel inConn1 = node.getInput1(); + final Channel inConn2 = node.getInput2(); + + Assert.assertTrue("Join should just forward the input if it is already partitioned", + inConn1.getShipStrategy() == ShipStrategyType.FORWARD); + Assert.assertTrue("Join should just forward the input if it is already partitioned", + inConn2.getShipStrategy() == ShipStrategyType.FORWARD); + return false; + } + return true; + } + + @Override + public void postVisit(PlanNode visitable) { + + } + }); + } + + public static class MockMapper implements MapFunction<Tuple3<Integer, Integer, Integer>, Tuple3<Integer, Integer, Integer>> { + @Override + public Tuple3<Integer, Integer, Integer> map(Tuple3<Integer, Integer, Integer> value) throws Exception { + return null; + } + } + + public static class MockReducer implements ReduceFunction<Tuple3<Integer, Integer, Integer>> { + @Override + public Tuple3<Integer, Integer, Integer> reduce(Tuple3<Integer, Integer, Integer> value1, Tuple3<Integer, Integer, Integer> value2) throws Exception { + return null; + } + } + + public static class MockJoin implements JoinFunction<Tuple3<Integer, Integer, Integer>, + Tuple3<Integer, Integer, Integer>, Tuple3<Integer, Integer, Integer>> { + + @Override + public Tuple3<Integer, Integer, Integer> join(Tuple3<Integer, Integer, Integer> first, Tuple3<Integer, Integer, Integer> second) throws Exception { + return null; + } + } + +} + http://git-wip-us.apache.org/repos/asf/flink/blob/de8e066c/flink-compiler/src/test/java/org/apache/flink/compiler/SortPartialReuseTest.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/SortPartialReuseTest.java b/flink-compiler/src/test/java/org/apache/flink/compiler/SortPartialReuseTest.java index d4c0c33..d12fd55 100644 --- a/flink-compiler/src/test/java/org/apache/flink/compiler/SortPartialReuseTest.java +++ b/flink-compiler/src/test/java/org/apache/flink/compiler/SortPartialReuseTest.java @@ -47,10 +47,10 @@ public class SortPartialReuseTest extends CompilerTestBase { input .partitionByHash(0) - .map(new IdentityMapper<Tuple3<Long,Long,Long>>()).withConstantSet("0", "1", "2") + .map(new IdentityMapper<Tuple3<Long,Long,Long>>()).withForwardedFields("0", "1", "2") .groupBy(0, 1) - .reduceGroup(new IdentityGroupReducer<Tuple3<Long,Long,Long>>()).withConstantSet("0", "1", "2") + .reduceGroup(new IdentityGroupReducer<Tuple3<Long,Long,Long>>()).withForwardedFields("0", "1", "2") .groupBy(0) .reduceGroup(new IdentityGroupReducer<Tuple3<Long,Long,Long>>()) @@ -92,10 +92,10 @@ public class SortPartialReuseTest extends CompilerTestBase { @Override public int partition(Long key, int numPartitions) { return 0; } }, 0) - .map(new IdentityMapper<Tuple3<Long,Long,Long>>()).withConstantSet("0", "1", "2") + .map(new IdentityMapper<Tuple3<Long,Long,Long>>()).withForwardedFields("0", "1", "2") .groupBy(0, 1) - .reduceGroup(new IdentityGroupReducer<Tuple3<Long,Long,Long>>()).withConstantSet("0", "1", "2") + .reduceGroup(new IdentityGroupReducer<Tuple3<Long,Long,Long>>()).withForwardedFields("0", "1", "2") .groupBy(1) .reduceGroup(new IdentityGroupReducer<Tuple3<Long,Long,Long>>()) http://git-wip-us.apache.org/repos/asf/flink/blob/de8e066c/flink-compiler/src/test/java/org/apache/flink/compiler/custompartition/CoGroupCustomPartitioningTest.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/custompartition/CoGroupCustomPartitioningTest.java b/flink-compiler/src/test/java/org/apache/flink/compiler/custompartition/CoGroupCustomPartitioningTest.java index 3508f42..d10b276 100644 --- a/flink-compiler/src/test/java/org/apache/flink/compiler/custompartition/CoGroupCustomPartitioningTest.java +++ b/flink-compiler/src/test/java/org/apache/flink/compiler/custompartition/CoGroupCustomPartitioningTest.java @@ -239,14 +239,14 @@ public class CoGroupCustomPartitioningTest extends CompilerTestBase { @Override public int partition(Long key, int numPartitions) { return 0; } }, 0) - .map(new IdentityMapper<Tuple3<Long,Long,Long>>()).withConstantSet("0", "1", "2"); + .map(new IdentityMapper<Tuple3<Long,Long,Long>>()).withForwardedFields("0", "1", "2"); DataSet<Tuple3<Long, Long, Long>> grouped = partitioned .distinct(0, 1) .groupBy(1) .sortGroup(0, Order.ASCENDING) - .reduceGroup(new IdentityGroupReducer<Tuple3<Long,Long,Long>>()).withConstantSet("0", "1"); + .reduceGroup(new IdentityGroupReducer<Tuple3<Long,Long,Long>>()).withForwardedFields("0", "1"); grouped .coGroup(partitioned).where(0).equalTo(0) http://git-wip-us.apache.org/repos/asf/flink/blob/de8e066c/flink-compiler/src/test/java/org/apache/flink/compiler/custompartition/JoinCustomPartitioningTest.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/custompartition/JoinCustomPartitioningTest.java b/flink-compiler/src/test/java/org/apache/flink/compiler/custompartition/JoinCustomPartitioningTest.java index 682cd5f..1a3ceae 100644 --- a/flink-compiler/src/test/java/org/apache/flink/compiler/custompartition/JoinCustomPartitioningTest.java +++ b/flink-compiler/src/test/java/org/apache/flink/compiler/custompartition/JoinCustomPartitioningTest.java @@ -236,14 +236,14 @@ public class JoinCustomPartitioningTest extends CompilerTestBase { @Override public int partition(Long key, int numPartitions) { return 0; } }, 0) - .map(new IdentityMapper<Tuple3<Long,Long,Long>>()).withConstantSet("0", "1", "2"); + .map(new IdentityMapper<Tuple3<Long,Long,Long>>()).withForwardedFields("0", "1", "2"); DataSet<Tuple3<Long, Long, Long>> grouped = partitioned .distinct(0, 1) .groupBy(1) .sortGroup(0, Order.ASCENDING) - .reduceGroup(new IdentityGroupReducer<Tuple3<Long,Long,Long>>()).withConstantSet("0", "1"); + .reduceGroup(new IdentityGroupReducer<Tuple3<Long,Long,Long>>()).withForwardedFields("0", "1"); grouped .join(partitioned, JoinHint.REPARTITION_HASH_FIRST).where(0).equalTo(0) http://git-wip-us.apache.org/repos/asf/flink/blob/de8e066c/flink-compiler/src/test/java/org/apache/flink/compiler/dataproperties/GlobalPropertiesFilteringTest.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/dataproperties/GlobalPropertiesFilteringTest.java b/flink-compiler/src/test/java/org/apache/flink/compiler/dataproperties/GlobalPropertiesFilteringTest.java index fc8616e..ec1d056 100644 --- a/flink-compiler/src/test/java/org/apache/flink/compiler/dataproperties/GlobalPropertiesFilteringTest.java +++ b/flink-compiler/src/test/java/org/apache/flink/compiler/dataproperties/GlobalPropertiesFilteringTest.java @@ -22,34 +22,408 @@ import static org.junit.Assert.*; import static org.mockito.Mockito.*; import org.apache.flink.api.common.functions.Partitioner; +import org.apache.flink.api.common.operators.Order; +import org.apache.flink.api.common.operators.Ordering; +import org.apache.flink.api.common.operators.SingleInputSemanticProperties; import org.apache.flink.api.common.operators.util.FieldList; import org.apache.flink.api.common.operators.util.FieldSet; -import org.apache.flink.compiler.dag.OptimizerNode; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.java.functions.SemanticPropUtil; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.tuple.Tuple8; +import org.apache.flink.api.java.typeutils.TupleTypeInfo; +import org.apache.flink.types.IntValue; +import org.apache.flink.types.LongValue; +import org.apache.flink.types.StringValue; +import org.junit.Assert; import org.junit.Test; -import org.mockito.Matchers; + +import java.util.Set; public class GlobalPropertiesFilteringTest { + private TupleTypeInfo<Tuple8<Integer, Integer, Integer, Integer, Integer, Integer, Integer, Integer>> tupleInfo = + new TupleTypeInfo<Tuple8<Integer, Integer, Integer, Integer, Integer, Integer, Integer, Integer>>( + BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, + BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO + ); + + @Test + public void testAllErased1() { + + SingleInputSemanticProperties semProps = new SingleInputSemanticProperties(); + + GlobalProperties gprops = new GlobalProperties(); + gprops.setHashPartitioned(new FieldList(0, 1)); + gprops.addUniqueFieldCombination(new FieldSet(3, 4)); + gprops.addUniqueFieldCombination(new FieldSet(5, 6)); + + GlobalProperties result = gprops.filterBySemanticProperties(semProps, 0); + + assertEquals(PartitioningProperty.RANDOM, result.getPartitioning()); + assertNull(result.getPartitioningFields()); + assertNull(result.getPartitioningOrdering()); + assertNull(result.getUniqueFieldCombination()); + } + + @Test + public void testAllErased2() { + + SingleInputSemanticProperties semProps = new SingleInputSemanticProperties(); + SemanticPropUtil.getSemanticPropsSingleFromString(semProps, new String[]{"2"}, null, null, tupleInfo, tupleInfo); + + GlobalProperties gprops = new GlobalProperties(); + gprops.setHashPartitioned(new FieldList(0, 1)); + gprops.addUniqueFieldCombination(new FieldSet(3, 4)); + gprops.addUniqueFieldCombination(new FieldSet(5, 6)); + + GlobalProperties result = gprops.filterBySemanticProperties(semProps, 0); + + assertEquals(PartitioningProperty.RANDOM, result.getPartitioning()); + assertNull(result.getPartitioningFields()); + assertNull(result.getPartitioningOrdering()); + assertNull(result.getUniqueFieldCombination()); + } + + @Test + public void testHashPartitioningPreserved1() { + + SingleInputSemanticProperties sprops = new SingleInputSemanticProperties(); + SemanticPropUtil.getSemanticPropsSingleFromString(sprops, new String[]{"0;1;4"}, null, null, tupleInfo, tupleInfo); + + GlobalProperties gprops = new GlobalProperties(); + gprops.setHashPartitioned(new FieldList(0, 1, 4)); + + GlobalProperties result = gprops.filterBySemanticProperties(sprops, 0); + + assertEquals(PartitioningProperty.HASH_PARTITIONED, result.getPartitioning()); + FieldList pFields = result.getPartitioningFields(); + assertEquals(3, pFields.size()); + assertTrue(pFields.contains(0)); + assertTrue(pFields.contains(1)); + assertTrue(pFields.contains(4)); + } + + @Test + public void testHashPartitioningPreserved2() { + + SingleInputSemanticProperties sprops = new SingleInputSemanticProperties(); + SemanticPropUtil.getSemanticPropsSingleFromString(sprops, new String[]{"0->1; 1->2; 4->3"}, null, null, tupleInfo, tupleInfo); + + GlobalProperties gprops = new GlobalProperties(); + gprops.setHashPartitioned(new FieldList(0, 1, 4)); + + GlobalProperties result = gprops.filterBySemanticProperties(sprops, 0); + + assertEquals(PartitioningProperty.HASH_PARTITIONED, result.getPartitioning()); + FieldList pFields = result.getPartitioningFields(); + assertEquals(3, pFields.size()); + assertTrue(pFields.contains(1)); + assertTrue(pFields.contains(2)); + assertTrue(pFields.contains(3)); + } + + @Test + public void testHashPartitioningErased() { + + SingleInputSemanticProperties sprops = new SingleInputSemanticProperties(); + SemanticPropUtil.getSemanticPropsSingleFromString(sprops, new String[]{"0;1"}, null, null, tupleInfo, tupleInfo); + + GlobalProperties gprops = new GlobalProperties(); + gprops.setHashPartitioned(new FieldList(0, 1, 4)); + + GlobalProperties result = gprops.filterBySemanticProperties(sprops, 0); + + assertEquals(PartitioningProperty.RANDOM, result.getPartitioning()); + assertNull(result.getPartitioningFields()); + } + + @Test + public void testAnyPartitioningPreserved1() { + + SingleInputSemanticProperties sprops = new SingleInputSemanticProperties(); + SemanticPropUtil.getSemanticPropsSingleFromString(sprops, new String[]{"0;1;4"}, null, null, tupleInfo, tupleInfo); + + GlobalProperties gprops = new GlobalProperties(); + gprops.setAnyPartitioning(new FieldList(0, 1, 4)); + + GlobalProperties result = gprops.filterBySemanticProperties(sprops, 0); + + assertEquals(PartitioningProperty.ANY_PARTITIONING, result.getPartitioning()); + FieldList pFields = result.getPartitioningFields(); + assertEquals(3, pFields.size()); + assertTrue(pFields.contains(0)); + assertTrue(pFields.contains(1)); + assertTrue(pFields.contains(4)); + } + + @Test + public void testAnyPartitioningPreserved2() { + + SingleInputSemanticProperties sprops = new SingleInputSemanticProperties(); + SemanticPropUtil.getSemanticPropsSingleFromString(sprops, new String[]{"0->1; 1->2; 4->3"}, null, null, tupleInfo, tupleInfo); + + GlobalProperties gprops = new GlobalProperties(); + gprops.setAnyPartitioning(new FieldList(0, 1, 4)); + + GlobalProperties result = gprops.filterBySemanticProperties(sprops, 0); + + assertEquals(PartitioningProperty.ANY_PARTITIONING, result.getPartitioning()); + FieldList pFields = result.getPartitioningFields(); + assertEquals(3, pFields.size()); + assertTrue(pFields.contains(1)); + assertTrue(pFields.contains(2)); + assertTrue(pFields.contains(3)); + } + + @Test + public void testAnyPartitioningErased() { + + SingleInputSemanticProperties sprops = new SingleInputSemanticProperties(); + SemanticPropUtil.getSemanticPropsSingleFromString(sprops, new String[]{"0;1"}, null, null, tupleInfo, tupleInfo); + + GlobalProperties gprops = new GlobalProperties(); + gprops.setAnyPartitioning(new FieldList(0, 1, 4)); + + GlobalProperties result = gprops.filterBySemanticProperties(sprops, 0); + + assertEquals(PartitioningProperty.RANDOM, result.getPartitioning()); + assertNull(result.getPartitioningFields()); + } + @Test - public void testCustomPartitioningPreserves() { - try { - Partitioner<?> partitioner = new MockPartitioner(); - - GlobalProperties gp = new GlobalProperties(); - gp.setCustomPartitioned(new FieldList(2, 3), partitioner); - - OptimizerNode node = mock(OptimizerNode.class); - when(node.isFieldConstant(Matchers.anyInt(), Matchers.anyInt())).thenReturn(true); - - GlobalProperties filtered = gp.filterByNodesConstantSet(node, 0); - - assertTrue(filtered.isPartitionedOnFields(new FieldSet(2, 3))); - assertEquals(PartitioningProperty.CUSTOM_PARTITIONING, filtered.getPartitioning()); - assertEquals(partitioner, filtered.getCustomPartitioner()); - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } + public void testCustomPartitioningPreserved1() { + + SingleInputSemanticProperties sprops = new SingleInputSemanticProperties(); + SemanticPropUtil.getSemanticPropsSingleFromString(sprops, new String[]{"0;1;4"}, null, null, tupleInfo, tupleInfo); + + GlobalProperties gprops = new GlobalProperties(); + Partitioner<Tuple2<Long, Integer>> myP = new MockPartitioner(); + gprops.setCustomPartitioned(new FieldList(0, 4), myP); + + GlobalProperties result = gprops.filterBySemanticProperties(sprops, 0); + + assertEquals(PartitioningProperty.CUSTOM_PARTITIONING, result.getPartitioning()); + FieldList pFields = result.getPartitioningFields(); + assertEquals(2, pFields.size()); + assertTrue(pFields.contains(0)); + assertTrue(pFields.contains(4)); + assertEquals(myP, result.getCustomPartitioner()); } + + @Test + public void testCustomPartitioningPreserved2() { + + SingleInputSemanticProperties sprops = new SingleInputSemanticProperties(); + SemanticPropUtil.getSemanticPropsSingleFromString(sprops, new String[]{"0->1; 1->2; 4->3"}, null, null, tupleInfo, tupleInfo); + + GlobalProperties gprops = new GlobalProperties(); + Partitioner<Tuple2<Long, Integer>> myP = new MockPartitioner(); + gprops.setCustomPartitioned(new FieldList(0, 4), myP); + + GlobalProperties result = gprops.filterBySemanticProperties(sprops, 0); + + assertEquals(PartitioningProperty.CUSTOM_PARTITIONING, result.getPartitioning()); + FieldList pFields = result.getPartitioningFields(); + assertEquals(2, pFields.size()); + assertTrue(pFields.contains(1)); + assertTrue(pFields.contains(3)); + assertEquals(myP, result.getCustomPartitioner()); + } + + @Test + public void testCustomPartitioningErased() { + + SingleInputSemanticProperties sprops = new SingleInputSemanticProperties(); + SemanticPropUtil.getSemanticPropsSingleFromString(sprops, new String[]{"0;1"}, null, null, tupleInfo, tupleInfo); + + GlobalProperties gprops = new GlobalProperties(); + Partitioner<Tuple2<Long, Integer>> myP = new MockPartitioner(); + gprops.setCustomPartitioned(new FieldList(0, 4), myP); + + GlobalProperties result = gprops.filterBySemanticProperties(sprops, 0); + + assertEquals(PartitioningProperty.RANDOM, result.getPartitioning()); + assertNull(result.getPartitioningFields()); + assertNull(result.getCustomPartitioner()); + } + + @Test + public void testRangePartitioningPreserved1() { + + SingleInputSemanticProperties sprops = new SingleInputSemanticProperties(); + SemanticPropUtil.getSemanticPropsSingleFromString(sprops, new String[]{"1;2;5"}, null, null, tupleInfo, tupleInfo); + + Ordering o = new Ordering(); + o.appendOrdering(1, IntValue.class, Order.ASCENDING); + o.appendOrdering(5, LongValue.class, Order.DESCENDING); + o.appendOrdering(2, StringValue.class, Order.ASCENDING); + GlobalProperties gprops = new GlobalProperties(); + gprops.setRangePartitioned(o); + + GlobalProperties result = gprops.filterBySemanticProperties(sprops, 0); + + assertEquals(PartitioningProperty.RANGE_PARTITIONED, result.getPartitioning()); + FieldList pFields = result.getPartitioningFields(); + assertEquals(3, pFields.size()); + assertEquals(1, pFields.get(0).intValue()); + assertEquals(5, pFields.get(1).intValue()); + assertEquals(2, pFields.get(2).intValue()); + Ordering pOrder = result.getPartitioningOrdering(); + assertEquals(3, pOrder.getNumberOfFields()); + assertEquals(1, pOrder.getFieldNumber(0).intValue()); + assertEquals(5, pOrder.getFieldNumber(1).intValue()); + assertEquals(2, pOrder.getFieldNumber(2).intValue()); + assertEquals(Order.ASCENDING, pOrder.getOrder(0)); + assertEquals(Order.DESCENDING, pOrder.getOrder(1)); + assertEquals(Order.ASCENDING, pOrder.getOrder(2)); + assertEquals(IntValue.class, pOrder.getType(0)); + assertEquals(LongValue.class, pOrder.getType(1)); + assertEquals(StringValue.class, pOrder.getType(2)); + } + + @Test + public void testRangePartitioningPreserved2() { + + SingleInputSemanticProperties sprops = new SingleInputSemanticProperties(); + SemanticPropUtil.getSemanticPropsSingleFromString(sprops, new String[]{"1->3; 2->0; 5->1"}, null, null, tupleInfo, tupleInfo); + + Ordering o = new Ordering(); + o.appendOrdering(1, IntValue.class, Order.ASCENDING); + o.appendOrdering(5, LongValue.class, Order.DESCENDING); + o.appendOrdering(2, StringValue.class, Order.ASCENDING); + GlobalProperties gprops = new GlobalProperties(); + gprops.setRangePartitioned(o); + + GlobalProperties result = gprops.filterBySemanticProperties(sprops, 0); + + assertEquals(PartitioningProperty.RANGE_PARTITIONED, result.getPartitioning()); + FieldList pFields = result.getPartitioningFields(); + assertEquals(3, pFields.size()); + assertEquals(3, pFields.get(0).intValue()); + assertEquals(1, pFields.get(1).intValue()); + assertEquals(0, pFields.get(2).intValue()); + Ordering pOrder = result.getPartitioningOrdering(); + assertEquals(3, pOrder.getNumberOfFields()); + assertEquals(3, pOrder.getFieldNumber(0).intValue()); + assertEquals(1, pOrder.getFieldNumber(1).intValue()); + assertEquals(0, pOrder.getFieldNumber(2).intValue()); + assertEquals(Order.ASCENDING, pOrder.getOrder(0)); + assertEquals(Order.DESCENDING, pOrder.getOrder(1)); + assertEquals(Order.ASCENDING, pOrder.getOrder(2)); + assertEquals(IntValue.class, pOrder.getType(0)); + assertEquals(LongValue.class, pOrder.getType(1)); + assertEquals(StringValue.class, pOrder.getType(2)); + } + + @Test + public void testRangePartitioningErased() { + + SingleInputSemanticProperties sprops = new SingleInputSemanticProperties(); + SemanticPropUtil.getSemanticPropsSingleFromString(sprops, new String[]{"1;5"}, null, null, tupleInfo, tupleInfo); + + Ordering o = new Ordering(); + o.appendOrdering(1, IntValue.class, Order.ASCENDING); + o.appendOrdering(5, LongValue.class, Order.DESCENDING); + o.appendOrdering(2, StringValue.class, Order.ASCENDING); + GlobalProperties gprops = new GlobalProperties(); + gprops.setRangePartitioned(o); + + GlobalProperties result = gprops.filterBySemanticProperties(sprops, 0); + + assertEquals(PartitioningProperty.RANDOM, result.getPartitioning()); + assertNull(result.getPartitioningOrdering()); + assertNull(result.getPartitioningFields()); + } + + @Test + public void testRebalancingPreserved() { + + SingleInputSemanticProperties sprops = new SingleInputSemanticProperties(); + SemanticPropUtil.getSemanticPropsSingleFromString(sprops, new String[]{"0->1; 1->2; 4->3"}, null, null, tupleInfo, tupleInfo); + + GlobalProperties gprops = new GlobalProperties(); + gprops.setForcedRebalanced(); + + GlobalProperties result = gprops.filterBySemanticProperties(sprops, 0); + + assertEquals(PartitioningProperty.FORCED_REBALANCED, result.getPartitioning()); + assertNull(result.getPartitioningFields()); + } + + @Test + public void testUniqueFieldGroupsPreserved1() { + SingleInputSemanticProperties sprops = new SingleInputSemanticProperties(); + SemanticPropUtil.getSemanticPropsSingleFromString(sprops, new String[]{"0;1;2;3;4"}, null, null, tupleInfo, tupleInfo); + + FieldSet set1 = new FieldSet(0, 1, 2); + FieldSet set2 = new FieldSet(3, 4); + FieldSet set3 = new FieldSet(4, 5, 6, 7); + GlobalProperties gprops = new GlobalProperties(); + gprops.addUniqueFieldCombination(set1); + gprops.addUniqueFieldCombination(set2); + gprops.addUniqueFieldCombination(set3); + + GlobalProperties result = gprops.filterBySemanticProperties(sprops, 0); + Set<FieldSet> unique = result.getUniqueFieldCombination(); + FieldSet expected1 = new FieldSet(0, 1, 2); + FieldSet expected2 = new FieldSet(3, 4); + + Assert.assertTrue(unique.size() == 2); + Assert.assertTrue(unique.contains(expected1)); + Assert.assertTrue(unique.contains(expected2)); + } + + @Test + public void testUniqueFieldGroupsPreserved2() { + SingleInputSemanticProperties sprops = new SingleInputSemanticProperties(); + SemanticPropUtil.getSemanticPropsSingleFromString(sprops, new String[]{"0->5;1;2;3->6;4"}, null, null, tupleInfo, tupleInfo); + + FieldSet set1 = new FieldSet(0, 1, 2); + FieldSet set2 = new FieldSet(3, 4); + FieldSet set3 = new FieldSet(4, 5, 6, 7); + GlobalProperties gprops = new GlobalProperties(); + gprops.addUniqueFieldCombination(set1); + gprops.addUniqueFieldCombination(set2); + gprops.addUniqueFieldCombination(set3); + + GlobalProperties result = gprops.filterBySemanticProperties(sprops, 0); + Set<FieldSet> unique = result.getUniqueFieldCombination(); + FieldSet expected1 = new FieldSet(1, 2, 5); + FieldSet expected2 = new FieldSet(4, 6); + + Assert.assertTrue(unique.size() == 2); + Assert.assertTrue(unique.contains(expected1)); + Assert.assertTrue(unique.contains(expected2)); + } + + @Test + public void testUniqueFieldGroupsErased() { + SingleInputSemanticProperties sprops = new SingleInputSemanticProperties(); + SemanticPropUtil.getSemanticPropsSingleFromString(sprops, new String[]{"0; 3; 5; 6; 7"}, null, null, tupleInfo, tupleInfo); + + FieldSet set1 = new FieldSet(0, 1, 2); + FieldSet set2 = new FieldSet(3, 4); + FieldSet set3 = new FieldSet(4, 5, 6, 7); + GlobalProperties gprops = new GlobalProperties(); + gprops.addUniqueFieldCombination(set1); + gprops.addUniqueFieldCombination(set2); + gprops.addUniqueFieldCombination(set3); + + GlobalProperties result = gprops.filterBySemanticProperties(sprops, 0); + Assert.assertNull(result.getUniqueFieldCombination()); + } + + @Test(expected = IndexOutOfBoundsException.class) + public void testInvalidInputIndex() { + + SingleInputSemanticProperties sprops = new SingleInputSemanticProperties(); + SemanticPropUtil.getSemanticPropsSingleFromString(sprops, new String[]{"0;1"}, null, null, tupleInfo, tupleInfo); + + GlobalProperties gprops = new GlobalProperties(); + gprops.setHashPartitioned(new FieldList(0, 1)); + + gprops.filterBySemanticProperties(sprops, 1); + } + } http://git-wip-us.apache.org/repos/asf/flink/blob/de8e066c/flink-compiler/src/test/java/org/apache/flink/compiler/dataproperties/GlobalPropertiesMatchingTest.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/dataproperties/GlobalPropertiesMatchingTest.java b/flink-compiler/src/test/java/org/apache/flink/compiler/dataproperties/GlobalPropertiesMatchingTest.java index fd4ad82..1890597 100644 --- a/flink-compiler/src/test/java/org/apache/flink/compiler/dataproperties/GlobalPropertiesMatchingTest.java +++ b/flink-compiler/src/test/java/org/apache/flink/compiler/dataproperties/GlobalPropertiesMatchingTest.java @@ -25,6 +25,7 @@ import org.apache.flink.api.common.operators.Order; import org.apache.flink.api.common.operators.Ordering; import org.apache.flink.api.common.operators.util.FieldList; import org.apache.flink.api.common.operators.util.FieldSet; +import org.apache.flink.api.java.tuple.Tuple2; import org.junit.Test; public class GlobalPropertiesMatchingTest { @@ -113,7 +114,7 @@ public class GlobalPropertiesMatchingTest { @Test public void testMatchingCustomPartitioning() { try { - final Partitioner<Long> partitioner = new MockPartitioner(); + final Partitioner<Tuple2<Long, Integer>> partitioner = new MockPartitioner(); RequestedGlobalProperties req = new RequestedGlobalProperties(); req.setCustomPartitioned(new FieldSet(6, 2), partitioner); http://git-wip-us.apache.org/repos/asf/flink/blob/de8e066c/flink-compiler/src/test/java/org/apache/flink/compiler/dataproperties/GlobalPropertiesPushdownTest.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/dataproperties/GlobalPropertiesPushdownTest.java b/flink-compiler/src/test/java/org/apache/flink/compiler/dataproperties/GlobalPropertiesPushdownTest.java index f99ebb6..c1b84cf 100644 --- a/flink-compiler/src/test/java/org/apache/flink/compiler/dataproperties/GlobalPropertiesPushdownTest.java +++ b/flink-compiler/src/test/java/org/apache/flink/compiler/dataproperties/GlobalPropertiesPushdownTest.java @@ -20,12 +20,11 @@ package org.apache.flink.compiler.dataproperties; import static org.junit.Assert.*; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; +import org.apache.flink.api.common.operators.SemanticProperties; +import org.apache.flink.api.common.operators.SingleInputSemanticProperties; import org.apache.flink.api.common.operators.util.FieldSet; -import org.apache.flink.compiler.dag.OptimizerNode; import org.junit.Test; -import org.mockito.Matchers; public class GlobalPropertiesPushdownTest { @@ -35,11 +34,11 @@ public class GlobalPropertiesPushdownTest { RequestedGlobalProperties req = new RequestedGlobalProperties(); req.setAnyPartitioning(new FieldSet(3, 1)); - RequestedGlobalProperties preserved = req.filterByNodesConstantSet(getAllPreservingNode(), 0); + RequestedGlobalProperties preserved = req.filterBySemanticProperties(getAllPreservingSemProps(), 0); assertEquals(PartitioningProperty.ANY_PARTITIONING, preserved.getPartitioning()); assertTrue(preserved.getPartitionedFields().isValidSubset(new FieldSet(1, 3))); - RequestedGlobalProperties nonPreserved = req.filterByNodesConstantSet(getNonePreservingNode(), 0); + RequestedGlobalProperties nonPreserved = req.filterBySemanticProperties(getNonePreservingSemProps(), 0); assertTrue(nonPreserved == null || nonPreserved.isTrivial()); } catch (Exception e) { @@ -54,11 +53,11 @@ public class GlobalPropertiesPushdownTest { RequestedGlobalProperties req = new RequestedGlobalProperties(); req.setHashPartitioned(new FieldSet(3, 1)); - RequestedGlobalProperties preserved = req.filterByNodesConstantSet(getAllPreservingNode(), 0); + RequestedGlobalProperties preserved = req.filterBySemanticProperties(getAllPreservingSemProps(), 0); assertEquals(PartitioningProperty.HASH_PARTITIONED, preserved.getPartitioning()); assertTrue(preserved.getPartitionedFields().isValidSubset(new FieldSet(1, 3))); - RequestedGlobalProperties nonPreserved = req.filterByNodesConstantSet(getNonePreservingNode(), 0); + RequestedGlobalProperties nonPreserved = req.filterBySemanticProperties(getNonePreservingSemProps(), 0); assertTrue(nonPreserved == null || nonPreserved.isTrivial()); } catch (Exception e) { @@ -73,7 +72,7 @@ public class GlobalPropertiesPushdownTest { RequestedGlobalProperties req = new RequestedGlobalProperties(); req.setCustomPartitioned(new FieldSet(3, 1), new MockPartitioner()); - RequestedGlobalProperties pushedDown = req.filterByNodesConstantSet(getAllPreservingNode(), 0); + RequestedGlobalProperties pushedDown = req.filterBySemanticProperties(getAllPreservingSemProps(), 0); assertTrue(pushedDown == null || pushedDown.isTrivial()); } catch (Exception e) { @@ -88,7 +87,7 @@ public class GlobalPropertiesPushdownTest { RequestedGlobalProperties req = new RequestedGlobalProperties(); req.setForceRebalancing(); - RequestedGlobalProperties pushedDown = req.filterByNodesConstantSet(getAllPreservingNode(), 0); + RequestedGlobalProperties pushedDown = req.filterBySemanticProperties(getAllPreservingSemProps(), 0); assertTrue(pushedDown == null || pushedDown.isTrivial()); } catch (Exception e) { @@ -99,15 +98,11 @@ public class GlobalPropertiesPushdownTest { // -------------------------------------------------------------------------------------------- - private static OptimizerNode getAllPreservingNode() { - OptimizerNode node = mock(OptimizerNode.class); - when(node.isFieldConstant(Matchers.anyInt(), Matchers.anyInt())).thenReturn(true); - return node; + private static SemanticProperties getAllPreservingSemProps() { + return new SingleInputSemanticProperties.AllFieldsForwardedProperties(); } - private static OptimizerNode getNonePreservingNode() { - OptimizerNode node = mock(OptimizerNode.class); - when(node.isFieldConstant(Matchers.anyInt(), Matchers.anyInt())).thenReturn(false); - return node; + private static SemanticProperties getNonePreservingSemProps() { + return new SingleInputSemanticProperties(); } }
