http://git-wip-us.apache.org/repos/asf/flink/blob/de8e066c/flink-compiler/src/test/java/org/apache/flink/compiler/dataproperties/LocalPropertiesFilteringTest.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/dataproperties/LocalPropertiesFilteringTest.java b/flink-compiler/src/test/java/org/apache/flink/compiler/dataproperties/LocalPropertiesFilteringTest.java new file mode 100644 index 0000000..397dd3d --- /dev/null +++ b/flink-compiler/src/test/java/org/apache/flink/compiler/dataproperties/LocalPropertiesFilteringTest.java @@ -0,0 +1,376 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.compiler.dataproperties; + +import static org.junit.Assert.*; + +import org.apache.flink.api.common.operators.Order; +import org.apache.flink.api.common.operators.Ordering; +import org.apache.flink.api.common.operators.SingleInputSemanticProperties; +import org.apache.flink.api.common.operators.util.FieldList; +import org.apache.flink.api.common.operators.util.FieldSet; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.java.functions.SemanticPropUtil; +import org.apache.flink.api.java.tuple.Tuple8; +import org.apache.flink.api.java.typeutils.TupleTypeInfo; +import org.apache.flink.types.IntValue; +import org.apache.flink.types.LongValue; +import org.apache.flink.types.StringValue; +import org.junit.Assert; +import org.junit.Test; + +import java.util.Set; + +public class LocalPropertiesFilteringTest { + + private TupleTypeInfo<Tuple8<Integer, Integer, Integer, Integer, Integer, Integer, Integer, Integer>> tupleInfo = + new TupleTypeInfo<Tuple8<Integer, Integer, Integer, Integer, Integer, Integer, Integer, Integer>>( + BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, + BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO + ); + + @Test + public void testAllErased1() { + + SingleInputSemanticProperties sp = new SingleInputSemanticProperties(); + SemanticPropUtil.getSemanticPropsSingleFromString(sp, null, null, null, tupleInfo, tupleInfo); + + LocalProperties lProps = LocalProperties.forGrouping(new FieldList(0, 1, 2)); + lProps = lProps.addUniqueFields(new FieldSet(3,4)); + lProps = lProps.addUniqueFields(new FieldSet(5,6)); + + LocalProperties filtered = lProps.filterBySemanticProperties(sp, 0); + + assertNull(filtered.getGroupedFields()); + assertNull(filtered.getOrdering()); + assertNull(filtered.getUniqueFields()); + } + + @Test + public void testAllErased2() { + + 
SingleInputSemanticProperties sp = new SingleInputSemanticProperties(); + SemanticPropUtil.getSemanticPropsSingleFromString(sp, new String[]{"5"}, null, null, tupleInfo, tupleInfo); + + LocalProperties lProps = LocalProperties.forGrouping(new FieldList(0, 1, 2)); + lProps = lProps.addUniqueFields(new FieldSet(3,4)); + + LocalProperties filtered = lProps.filterBySemanticProperties(sp, 0); + + assertNull(filtered.getGroupedFields()); + assertNull(filtered.getOrdering()); + assertNull(filtered.getUniqueFields()); + } + + @Test + public void testGroupingPreserved1() { + SingleInputSemanticProperties sp = new SingleInputSemanticProperties(); + SemanticPropUtil.getSemanticPropsSingleFromString(sp, new String[]{"0;2;3"}, null, null, tupleInfo, tupleInfo); + + LocalProperties lProps = LocalProperties.forGrouping(new FieldList(0, 2, 3)); + + LocalProperties filtered = lProps.filterBySemanticProperties(sp, 0); + + assertNotNull(filtered.getGroupedFields()); + assertEquals(3, filtered.getGroupedFields().size()); + assertTrue(filtered.getGroupedFields().contains(0)); + assertTrue(filtered.getGroupedFields().contains(2)); + assertTrue(filtered.getGroupedFields().contains(3)); + assertNull(filtered.getOrdering()); + assertNull(filtered.getUniqueFields()); + } + + @Test + public void testGroupingPreserved2() { + SingleInputSemanticProperties sp = new SingleInputSemanticProperties(); + SemanticPropUtil.getSemanticPropsSingleFromString(sp, new String[]{"0->4;2->0;3->7"}, null, null, tupleInfo, tupleInfo); + + LocalProperties lProps = LocalProperties.forGrouping(new FieldList(0, 2, 3)); + + LocalProperties filtered = lProps.filterBySemanticProperties(sp, 0); + + assertNotNull(filtered.getGroupedFields()); + assertEquals(3, filtered.getGroupedFields().size()); + assertTrue(filtered.getGroupedFields().contains(4)); + assertTrue(filtered.getGroupedFields().contains(0)); + assertTrue(filtered.getGroupedFields().contains(7)); + assertNull(filtered.getOrdering()); + 
assertNull(filtered.getUniqueFields()); + } + + @Test + public void testGroupingErased() { + SingleInputSemanticProperties sp = new SingleInputSemanticProperties(); + SemanticPropUtil.getSemanticPropsSingleFromString(sp, new String[]{"0->4;2->0"}, null, null, tupleInfo, tupleInfo); + + LocalProperties lProps = LocalProperties.forGrouping(new FieldList(0, 2, 3)); + + LocalProperties filtered = lProps.filterBySemanticProperties(sp, 0); + + assertNull(filtered.getGroupedFields()); + assertNull(filtered.getOrdering()); + assertNull(filtered.getUniqueFields()); + } + + @Test + public void testSortingPreserved1() { + SingleInputSemanticProperties sp = new SingleInputSemanticProperties(); + SemanticPropUtil.getSemanticPropsSingleFromString(sp, new String[]{"0;2;5"}, null, null, tupleInfo, tupleInfo); + + Ordering o = new Ordering(); + o.appendOrdering(2, IntValue.class, Order.ASCENDING); + o.appendOrdering(0, StringValue.class, Order.DESCENDING); + o.appendOrdering(5, LongValue.class, Order.DESCENDING); + LocalProperties lProps = LocalProperties.forOrdering(o); + + LocalProperties filtered = lProps.filterBySemanticProperties(sp, 0); + FieldList gFields = filtered.getGroupedFields(); + Ordering order = filtered.getOrdering(); + + assertNotNull(gFields); + assertEquals(3, gFields.size()); + assertTrue(gFields.contains(0)); + assertTrue(gFields.contains(2)); + assertTrue(gFields.contains(5)); + assertNotNull(order); + assertEquals(3, order.getNumberOfFields()); + assertEquals(2, order.getFieldNumber(0).intValue()); + assertEquals(0, order.getFieldNumber(1).intValue()); + assertEquals(5, order.getFieldNumber(2).intValue()); + assertEquals(Order.ASCENDING, order.getOrder(0)); + assertEquals(Order.DESCENDING, order.getOrder(1)); + assertEquals(Order.DESCENDING, order.getOrder(2)); + assertEquals(IntValue.class, order.getType(0)); + assertEquals(StringValue.class, order.getType(1)); + assertEquals(LongValue.class, order.getType(2)); + assertNull(filtered.getUniqueFields()); + } 
+ + @Test + public void testSortingPreserved2() { + SingleInputSemanticProperties sp = new SingleInputSemanticProperties(); + SemanticPropUtil.getSemanticPropsSingleFromString(sp, new String[]{"0->3;2->7;5->1"}, null, null, tupleInfo, tupleInfo); + + Ordering o = new Ordering(); + o.appendOrdering(2, IntValue.class, Order.ASCENDING); + o.appendOrdering(0, StringValue.class, Order.DESCENDING); + o.appendOrdering(5, LongValue.class, Order.DESCENDING); + LocalProperties lProps = LocalProperties.forOrdering(o); + + LocalProperties filtered = lProps.filterBySemanticProperties(sp, 0); + FieldList gFields = filtered.getGroupedFields(); + Ordering order = filtered.getOrdering(); + + assertNotNull(gFields); + assertEquals(3, gFields.size()); + assertTrue(gFields.contains(3)); + assertTrue(gFields.contains(7)); + assertTrue(gFields.contains(1)); + assertNotNull(order); + assertEquals(3, order.getNumberOfFields()); + assertEquals(7, order.getFieldNumber(0).intValue()); + assertEquals(3, order.getFieldNumber(1).intValue()); + assertEquals(1, order.getFieldNumber(2).intValue()); + assertEquals(Order.ASCENDING, order.getOrder(0)); + assertEquals(Order.DESCENDING, order.getOrder(1)); + assertEquals(Order.DESCENDING, order.getOrder(2)); + assertEquals(IntValue.class, order.getType(0)); + assertEquals(StringValue.class, order.getType(1)); + assertEquals(LongValue.class, order.getType(2)); + assertNull(filtered.getUniqueFields()); + } + + @Test + public void testSortingPreserved3() { + SingleInputSemanticProperties sp = new SingleInputSemanticProperties(); + SemanticPropUtil.getSemanticPropsSingleFromString(sp, new String[]{"0;2"}, null, null, tupleInfo, tupleInfo); + + Ordering o = new Ordering(); + o.appendOrdering(2, IntValue.class, Order.ASCENDING); + o.appendOrdering(0, StringValue.class, Order.DESCENDING); + o.appendOrdering(5, LongValue.class, Order.DESCENDING); + LocalProperties lProps = LocalProperties.forOrdering(o); + + LocalProperties filtered = 
lProps.filterBySemanticProperties(sp, 0); + FieldList gFields = filtered.getGroupedFields(); + Ordering order = filtered.getOrdering(); + + assertNotNull(gFields); + assertEquals(2, gFields.size()); + assertTrue(gFields.contains(0)); + assertTrue(gFields.contains(2)); + assertNotNull(order); + assertEquals(2, order.getNumberOfFields()); + assertEquals(2, order.getFieldNumber(0).intValue()); + assertEquals(0, order.getFieldNumber(1).intValue()); + assertEquals(Order.ASCENDING, order.getOrder(0)); + assertEquals(Order.DESCENDING, order.getOrder(1)); + assertEquals(IntValue.class, order.getType(0)); + assertEquals(StringValue.class, order.getType(1)); + assertNull(filtered.getUniqueFields()); + } + + @Test + public void testSortingPreserved4() { + SingleInputSemanticProperties sp = new SingleInputSemanticProperties(); + SemanticPropUtil.getSemanticPropsSingleFromString(sp, new String[]{"2->7;5"}, null, null, tupleInfo, tupleInfo); + + Ordering o = new Ordering(); + o.appendOrdering(2, IntValue.class, Order.ASCENDING); + o.appendOrdering(0, StringValue.class, Order.DESCENDING); + o.appendOrdering(5, LongValue.class, Order.DESCENDING); + LocalProperties lProps = LocalProperties.forOrdering(o); + + LocalProperties filtered = lProps.filterBySemanticProperties(sp, 0); + FieldList gFields = filtered.getGroupedFields(); + Ordering order = filtered.getOrdering(); + + assertNotNull(gFields); + assertEquals(1, gFields.size()); + assertTrue(gFields.contains(7)); + assertNotNull(order); + assertEquals(1, order.getNumberOfFields()); + assertEquals(7, order.getFieldNumber(0).intValue()); + assertEquals(Order.ASCENDING, order.getOrder(0)); + assertEquals(IntValue.class, order.getType(0)); + assertNull(filtered.getUniqueFields()); + } + + @Test + public void testSortingErased() { + SingleInputSemanticProperties sp = new SingleInputSemanticProperties(); + SemanticPropUtil.getSemanticPropsSingleFromString(sp, new String[]{"0;5"}, null, null, tupleInfo, tupleInfo); + + Ordering o = new 
Ordering(); + o.appendOrdering(2, IntValue.class, Order.ASCENDING); + o.appendOrdering(0, StringValue.class, Order.DESCENDING); + o.appendOrdering(5, LongValue.class, Order.DESCENDING); + LocalProperties lProps = LocalProperties.forOrdering(o); + + LocalProperties filtered = lProps.filterBySemanticProperties(sp, 0); + FieldList gFields = filtered.getGroupedFields(); + Ordering order = filtered.getOrdering(); + + assertNull(gFields); + assertNull(order); + assertNull(filtered.getUniqueFields()); + } + + @Test + public void testUniqueFieldsPreserved1() { + + SingleInputSemanticProperties sp = new SingleInputSemanticProperties(); + SemanticPropUtil.getSemanticPropsSingleFromString(sp, new String[]{"0;1;2;3;4"}, null, null, tupleInfo, tupleInfo); + + LocalProperties lProps = new LocalProperties(); + lProps = lProps.addUniqueFields(new FieldSet(0,1,2)); + lProps = lProps.addUniqueFields(new FieldSet(3,4)); + lProps = lProps.addUniqueFields(new FieldSet(4,5,6)); + + LocalProperties filtered = lProps.filterBySemanticProperties(sp, 0); + FieldSet expected1 = new FieldSet(0,1,2); + FieldSet expected2 = new FieldSet(3,4); + + assertNull(filtered.getGroupedFields()); + assertNull(filtered.getOrdering()); + assertNotNull(filtered.getUniqueFields()); + assertEquals(2, filtered.getUniqueFields().size()); + assertTrue(filtered.getUniqueFields().contains(expected1)); + assertTrue(filtered.getUniqueFields().contains(expected2)); + } + + @Test + public void testUniqueFieldsPreserved2() { + + SingleInputSemanticProperties sp = new SingleInputSemanticProperties(); + SemanticPropUtil.getSemanticPropsSingleFromString(sp, new String[]{"0;1;2;3;4"}, null, null, tupleInfo, tupleInfo); + + LocalProperties lProps = LocalProperties.forGrouping(new FieldList(1,2)); + lProps = lProps.addUniqueFields(new FieldSet(0,1,2)); + lProps = lProps.addUniqueFields(new FieldSet(3,4)); + lProps = lProps.addUniqueFields(new FieldSet(4,5,6)); + + LocalProperties filtered = 
lProps.filterBySemanticProperties(sp, 0); + FieldSet expected1 = new FieldSet(0,1,2); + FieldSet expected2 = new FieldSet(3,4); + + assertNull(filtered.getOrdering()); + assertNotNull(filtered.getGroupedFields()); + assertEquals(2, filtered.getGroupedFields().size()); + assertTrue(filtered.getGroupedFields().contains(1)); + assertTrue(filtered.getGroupedFields().contains(2)); + assertNotNull(filtered.getUniqueFields()); + assertEquals(2, filtered.getUniqueFields().size()); + assertTrue(filtered.getUniqueFields().contains(expected1)); + assertTrue(filtered.getUniqueFields().contains(expected2)); + } + + @Test + public void testUniqueFieldsPreserved3() { + + SingleInputSemanticProperties sp = new SingleInputSemanticProperties(); + SemanticPropUtil.getSemanticPropsSingleFromString(sp, new String[]{"0->7;1->6;2->5;3->4;4->3"}, null, null, tupleInfo, tupleInfo); + + LocalProperties lProps = new LocalProperties(); + lProps = lProps.addUniqueFields(new FieldSet(0,1,2)); + lProps = lProps.addUniqueFields(new FieldSet(3,4)); + lProps = lProps.addUniqueFields(new FieldSet(4,5,6)); + + LocalProperties filtered = lProps.filterBySemanticProperties(sp, 0); + FieldSet expected1 = new FieldSet(5,6,7); + FieldSet expected2 = new FieldSet(3,4); + + assertNull(filtered.getGroupedFields()); + assertNull(filtered.getOrdering()); + assertNotNull(filtered.getUniqueFields()); + assertEquals(2, filtered.getUniqueFields().size()); + assertTrue(filtered.getUniqueFields().contains(expected1)); + assertTrue(filtered.getUniqueFields().contains(expected2)); + } + + @Test + public void testUniqueFieldsErased() { + + SingleInputSemanticProperties sp = new SingleInputSemanticProperties(); + SemanticPropUtil.getSemanticPropsSingleFromString(sp, new String[]{"0;1;4"}, null, null, tupleInfo, tupleInfo); + + LocalProperties lProps = new LocalProperties(); + lProps = lProps.addUniqueFields(new FieldSet(0,1,2)); + lProps = lProps.addUniqueFields(new FieldSet(3,4)); + lProps = lProps.addUniqueFields(new 
FieldSet(4,5,6)); + + LocalProperties filtered = lProps.filterBySemanticProperties(sp, 0); + + assertNull(filtered.getGroupedFields()); + assertNull(filtered.getOrdering()); + assertNull(filtered.getUniqueFields()); + } + + @Test(expected = IndexOutOfBoundsException.class) + public void testInvalidInputIndex() { + + SingleInputSemanticProperties sprops = new SingleInputSemanticProperties(); + SemanticPropUtil.getSemanticPropsSingleFromString(sprops, new String[]{"0;1"}, null, null, tupleInfo, tupleInfo); + + LocalProperties lprops = LocalProperties.forGrouping(new FieldList(0,1)); + + lprops.filterBySemanticProperties(sprops, 1); + } + +}
http://git-wip-us.apache.org/repos/asf/flink/blob/de8e066c/flink-compiler/src/test/java/org/apache/flink/compiler/dataproperties/MockDistribution.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/dataproperties/MockDistribution.java b/flink-compiler/src/test/java/org/apache/flink/compiler/dataproperties/MockDistribution.java new file mode 100644 index 0000000..2f438cf --- /dev/null +++ b/flink-compiler/src/test/java/org/apache/flink/compiler/dataproperties/MockDistribution.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.compiler.dataproperties; + +import org.apache.flink.api.common.distributions.DataDistribution; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputView; +import org.apache.flink.types.Key; + +import java.io.IOException; + +public class MockDistribution implements DataDistribution { + + @Override + public Key<?>[] getBucketBoundary(int bucketNum, int totalNumBuckets) { + return new Key<?>[0]; + } + + @Override + public int getNumberOfFields() { + return 0; + } + + @Override + public void write(DataOutputView out) throws IOException { + + } + + @Override + public void read(DataInputView in) throws IOException { + + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/de8e066c/flink-compiler/src/test/java/org/apache/flink/compiler/dataproperties/MockPartitioner.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/dataproperties/MockPartitioner.java b/flink-compiler/src/test/java/org/apache/flink/compiler/dataproperties/MockPartitioner.java index 71e4c3a..1245398 100644 --- a/flink-compiler/src/test/java/org/apache/flink/compiler/dataproperties/MockPartitioner.java +++ b/flink-compiler/src/test/java/org/apache/flink/compiler/dataproperties/MockPartitioner.java @@ -19,13 +19,14 @@ package org.apache.flink.compiler.dataproperties; import org.apache.flink.api.common.functions.Partitioner; +import org.apache.flink.api.java.tuple.Tuple2; -class MockPartitioner implements Partitioner<Long> { +class MockPartitioner implements Partitioner<Tuple2<Long, Integer>> { private static final long serialVersionUID = 1L; @Override - public int partition(Long key, int numPartitions) { + public int partition(Tuple2<Long, Integer> key, int numPartitions) { return 0; } } \ No newline at end of file 
http://git-wip-us.apache.org/repos/asf/flink/blob/de8e066c/flink-compiler/src/test/java/org/apache/flink/compiler/dataproperties/RequestedGlobalPropertiesFilteringTest.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/dataproperties/RequestedGlobalPropertiesFilteringTest.java b/flink-compiler/src/test/java/org/apache/flink/compiler/dataproperties/RequestedGlobalPropertiesFilteringTest.java new file mode 100644 index 0000000..f9acabb --- /dev/null +++ b/flink-compiler/src/test/java/org/apache/flink/compiler/dataproperties/RequestedGlobalPropertiesFilteringTest.java @@ -0,0 +1,433 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.compiler.dataproperties; + +import static org.junit.Assert.*; + +import org.apache.flink.api.common.distributions.DataDistribution; +import org.apache.flink.api.common.operators.DualInputSemanticProperties; +import org.apache.flink.api.common.operators.Order; +import org.apache.flink.api.common.operators.Ordering; +import org.apache.flink.api.common.operators.SingleInputSemanticProperties; +import org.apache.flink.api.common.operators.util.FieldList; +import org.apache.flink.api.common.operators.util.FieldSet; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.java.functions.SemanticPropUtil; +import org.apache.flink.api.java.tuple.Tuple8; +import org.apache.flink.api.java.typeutils.TupleTypeInfo; +import org.apache.flink.types.ByteValue; +import org.apache.flink.types.IntValue; +import org.apache.flink.types.LongValue; +import org.junit.Assert; +import org.junit.Test; + +import java.io.IOException; + +public class RequestedGlobalPropertiesFilteringTest { + + private TupleTypeInfo<Tuple8<Integer, Integer, Integer, Integer, Integer, Integer, Integer, Integer>> tupleInfo = + new TupleTypeInfo<Tuple8<Integer, Integer, Integer, Integer, Integer, Integer, Integer, Integer>>( + BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, + BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO + ); + + @Test(expected = NullPointerException.class) + public void testNullProps() { + + RequestedGlobalProperties rgProps = new RequestedGlobalProperties(); + rgProps.setAnyPartitioning(new FieldSet(0,1,2)); + + RequestedGlobalProperties filtered = rgProps.filterBySemanticProperties(null, 0); + } + + @Test + public void testEraseAll1() { + + SingleInputSemanticProperties sProp = new SingleInputSemanticProperties(); + + RequestedGlobalProperties rgProps = new RequestedGlobalProperties(); + 
rgProps.setAnyPartitioning(new FieldSet(0,1,2)); + + RequestedGlobalProperties filtered = rgProps.filterBySemanticProperties(sProp, 0); + + assertNull(filtered); + } + + @Test + public void testEraseAll2() { + + SingleInputSemanticProperties sProp = new SingleInputSemanticProperties(); + SemanticPropUtil.getSemanticPropsSingleFromString(sProp, new String[]{"3;4"}, null, null, tupleInfo, tupleInfo); + + RequestedGlobalProperties rgProps = new RequestedGlobalProperties(); + rgProps.setAnyPartitioning(new FieldSet(0, 1, 2)); + + RequestedGlobalProperties filtered = rgProps.filterBySemanticProperties(sProp, 0); + + assertNull(filtered); + } + + @Test + public void testHashPartitioningPreserved1() { + + SingleInputSemanticProperties sProp = new SingleInputSemanticProperties(); + SemanticPropUtil.getSemanticPropsSingleFromString(sProp, new String[]{"0;3;4"}, null, null, tupleInfo, tupleInfo); + + RequestedGlobalProperties rgProps = new RequestedGlobalProperties(); + rgProps.setHashPartitioned(new FieldSet(0, 3, 4)); + + RequestedGlobalProperties filtered = rgProps.filterBySemanticProperties(sProp, 0); + + assertNotNull(filtered); + assertEquals(PartitioningProperty.HASH_PARTITIONED, filtered.getPartitioning()); + assertNotNull(filtered.getPartitionedFields()); + assertEquals(3, filtered.getPartitionedFields().size()); + assertTrue(filtered.getPartitionedFields().contains(0)); + assertTrue(filtered.getPartitionedFields().contains(3)); + assertTrue(filtered.getPartitionedFields().contains(4)); + assertNull(filtered.getDataDistribution()); + assertNull(filtered.getCustomPartitioner()); + assertNull(filtered.getOrdering()); + } + + @Test + public void testHashPartitioningPreserved2() { + + SingleInputSemanticProperties sProp = new SingleInputSemanticProperties(); + SemanticPropUtil.getSemanticPropsSingleFromString(sProp, new String[]{"2->0;1->3;7->4"}, null, null, tupleInfo, tupleInfo); + + RequestedGlobalProperties rgProps = new RequestedGlobalProperties(); + 
rgProps.setHashPartitioned(new FieldSet(0, 3, 4)); + + RequestedGlobalProperties filtered = rgProps.filterBySemanticProperties(sProp, 0); + + assertNotNull(filtered); + assertEquals(PartitioningProperty.HASH_PARTITIONED, filtered.getPartitioning()); + assertNotNull(filtered.getPartitionedFields()); + assertEquals(3, filtered.getPartitionedFields().size()); + assertTrue(filtered.getPartitionedFields().contains(1)); + assertTrue(filtered.getPartitionedFields().contains(2)); + assertTrue(filtered.getPartitionedFields().contains(7)); + assertNull(filtered.getDataDistribution()); + assertNull(filtered.getCustomPartitioner()); + assertNull(filtered.getOrdering()); + } + + @Test + public void testHashPartitioningErased() { + + SingleInputSemanticProperties sProp = new SingleInputSemanticProperties(); + SemanticPropUtil.getSemanticPropsSingleFromString(sProp, new String[]{"1;2"}, null, null, tupleInfo, tupleInfo); + + RequestedGlobalProperties rgProps = new RequestedGlobalProperties(); + rgProps.setHashPartitioned(new FieldSet(0, 3, 4)); + + RequestedGlobalProperties filtered = rgProps.filterBySemanticProperties(sProp, 0); + + assertNull(filtered); + } + + @Test + public void testAnyPartitioningPreserved1() { + + SingleInputSemanticProperties sProp = new SingleInputSemanticProperties(); + SemanticPropUtil.getSemanticPropsSingleFromString(sProp, new String[]{"0;3;4"}, null, null, tupleInfo, tupleInfo); + + RequestedGlobalProperties rgProps = new RequestedGlobalProperties(); + rgProps.setAnyPartitioning(new FieldSet(0, 3, 4)); + + RequestedGlobalProperties filtered = rgProps.filterBySemanticProperties(sProp, 0); + + assertNotNull(filtered); + assertEquals(PartitioningProperty.ANY_PARTITIONING, filtered.getPartitioning()); + assertNotNull(filtered.getPartitionedFields()); + assertEquals(3, filtered.getPartitionedFields().size()); + assertTrue(filtered.getPartitionedFields().contains(0)); + assertTrue(filtered.getPartitionedFields().contains(3)); + 
assertTrue(filtered.getPartitionedFields().contains(4)); + assertNull(filtered.getDataDistribution()); + assertNull(filtered.getCustomPartitioner()); + assertNull(filtered.getOrdering()); + } + + @Test + public void testAnyPartitioningPreserved2() { + + SingleInputSemanticProperties sProp = new SingleInputSemanticProperties(); + SemanticPropUtil.getSemanticPropsSingleFromString(sProp, new String[]{"2->0;1->3;7->4"}, null, null, tupleInfo, tupleInfo); + + RequestedGlobalProperties rgProps = new RequestedGlobalProperties(); + rgProps.setAnyPartitioning(new FieldSet(0, 3, 4)); + + RequestedGlobalProperties filtered = rgProps.filterBySemanticProperties(sProp, 0); + + assertNotNull(filtered); + assertEquals(PartitioningProperty.ANY_PARTITIONING, filtered.getPartitioning()); + assertNotNull(filtered.getPartitionedFields()); + assertEquals(3, filtered.getPartitionedFields().size()); + assertTrue(filtered.getPartitionedFields().contains(1)); + assertTrue(filtered.getPartitionedFields().contains(2)); + assertTrue(filtered.getPartitionedFields().contains(7)); + assertNull(filtered.getDataDistribution()); + assertNull(filtered.getCustomPartitioner()); + assertNull(filtered.getOrdering()); + } + + @Test + public void testAnyPartitioningErased() { + + SingleInputSemanticProperties sProp = new SingleInputSemanticProperties(); + SemanticPropUtil.getSemanticPropsSingleFromString(sProp, new String[]{"1;2"}, null, null, tupleInfo, tupleInfo); + + RequestedGlobalProperties rgProps = new RequestedGlobalProperties(); + rgProps.setAnyPartitioning(new FieldSet(0, 3, 4)); + + RequestedGlobalProperties filtered = rgProps.filterBySemanticProperties(sProp, 0); + + assertNull(filtered); + } + + @Test + public void testRangePartitioningPreserved1() { + + SingleInputSemanticProperties sProp = new SingleInputSemanticProperties(); + SemanticPropUtil.getSemanticPropsSingleFromString(sProp, new String[]{"1;3;6"}, null, null, tupleInfo, tupleInfo); + + Ordering o = new Ordering(); + 
o.appendOrdering(3, LongValue.class, Order.DESCENDING); + o.appendOrdering(1, IntValue.class, Order.ASCENDING); + o.appendOrdering(6, ByteValue.class, Order.DESCENDING); + + RequestedGlobalProperties rgProps = new RequestedGlobalProperties(); + rgProps.setRangePartitioned(o); + + RequestedGlobalProperties filtered = rgProps.filterBySemanticProperties(sProp, 0); + + assertNotNull(filtered); + assertEquals(PartitioningProperty.RANGE_PARTITIONED, filtered.getPartitioning()); + assertNotNull(filtered.getOrdering()); + assertEquals(3, filtered.getOrdering().getNumberOfFields()); + assertEquals(3, filtered.getOrdering().getFieldNumber(0).intValue()); + assertEquals(1, filtered.getOrdering().getFieldNumber(1).intValue()); + assertEquals(6, filtered.getOrdering().getFieldNumber(2).intValue()); + assertEquals(LongValue.class, filtered.getOrdering().getType(0)); + assertEquals(IntValue.class, filtered.getOrdering().getType(1)); + assertEquals(ByteValue.class, filtered.getOrdering().getType(2)); + assertEquals(Order.DESCENDING, filtered.getOrdering().getOrder(0)); + assertEquals(Order.ASCENDING, filtered.getOrdering().getOrder(1)); + assertEquals(Order.DESCENDING, filtered.getOrdering().getOrder(2)); + assertNull(filtered.getPartitionedFields()); + assertNull(filtered.getDataDistribution()); + assertNull(filtered.getCustomPartitioner()); + } + + @Test + public void testRangePartitioningPreserved2() { + + SingleInputSemanticProperties sProp = new SingleInputSemanticProperties(); + SemanticPropUtil.getSemanticPropsSingleFromString(sProp, new String[]{"7->3;1->1;2->6"}, null, null, tupleInfo, tupleInfo); + + Ordering o = new Ordering(); + o.appendOrdering(3, LongValue.class, Order.DESCENDING); + o.appendOrdering(1, IntValue.class, Order.ASCENDING); + o.appendOrdering(6, ByteValue.class, Order.DESCENDING); + + RequestedGlobalProperties rgProps = new RequestedGlobalProperties(); + rgProps.setRangePartitioned(o); + + RequestedGlobalProperties filtered = 
rgProps.filterBySemanticProperties(sProp, 0); + + assertNotNull(filtered); + assertEquals(PartitioningProperty.RANGE_PARTITIONED, filtered.getPartitioning()); + assertNotNull(filtered.getOrdering()); + assertEquals(3, filtered.getOrdering().getNumberOfFields()); + assertEquals(7, filtered.getOrdering().getFieldNumber(0).intValue()); + assertEquals(1, filtered.getOrdering().getFieldNumber(1).intValue()); + assertEquals(2, filtered.getOrdering().getFieldNumber(2).intValue()); + assertEquals(LongValue.class, filtered.getOrdering().getType(0)); + assertEquals(IntValue.class, filtered.getOrdering().getType(1)); + assertEquals(ByteValue.class, filtered.getOrdering().getType(2)); + assertEquals(Order.DESCENDING, filtered.getOrdering().getOrder(0)); + assertEquals(Order.ASCENDING, filtered.getOrdering().getOrder(1)); + assertEquals(Order.DESCENDING, filtered.getOrdering().getOrder(2)); + assertNull(filtered.getPartitionedFields()); + assertNull(filtered.getDataDistribution()); + assertNull(filtered.getCustomPartitioner()); + } + + @Test + public void testRangePartitioningPreserved3() { + + SingleInputSemanticProperties sProp = new SingleInputSemanticProperties(); + SemanticPropUtil.getSemanticPropsSingleFromString(sProp, new String[]{"7->3;1->1;2->6"}, null, null, tupleInfo, tupleInfo); + + DataDistribution dd = new MockDistribution(); + Ordering o = new Ordering(); + o.appendOrdering(3, LongValue.class, Order.DESCENDING); + o.appendOrdering(1, IntValue.class, Order.ASCENDING); + o.appendOrdering(6, ByteValue.class, Order.DESCENDING); + + RequestedGlobalProperties rgProps = new RequestedGlobalProperties(); + rgProps.setRangePartitioned(o, dd); + + RequestedGlobalProperties filtered = rgProps.filterBySemanticProperties(sProp, 0); + + assertNotNull(filtered); + assertEquals(PartitioningProperty.RANGE_PARTITIONED, filtered.getPartitioning()); + assertNotNull(filtered.getOrdering()); + assertEquals(3, filtered.getOrdering().getNumberOfFields()); + assertEquals(7, 
filtered.getOrdering().getFieldNumber(0).intValue()); + assertEquals(1, filtered.getOrdering().getFieldNumber(1).intValue()); + assertEquals(2, filtered.getOrdering().getFieldNumber(2).intValue()); + assertEquals(LongValue.class, filtered.getOrdering().getType(0)); + assertEquals(IntValue.class, filtered.getOrdering().getType(1)); + assertEquals(ByteValue.class, filtered.getOrdering().getType(2)); + assertEquals(Order.DESCENDING, filtered.getOrdering().getOrder(0)); + assertEquals(Order.ASCENDING, filtered.getOrdering().getOrder(1)); + assertEquals(Order.DESCENDING, filtered.getOrdering().getOrder(2)); + assertNotNull(filtered.getDataDistribution()); + assertEquals(dd, filtered.getDataDistribution()); + assertNull(filtered.getPartitionedFields()); + assertNull(filtered.getCustomPartitioner()); + } + + @Test + public void testRangePartitioningErased() { + + SingleInputSemanticProperties sProp = new SingleInputSemanticProperties(); + SemanticPropUtil.getSemanticPropsSingleFromString(sProp, new String[]{"1;2"}, null, null, tupleInfo, tupleInfo); + + Ordering o = new Ordering(); + o.appendOrdering(3, LongValue.class, Order.DESCENDING); + o.appendOrdering(1, IntValue.class, Order.ASCENDING); + o.appendOrdering(6, ByteValue.class, Order.DESCENDING); + + RequestedGlobalProperties rgProps = new RequestedGlobalProperties(); + rgProps.setRangePartitioned(o); + + RequestedGlobalProperties filtered = rgProps.filterBySemanticProperties(sProp, 0); + + assertNull(filtered); + } + + @Test + public void testCustomPartitioningErased() { + + SingleInputSemanticProperties sProp = new SingleInputSemanticProperties(); + SemanticPropUtil.getSemanticPropsSingleFromString(sProp, new String[]{"0;1;2"}, null, null, tupleInfo, tupleInfo); + + RequestedGlobalProperties rgProps = new RequestedGlobalProperties(); + rgProps.setCustomPartitioned(new FieldSet(0, 1, 2), new MockPartitioner()); + + RequestedGlobalProperties filtered = rgProps.filterBySemanticProperties(sProp, 0); + + 
assertNull(filtered); + } + + @Test + public void testRandomDistributionErased() { + + SingleInputSemanticProperties sProp = new SingleInputSemanticProperties(); + SemanticPropUtil.getSemanticPropsSingleFromString(sProp, new String[]{"0;1;2"}, null, null, tupleInfo, tupleInfo); + + RequestedGlobalProperties rgProps = new RequestedGlobalProperties(); + rgProps.setRandomDistribution(); + + RequestedGlobalProperties filtered = rgProps.filterBySemanticProperties(sProp, 0); + + assertNull(filtered); + } + + @Test + public void testReplicationErased() { + + SingleInputSemanticProperties sProp = new SingleInputSemanticProperties(); + SemanticPropUtil.getSemanticPropsSingleFromString(sProp, new String[]{"0;1;2"}, null, null, tupleInfo, tupleInfo); + + RequestedGlobalProperties rgProps = new RequestedGlobalProperties(); + rgProps.setFullyReplicated(); + + RequestedGlobalProperties filtered = rgProps.filterBySemanticProperties(sProp, 0); + + assertNull(filtered); + } + + @Test + public void testRebalancingErased() { + + SingleInputSemanticProperties sProp = new SingleInputSemanticProperties(); + SemanticPropUtil.getSemanticPropsSingleFromString(sProp, new String[]{"0;1;2"}, null, null, tupleInfo, tupleInfo); + + RequestedGlobalProperties rgProps = new RequestedGlobalProperties(); + rgProps.setForceRebalancing(); + + RequestedGlobalProperties filtered = rgProps.filterBySemanticProperties(sProp, 0); + + assertNull(filtered); + } + + @Test + public void testDualHashPartitioningPreserved() { + + DualInputSemanticProperties dprops = new DualInputSemanticProperties(); + SemanticPropUtil.getSemanticPropsDualFromString(dprops, new String[]{"0;2;4"}, new String[]{"1->3;4->6;3->7"}, + null, null, null, null, tupleInfo, tupleInfo, tupleInfo); + + RequestedGlobalProperties gprops1 = new RequestedGlobalProperties(); + RequestedGlobalProperties gprops2 = new RequestedGlobalProperties(); + gprops1.setHashPartitioned(new FieldSet(2, 0, 4)); + gprops2.setHashPartitioned(new FieldSet(3, 6, 
7)); + RequestedGlobalProperties filtered1 = gprops1.filterBySemanticProperties(dprops, 0); + RequestedGlobalProperties filtered2 = gprops2.filterBySemanticProperties(dprops, 1); + + assertNotNull(filtered1); + assertEquals(PartitioningProperty.HASH_PARTITIONED, filtered1.getPartitioning()); + assertNotNull(filtered1.getPartitionedFields()); + assertEquals(3, filtered1.getPartitionedFields().size()); + assertTrue(filtered1.getPartitionedFields().contains(0)); + assertTrue(filtered1.getPartitionedFields().contains(2)); + assertTrue(filtered1.getPartitionedFields().contains(4)); + assertNull(filtered1.getOrdering()); + assertNull(filtered1.getCustomPartitioner()); + assertNull(filtered1.getDataDistribution()); + + assertNotNull(filtered2); + assertEquals(PartitioningProperty.HASH_PARTITIONED, filtered2.getPartitioning()); + assertNotNull(filtered2.getPartitionedFields()); + assertEquals(3, filtered2.getPartitionedFields().size()); + assertTrue(filtered2.getPartitionedFields().contains(1)); + assertTrue(filtered2.getPartitionedFields().contains(3)); + assertTrue(filtered2.getPartitionedFields().contains(4)); + assertNull(filtered2.getOrdering()); + assertNull(filtered2.getCustomPartitioner()); + assertNull(filtered2.getDataDistribution()); + } + + @Test(expected = IndexOutOfBoundsException.class) + public void testInvalidInputIndex() { + SingleInputSemanticProperties sprops = new SingleInputSemanticProperties(); + SemanticPropUtil.getSemanticPropsSingleFromString(sprops, new String[]{"0;1"}, null, null, tupleInfo, tupleInfo); + + RequestedGlobalProperties gprops = new RequestedGlobalProperties(); + gprops.setHashPartitioned(new FieldList(0,1)); + + gprops.filterBySemanticProperties(sprops, 1); + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/de8e066c/flink-compiler/src/test/java/org/apache/flink/compiler/dataproperties/RequestedLocalPropertiesFilteringTest.java ---------------------------------------------------------------------- diff --git 
a/flink-compiler/src/test/java/org/apache/flink/compiler/dataproperties/RequestedLocalPropertiesFilteringTest.java b/flink-compiler/src/test/java/org/apache/flink/compiler/dataproperties/RequestedLocalPropertiesFilteringTest.java new file mode 100644 index 0000000..f3aaf05 --- /dev/null +++ b/flink-compiler/src/test/java/org/apache/flink/compiler/dataproperties/RequestedLocalPropertiesFilteringTest.java @@ -0,0 +1,248 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.compiler.dataproperties; + +import static org.junit.Assert.*; + +import org.apache.flink.api.common.operators.DualInputSemanticProperties; +import org.apache.flink.api.common.operators.Order; +import org.apache.flink.api.common.operators.Ordering; +import org.apache.flink.api.common.operators.SingleInputSemanticProperties; +import org.apache.flink.api.common.operators.util.FieldList; +import org.apache.flink.api.common.operators.util.FieldSet; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.java.functions.SemanticPropUtil; +import org.apache.flink.api.java.tuple.Tuple8; +import org.apache.flink.api.java.typeutils.TupleTypeInfo; +import org.apache.flink.types.ByteValue; +import org.apache.flink.types.IntValue; +import org.apache.flink.types.LongValue; +import org.junit.Assert; +import org.junit.Test; + +public class RequestedLocalPropertiesFilteringTest { + + private TupleTypeInfo<Tuple8<Integer, Integer, Integer, Integer, Integer, Integer, Integer, Integer>> tupleInfo = + new TupleTypeInfo<Tuple8<Integer, Integer, Integer, Integer, Integer, Integer, Integer, Integer>>( + BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, + BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO + ); + + @Test(expected = NullPointerException.class) + public void testNullProps() { + + RequestedLocalProperties rlProp = new RequestedLocalProperties(); + rlProp.setGroupedFields(new FieldSet(0, 2, 3)); + + RequestedLocalProperties filtered = rlProp.filterBySemanticProperties(null, 0); + } + + @Test + public void testAllErased() { + + SingleInputSemanticProperties sProps = new SingleInputSemanticProperties(); + + RequestedLocalProperties rlProp = new RequestedLocalProperties(); + rlProp.setGroupedFields(new FieldSet(0, 2, 3)); + + RequestedLocalProperties filtered = 
rlProp.filterBySemanticProperties(sProps, 0); + + assertNull(filtered); + } + + @Test + public void testGroupingPreserved1() { + + SingleInputSemanticProperties sProps = new SingleInputSemanticProperties(); + SemanticPropUtil.getSemanticPropsSingleFromString(sProps, new String[]{"0;2;3"}, null, null, tupleInfo, tupleInfo); + + RequestedLocalProperties rlProp = new RequestedLocalProperties(); + rlProp.setGroupedFields(new FieldSet(0, 2, 3)); + + RequestedLocalProperties filtered = rlProp.filterBySemanticProperties(sProps, 0); + + assertNotNull(filtered); + assertNotNull(filtered.getGroupedFields()); + assertEquals(3, filtered.getGroupedFields().size()); + assertTrue(filtered.getGroupedFields().contains(0)); + assertTrue(filtered.getGroupedFields().contains(2)); + assertTrue(filtered.getGroupedFields().contains(3)); + assertNull(filtered.getOrdering()); + } + + @Test + public void testGroupingPreserved2() { + + SingleInputSemanticProperties sProps = new SingleInputSemanticProperties(); + SemanticPropUtil.getSemanticPropsSingleFromString(sProps, new String[]{"3->0;5->2;1->3"}, null, null, tupleInfo, tupleInfo); + + RequestedLocalProperties rlProp = new RequestedLocalProperties(); + rlProp.setGroupedFields(new FieldSet(0, 2, 3)); + + RequestedLocalProperties filtered = rlProp.filterBySemanticProperties(sProps, 0); + + assertNotNull(filtered); + assertNotNull(filtered.getGroupedFields()); + assertEquals(3, filtered.getGroupedFields().size()); + assertTrue(filtered.getGroupedFields().contains(3)); + assertTrue(filtered.getGroupedFields().contains(5)); + assertTrue(filtered.getGroupedFields().contains(1)); + assertNull(filtered.getOrdering()); + } + + @Test + public void testGroupingErased() { + + SingleInputSemanticProperties sProps = new SingleInputSemanticProperties(); + SemanticPropUtil.getSemanticPropsSingleFromString(sProps, new String[]{"0;2"}, null, null, tupleInfo, tupleInfo); + + RequestedLocalProperties rlProp = new RequestedLocalProperties(); + 
rlProp.setGroupedFields(new FieldSet(0, 2, 3)); + + RequestedLocalProperties filtered = rlProp.filterBySemanticProperties(sProps, 0); + + assertNull(filtered); + } + + @Test + public void testOrderPreserved1() { + + SingleInputSemanticProperties sProps = new SingleInputSemanticProperties(); + SemanticPropUtil.getSemanticPropsSingleFromString(sProps, new String[]{"1;4;6"}, null, null, tupleInfo, tupleInfo); + + Ordering o = new Ordering(); + o.appendOrdering(4, LongValue.class, Order.DESCENDING); + o.appendOrdering(1, IntValue.class, Order.ASCENDING); + o.appendOrdering(6, ByteValue.class, Order.DESCENDING); + + RequestedLocalProperties rlProp = new RequestedLocalProperties(); + rlProp.setOrdering(o); + + RequestedLocalProperties filtered = rlProp.filterBySemanticProperties(sProps, 0); + + assertNotNull(filtered); + assertNotNull(filtered.getOrdering()); + assertEquals(3, filtered.getOrdering().getNumberOfFields()); + assertEquals(4, filtered.getOrdering().getFieldNumber(0).intValue()); + assertEquals(1, filtered.getOrdering().getFieldNumber(1).intValue()); + assertEquals(6, filtered.getOrdering().getFieldNumber(2).intValue()); + assertEquals(LongValue.class, filtered.getOrdering().getType(0)); + assertEquals(IntValue.class, filtered.getOrdering().getType(1)); + assertEquals(ByteValue.class, filtered.getOrdering().getType(2)); + assertEquals(Order.DESCENDING, filtered.getOrdering().getOrder(0)); + assertEquals(Order.ASCENDING, filtered.getOrdering().getOrder(1)); + assertEquals(Order.DESCENDING, filtered.getOrdering().getOrder(2)); + assertNull(filtered.getGroupedFields()); + } + + @Test + public void testOrderPreserved2() { + + SingleInputSemanticProperties sProps = new SingleInputSemanticProperties(); + SemanticPropUtil.getSemanticPropsSingleFromString(sProps, new String[]{"5->1;0->4;2->6"}, null, null, tupleInfo, tupleInfo); + + Ordering o = new Ordering(); + o.appendOrdering(4, LongValue.class, Order.DESCENDING); + o.appendOrdering(1, IntValue.class, 
Order.ASCENDING); + o.appendOrdering(6, ByteValue.class, Order.DESCENDING); + + RequestedLocalProperties rlProp = new RequestedLocalProperties(); + rlProp.setOrdering(o); + + RequestedLocalProperties filtered = rlProp.filterBySemanticProperties(sProps, 0); + + assertNotNull(filtered); + assertNotNull(filtered.getOrdering()); + assertEquals(3, filtered.getOrdering().getNumberOfFields()); + assertEquals(0, filtered.getOrdering().getFieldNumber(0).intValue()); + assertEquals(5, filtered.getOrdering().getFieldNumber(1).intValue()); + assertEquals(2, filtered.getOrdering().getFieldNumber(2).intValue()); + assertEquals(LongValue.class, filtered.getOrdering().getType(0)); + assertEquals(IntValue.class, filtered.getOrdering().getType(1)); + assertEquals(ByteValue.class, filtered.getOrdering().getType(2)); + assertEquals(Order.DESCENDING, filtered.getOrdering().getOrder(0)); + assertEquals(Order.ASCENDING, filtered.getOrdering().getOrder(1)); + assertEquals(Order.DESCENDING, filtered.getOrdering().getOrder(2)); + assertNull(filtered.getGroupedFields()); + } + + @Test + public void testOrderErased() { + + SingleInputSemanticProperties sProps = new SingleInputSemanticProperties(); + SemanticPropUtil.getSemanticPropsSingleFromString(sProps, new String[]{"1; 4"}, null, null, tupleInfo, tupleInfo); + + Ordering o = new Ordering(); + o.appendOrdering(4, LongValue.class, Order.DESCENDING); + o.appendOrdering(1, IntValue.class, Order.ASCENDING); + o.appendOrdering(6, ByteValue.class, Order.DESCENDING); + + RequestedLocalProperties rlProp = new RequestedLocalProperties(); + rlProp.setOrdering(o); + + RequestedLocalProperties filtered = rlProp.filterBySemanticProperties(sProps, 0); + + assertNull(filtered); + } + + @Test + public void testDualGroupingPreserved() { + + DualInputSemanticProperties dprops = new DualInputSemanticProperties(); + SemanticPropUtil.getSemanticPropsDualFromString(dprops, new String[]{"1->0;3;2->4"}, new String[]{"0->7;1"}, + null, null, null, null, tupleInfo, 
tupleInfo, tupleInfo); + + RequestedLocalProperties lprops1 = new RequestedLocalProperties(); + lprops1.setGroupedFields(new FieldSet(0,3,4)); + + RequestedLocalProperties lprops2 = new RequestedLocalProperties(); + lprops2.setGroupedFields(new FieldSet(7, 1)); + + RequestedLocalProperties filtered1 = lprops1.filterBySemanticProperties(dprops, 0); + RequestedLocalProperties filtered2 = lprops2.filterBySemanticProperties(dprops, 1); + + assertNotNull(filtered1); + assertNotNull(filtered1.getGroupedFields()); + assertEquals(3, filtered1.getGroupedFields().size()); + assertTrue(filtered1.getGroupedFields().contains(1)); + assertTrue(filtered1.getGroupedFields().contains(2)); + assertTrue(filtered1.getGroupedFields().contains(3)); + assertNull(filtered1.getOrdering()); + + assertNotNull(filtered2); + assertNotNull(filtered2.getGroupedFields()); + assertEquals(2, filtered2.getGroupedFields().size()); + assertTrue(filtered2.getGroupedFields().contains(0)); + assertTrue(filtered2.getGroupedFields().contains(1)); + assertNull(filtered2.getOrdering()); + } + + @Test(expected = IndexOutOfBoundsException.class) + public void testInvalidInputIndex() { + + SingleInputSemanticProperties sProps = new SingleInputSemanticProperties(); + SemanticPropUtil.getSemanticPropsSingleFromString(sProps, new String[]{"1; 4"}, null, null, tupleInfo, tupleInfo); + + RequestedLocalProperties rlProp = new RequestedLocalProperties(); + rlProp.setGroupedFields(new FieldSet(1, 4)); + + rlProp.filterBySemanticProperties(sProps, 1); + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/de8e066c/flink-compiler/src/test/java/org/apache/flink/compiler/java/WorksetIterationsJavaApiCompilerTest.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/java/WorksetIterationsJavaApiCompilerTest.java b/flink-compiler/src/test/java/org/apache/flink/compiler/java/WorksetIterationsJavaApiCompilerTest.java index 
4d81e0b..3b9a7a2 100644 --- a/flink-compiler/src/test/java/org/apache/flink/compiler/java/WorksetIterationsJavaApiCompilerTest.java +++ b/flink-compiler/src/test/java/org/apache/flink/compiler/java/WorksetIterationsJavaApiCompilerTest.java @@ -278,19 +278,19 @@ public class WorksetIterationsJavaApiCompilerTest extends CompilerTestBase { } }) .name(JOIN_WITH_SOLUTION_SET) - .withConstantSetSecond(joinPreservesSolutionSet ? new String[] {"0->0", "1->1", "2->2" } : null); + .withForwardedFieldsSecond(joinPreservesSolutionSet ? new String[] {"0->0", "1->1", "2->2" } : null); DataSet<Tuple3<Long, Long, Long>> nextWorkset = joinedWithSolutionSet.groupBy(1, 2) .reduceGroup(new RichGroupReduceFunction<Tuple3<Long,Long,Long>, Tuple3<Long,Long,Long>>() { public void reduce(Iterable<Tuple3<Long, Long, Long>> values, Collector<Tuple3<Long, Long, Long>> out) {} }) .name(NEXT_WORKSET_REDUCER_NAME) - .withConstantSet("1->1","2->2","0->0"); + .withForwardedFields("1->1","2->2","0->0"); DataSet<Tuple3<Long, Long, Long>> nextSolutionSet = mapBeforeSolutionDelta ? 
joinedWithSolutionSet.map(new RichMapFunction<Tuple3<Long, Long, Long>,Tuple3<Long, Long, Long>>() { public Tuple3<Long, Long, Long> map(Tuple3<Long, Long, Long> value) { return value; } }) - .name(SOLUTION_DELTA_MAPPER_NAME).withConstantSet("0->0","1->1","2->2") : + .name(SOLUTION_DELTA_MAPPER_NAME).withForwardedFields("0->0","1->1","2->2") : joinedWithSolutionSet; iter.closeWith(nextSolutionSet, nextWorkset) http://git-wip-us.apache.org/repos/asf/flink/blob/de8e066c/flink-core/src/main/java/org/apache/flink/api/common/operators/DualInputOperator.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/DualInputOperator.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/DualInputOperator.java index 6325788..9cdea6d 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/operators/DualInputOperator.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/DualInputOperator.java @@ -59,7 +59,7 @@ public abstract class DualInputOperator<IN1, IN2, OUT, FT extends Function> exte /** * Semantic properties of the associated function. 
*/ - private DualInputSemanticProperties semanticProperties; + private DualInputSemanticProperties semanticProperties = new DualInputSemanticProperties(); // -------------------------------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/de8e066c/flink-core/src/main/java/org/apache/flink/api/common/operators/DualInputSemanticProperties.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/DualInputSemanticProperties.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/DualInputSemanticProperties.java index fe53380..70fd753 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/operators/DualInputSemanticProperties.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/DualInputSemanticProperties.java @@ -27,20 +27,20 @@ import org.apache.flink.api.common.operators.util.FieldSet; /** * Container for the semantic properties associated to a dual input operator. */ -public class DualInputSemanticProperties extends SemanticProperties { +public class DualInputSemanticProperties implements SemanticProperties { private static final long serialVersionUID = 1L; - + /** * Mapping from fields in the source record(s) in the first input to fields * in the destination record(s). */ - private Map<Integer,FieldSet> forwardedFields1; + private Map<Integer,FieldSet> fieldMapping1; /** * Mapping from fields in the source record(s) in the second input to fields * in the destination record(s). 
*/ - private Map<Integer,FieldSet> forwardedFields2; + private Map<Integer,FieldSet> fieldMapping2; /** * Set of fields that are read in the source record(s) from the @@ -56,293 +56,124 @@ public class DualInputSemanticProperties extends SemanticProperties { public DualInputSemanticProperties() { - init(); + this.fieldMapping1 = new HashMap<Integer,FieldSet>(); + this.fieldMapping2 = new HashMap<Integer,FieldSet>(); + this.readFields1 = null; + this.readFields2 = null; } - /** - * Finds the source field where the given field was forwarded from. - * @param dest The destination field in the output data. - * @return FieldSet containing the source input fields. - */ - public FieldSet forwardedFrom1(int dest) { - FieldSet fs = null; - for (Map.Entry<Integer, FieldSet> entry : forwardedFields1.entrySet()) { - if (entry.getValue().contains(dest)) { - if (fs == null) { - fs = new FieldSet(); - } + @Override + public FieldSet getForwardingTargetFields(int input, int sourceField) { - fs = fs.addField(entry.getKey()); - } + if (input != 0 && input != 1) { + throw new IndexOutOfBoundsException(); + } else if (input == 0) { + + return fieldMapping1.containsKey(sourceField) ? fieldMapping1.get(sourceField) : FieldSet.EMPTY_SET; + } else { + return fieldMapping2.containsKey(sourceField) ? 
fieldMapping2.get(sourceField) : FieldSet.EMPTY_SET; } - return fs; } - public FieldSet forwardedFrom2(int dest) { - FieldSet fs = null; - for (Map.Entry<Integer, FieldSet> entry : forwardedFields2.entrySet()) { - if (entry.getValue().contains(dest)) { - if (fs == null) { - fs = new FieldSet(); - } + @Override + public int getForwardingSourceField(int input, int targetField) { + Map<Integer, FieldSet> fieldMapping; + + if (input != 0 && input != 1) { + throw new IndexOutOfBoundsException(); + } else if (input == 0) { + fieldMapping = fieldMapping1; + } else { + fieldMapping = fieldMapping2; + } - fs = fs.addField(entry.getKey()); + for (Map.Entry<Integer, FieldSet> e : fieldMapping.entrySet()) { + if (e.getValue().contains(targetField)) { + return e.getKey(); } } - return fs; + return -1; } - /** - * Adds, to the existing information, a field that is forwarded directly - * from the source record(s) in the first input to the destination - * record(s). - * - * @param sourceField the position in the source record(s) from the first input - * @param destinationField the position in the destination record(s) - */ - public void addForwardedField1(int sourceField, int destinationField) { - FieldSet old = this.forwardedFields1.get(sourceField); - if (old == null) { - old = FieldSet.EMPTY_SET; + @Override + public FieldSet getReadFields(int input) { + if (input != 0 && input != 1) { + throw new IndexOutOfBoundsException(); } - - FieldSet fs = old.addField(destinationField); - this.forwardedFields1.put(sourceField, fs); - } - - /** - * Adds, to the existing information, a field that is forwarded directly - * from the source record(s) in the first input to multiple fields in - * the destination record(s). 
- * - * @param sourceField the position in the source record(s) - * @param destinationFields the position in the destination record(s) - */ - public void addForwardedField1(int sourceField, FieldSet destinationFields) { - FieldSet old = this.forwardedFields1.get(sourceField); - if (old == null) { - old = FieldSet.EMPTY_SET; + + if (input == 0) { + return readFields1; + } else { + return readFields2; } - - FieldSet fs = old.addFields(destinationFields); - this.forwardedFields1.put(sourceField, fs); } - - /** - * Sets a field that is forwarded directly from the source - * record(s) in the first input to multiple fields in the - * destination record(s). - * - * @param sourceField the position in the source record(s) - * @param destinationFields the position in the destination record(s) - */ - public void setForwardedField1(int sourceField, FieldSet destinationFields) { - this.forwardedFields1.put(sourceField, destinationFields); - } - - /** - * Gets the fields in the destination record where the source - * field from the first input is forwarded. - * - * @param sourceField the position in the source record - * @return the destination fields, or null if they do not exist - */ - public FieldSet getForwardedField1(int sourceField) { - if (isAllFieldsConstant()) { - return new FieldSet(sourceField); - } - return this.forwardedFields1.get(sourceField); - } - /** * Adds, to the existing information, a field that is forwarded directly - * from the source record(s) in the second input to the destination + * from the source record(s) in the first input to the destination * record(s). 
- * - * @param sourceField the position in the source record(s) from the first input - * @param destinationField the position in the destination record(s) - */ - public void addForwardedField2(int sourceField, int destinationField) { - FieldSet old = this.forwardedFields2.get(sourceField); - if (old == null) { - old = FieldSet.EMPTY_SET; - } - - FieldSet fs = old.addField(destinationField); - this.forwardedFields2.put(sourceField, fs); - } - - /** - * Adds, to the existing information, a field that is forwarded directly - * from the source record(s) in the second input to multiple fields in - * the destination record(s). - * - * @param sourceField the position in the source record(s) - * @param destinationFields the position in the destination record(s) - */ - public void addForwardedField2(int sourceField, FieldSet destinationFields) { - FieldSet old = this.forwardedFields2.get(sourceField); - if (old == null) { - old = FieldSet.EMPTY_SET; - } - - FieldSet fs = old.addFields(destinationFields); - this.forwardedFields2.put(sourceField, fs); - } - - /** - * Sets a field that is forwarded directly from the source - * record(s) in the second input to multiple fields in the - * destination record(s). - * - * @param sourceField the position in the source record(s) - * @param destinationFields the position in the destination record(s) - */ - public void setForwardedField2(int sourceField, FieldSet destinationFields) { - this.forwardedFields2.put(sourceField, destinationFields); - } - - /** - * Gets the fields in the destination record where the source - * field from the second input is forwarded. 
- * + * + * @param input the input of the source field * @param sourceField the position in the source record - * @return the destination fields, or null if they do not exist + * @param targetField the position in the destination record */ - public FieldSet getForwardedField2(int sourceField) { - if (isAllFieldsConstant()) { - return new FieldSet(sourceField); - } + public void addForwardedField(int input, int sourceField, int targetField) { - return this.forwardedFields2.get(sourceField); - } + Map<Integer, FieldSet> fieldMapping; - @Override - public FieldSet getSourceField(int input, int field) { - if (isAllFieldsConstant()) { - return new FieldSet(field); + if (input != 0 && input != 1) { + throw new IndexOutOfBoundsException(); + } else if (input == 0) { + fieldMapping = this.fieldMapping1; + } else { + fieldMapping = this.fieldMapping2; } - switch(input) { - case 0: - return this.forwardedFrom1(field); - case 1: - return this.forwardedFrom2(field); - default: - throw new IndexOutOfBoundsException(); + if(isTargetFieldPresent(targetField, fieldMapping)) { + throw new InvalidSemanticAnnotationException("Target field "+targetField+" was added twice to input "+input); } - } - @Override - public FieldSet getForwardFields(int input, int field) { - if (isAllFieldsConstant()) { - return new FieldSet(field); + FieldSet targetFields = fieldMapping.get(sourceField); + if (targetFields != null) { + fieldMapping.put(sourceField, targetFields.addField(targetField)); + } else { + fieldMapping.put(sourceField, new FieldSet(targetField)); } + } - if (input == 0) { - return this.getForwardedField1(field); - } else if (input == 1) { - return this.getForwardedField2(field); + private boolean isTargetFieldPresent(int targetField, Map<Integer, FieldSet> fieldMapping) { + + for(FieldSet targetFields : fieldMapping.values()) { + if(targetFields.contains(targetField)) { + return true; + } } - return null; + return false; } /** * Adds, to the existing information, field(s) that are 
read in * the source record(s) from the first input. - * - * @param readFields the position(s) in the source record(s) - */ - public void addReadFields1(FieldSet readFields) { - if (this.readFields1 == null) { - this.readFields1 = readFields; - } else { - this.readFields1 = this.readFields2.addFields(readFields); - } - } - - /** - * Sets the field(s) that are read in the source record(s) from the first - * input. - * - * @param readFields the position(s) in the source record(s) - */ - public void setReadFields1(FieldSet readFields) { - this.readFields1 = readFields; - } - - /** - * Gets the field(s) in the source record(s) from the first input - * that are read. - * - * @return the field(s) in the record, or null if they are not set - */ - public FieldSet getReadFields1() { - return this.readFields1; - } - - /** - * Adds, to the existing information, field(s) that are read in - * the source record(s) from the second input. - * + * + * @param input the input of the read fields * @param readFields the position(s) in the source record(s) */ - public void addReadFields2(FieldSet readFields) { - if (this.readFields2 == null) { - this.readFields2 = readFields; + public void addReadFields(int input, FieldSet readFields) { + + FieldSet curReadFields; + + if (input != 0 && input != 1) { + throw new IndexOutOfBoundsException(); + } else if (input == 0) { + this.readFields1 = (this.readFields1 == null) ? readFields.clone() : this.readFields1.addFields(readFields); } else { - this.readFields2 = this.readFields2.addFields(readFields); + this.readFields2 = (this.readFields2 == null) ? readFields.clone() : this.readFields2.addFields(readFields); } } - - /** - * Sets the field(s) that are read in the source record(s) from the second - * input. 
- * - * @param readFields the position(s) in the source record(s) - */ - public void setReadFields2(FieldSet readFields) { - this.readFields2 = readFields; - } - - /** - * Gets the field(s) in the source record(s) from the second input - * that are read. - * - * @return the field(s) in the record, or null if they are not set - */ - public FieldSet getReadFields2() { - return this.readFields2; - } - - /** - * Clears the object. - */ - @Override - public void clearProperties() { - super.clearProperties(); - init(); - } @Override - public boolean isEmpty() { - return super.isEmpty() && - (forwardedFields1 == null || forwardedFields1.isEmpty()) && - (forwardedFields2 == null || forwardedFields2.isEmpty()) && - (readFields1 == null || readFields1.size() == 0) && - (readFields2 == null || readFields2.size() == 0); - } - - @Override public String toString() { - return "DISP(" + this.forwardedFields1 + "; " + this.forwardedFields2 + ")"; + return "DISP(" + this.fieldMapping1 + "; " + this.fieldMapping2 + ")"; } - private void init() { - this.forwardedFields1 = new HashMap<Integer,FieldSet>(); - this.forwardedFields2 = new HashMap<Integer,FieldSet>(); - this.readFields1 = null; - this.readFields2 = null; - } } http://git-wip-us.apache.org/repos/asf/flink/blob/de8e066c/flink-core/src/main/java/org/apache/flink/api/common/operators/Ordering.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/Ordering.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/Ordering.java index 7b90c8e..e99cac7 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/operators/Ordering.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/Ordering.java @@ -144,7 +144,7 @@ public class Ordering { } for (int i = 0; i < this.indexes.size(); i++) { - if (this.indexes.get(i).intValue() != otherOrdering.indexes.get(i).intValue()) { + if (!this.indexes.get(i).equals(
otherOrdering.indexes.get(i))) { return false; } http://git-wip-us.apache.org/repos/asf/flink/blob/de8e066c/flink-core/src/main/java/org/apache/flink/api/common/operators/SemanticProperties.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/SemanticProperties.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/SemanticProperties.java index da99018..5afee79 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/operators/SemanticProperties.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/SemanticProperties.java @@ -21,72 +21,73 @@ package org.apache.flink.api.common.operators; import java.io.Serializable; +import org.apache.flink.api.common.InvalidProgramException; import org.apache.flink.api.common.operators.util.FieldSet; /** * Container for the semantic properties associated to an operator. */ -public abstract class SemanticProperties implements Serializable { - private boolean allFieldsConstant; +public interface SemanticProperties extends Serializable { - private static final long serialVersionUID = 1L; + /** + * Returns the indexes of all target fields to which a source field has been + * copied unmodified by a function. + * + * @param input The input id for the requested source field (0 for first input, 1 for second input) + * @param sourceField The index of the field for which the target position index is requested. + * @return A set containing the indexes of all target fields to which the source field has been copied unmodified. + * + */ + public FieldSet getForwardingTargetFields(int input, int sourceField); + + /** + * Returns the index of the source field on the given input from which the target field + * has been copied unmodified by a function. 
+ * + * @param input The input id for the requested source field (0 for first input, 1 for second input) + * @param targetField The index of the target field to which the source field has been copied. + * @return The index of the source field on the given input that was copied to the given target field. + * -1 if the target field was not copied from any source field of the given input. + */ + public int getForwardingSourceField(int input, int targetField); - /** Set of fields that are written in the destination record(s).*/ - private FieldSet writtenFields; - - /** - * Adds, to the existing information, field(s) that are written in - * the destination record(s). - * - * @param writtenFields the position(s) in the destination record(s) + * Returns the position indexes of all fields of an input that are accessed by a function. + * + * @param input The input id for which accessed fields are requested. + * @return A set of fields of the specified input which have been accessed by the function. Null if no information is available. 
*/ - public void addWrittenFields(FieldSet writtenFields) { - if(this.writtenFields == null) { - this.writtenFields = writtenFields; - } else { - this.writtenFields = this.writtenFields.addFields(writtenFields); + public FieldSet getReadFields(int input); + + // ---------------------------------------------------------------------- + + public static class InvalidSemanticAnnotationException extends InvalidProgramException { + + public InvalidSemanticAnnotationException(String s) { + super(s); } - } - public void setAllFieldsConstant(boolean constant) { - this.allFieldsConstant = constant; + public InvalidSemanticAnnotationException(String s, Throwable e) { + super(s,e); + } } - public boolean isAllFieldsConstant() { - return this.allFieldsConstant; - } + public static class EmptySemanticProperties implements SemanticProperties { + + @Override + public FieldSet getForwardingTargetFields(int input, int sourceField) { + return FieldSet.EMPTY_SET; + } - public abstract FieldSet getForwardFields(int input, int field); + @Override + public int getForwardingSourceField(int input, int targetField) { + return -1; + } - public abstract FieldSet getSourceField(int input, int field); + @Override + public FieldSet getReadFields(int input) { + return null; + } - /** - * Sets the field(s) that are written in the destination record(s). - * - * @param writtenFields the position(s) in the destination record(s) - */ - public void setWrittenFields(FieldSet writtenFields) { - this.writtenFields = writtenFields; - } - - /** - * Gets the field(s) in the destination record(s) that are written. - * - * @return the field(s) in the record, or null if they are not set - */ - public FieldSet getWrittenFields() { - return this.writtenFields; - } - - /** - * Clears the object. 
- */ - public void clearProperties() { - this.writtenFields = null; - } - - public boolean isEmpty() { - return this.writtenFields == null || this.writtenFields.size() == 0; } } http://git-wip-us.apache.org/repos/asf/flink/blob/de8e066c/flink-core/src/main/java/org/apache/flink/api/common/operators/SingleInputOperator.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/SingleInputOperator.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/SingleInputOperator.java index 5b491bf..eddf89b 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/operators/SingleInputOperator.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/SingleInputOperator.java @@ -42,7 +42,7 @@ public abstract class SingleInputOperator<IN, OUT, FT extends Function> extends private final int[] keyFields; /** Semantic properties of the associated function. */ - private SingleInputSemanticProperties semanticProperties; + private SingleInputSemanticProperties semanticProperties = new SingleInputSemanticProperties(); // -------------------------------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/de8e066c/flink-core/src/main/java/org/apache/flink/api/common/operators/SingleInputSemanticProperties.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/SingleInputSemanticProperties.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/SingleInputSemanticProperties.java index abe995b..23bbc8b 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/operators/SingleInputSemanticProperties.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/SingleInputSemanticProperties.java @@ -26,120 +26,90 @@ import 
org.apache.flink.api.common.operators.util.FieldSet; /** * Container for the semantic properties associated to a single input operator. */ -public class SingleInputSemanticProperties extends SemanticProperties { - +public class SingleInputSemanticProperties implements SemanticProperties { private static final long serialVersionUID = 1L; - - /**Mapping from fields in the source record(s) to fields in the destination record(s). */ - private Map<Integer,FieldSet> forwardedFields; - - /** Set of fields that are read in the source record(s).*/ + + /** + * Mapping from fields in the source record(s) to fields in the destination + * record(s). + */ + private Map<Integer,FieldSet> fieldMapping; + + /** + * Set of fields that are read in the source record(s). + */ private FieldSet readFields; + public SingleInputSemanticProperties() { + this.fieldMapping = new HashMap<Integer, FieldSet>(); + this.readFields = null; + } + @Override - public FieldSet getForwardFields(int input, int field) { + public FieldSet getForwardingTargetFields(int input, int sourceField) { if (input != 0) { throw new IndexOutOfBoundsException(); } - return this.getForwardedField(field); + + return this.fieldMapping.containsKey(sourceField) ? 
this.fieldMapping.get(sourceField) : FieldSet.EMPTY_SET; } @Override - public FieldSet getSourceField(int input, int field) { + public int getForwardingSourceField(int input, int targetField) { if (input != 0) { throw new IndexOutOfBoundsException(); } - if (isAllFieldsConstant()) { - return new FieldSet(field); + for (Map.Entry<Integer, FieldSet> e : fieldMapping.entrySet()) { + if (e.getValue().contains(targetField)) { + return e.getKey(); + } } - - return this.forwardedFrom(field); + return -1; } - public FieldSet forwardedFrom(int dest) { - FieldSet fs = null; - for (Map.Entry<Integer, FieldSet> entry : forwardedFields.entrySet()) { - if (entry.getValue().contains(dest)) { - if (fs == null) { - fs = new FieldSet(); - } - - fs = fs.addField(entry.getKey()); - } + @Override + public FieldSet getReadFields(int input) { + if (input != 0) { + throw new IndexOutOfBoundsException(); } - return fs; - } - public SingleInputSemanticProperties() { - init(); + return this.readFields; } - + /** * Adds, to the existing information, a field that is forwarded directly * from the source record(s) to the destination record(s). - * + * * @param sourceField the position in the source record(s) - * @param destinationField the position in the destination record(s) + * @param targetField the position in the destination record(s) */ - public void addForwardedField(int sourceField, int destinationField) { - FieldSet old = this.forwardedFields.get(sourceField); - if (old == null) { - old = FieldSet.EMPTY_SET; + public void addForwardedField(int sourceField, int targetField) { + if(isTargetFieldPresent(targetField)) { + throw new InvalidSemanticAnnotationException("Target field "+targetField+" was added twice."); } - - FieldSet fs = old.addField(destinationField); - this.forwardedFields.put(sourceField, fs); - } - - /** - * Adds, to the existing information, a field that is forwarded directly - * from the source record(s) to multiple fields in the destination - * record(s). 
- * - * @param sourceField the position in the source record(s) - * @param destinationFields the position in the destination record(s) - */ - public void addForwardedField(int sourceField, FieldSet destinationFields) { - FieldSet old = this.forwardedFields.get(sourceField); - if (old == null) { - old = FieldSet.EMPTY_SET; + + FieldSet targetFields = fieldMapping.get(sourceField); + if (targetFields != null) { + fieldMapping.put(sourceField, targetFields.addField(targetField)); + } else { + fieldMapping.put(sourceField, new FieldSet(targetField)); } - - FieldSet fs = old.addFields(destinationFields); - this.forwardedFields.put(sourceField, fs); } - - /** - * Sets a field that is forwarded directly from the source - * record(s) to multiple fields in the destination record(s). - * - * @param sourceField the position in the source record(s) - * @param destinationFields the position in the destination record(s) - */ - public void setForwardedField(int sourceField, FieldSet destinationFields) { - this.forwardedFields.put(sourceField,destinationFields); - } - - /** - * Gets the fields in the destination record where the source - * field is forwarded. - * - * @param sourceField the position in the source record - * @return the destination fields, or null if they do not exist - */ - public FieldSet getForwardedField(int sourceField) { - if (isAllFieldsConstant()) { - return new FieldSet(sourceField); - } - return this.forwardedFields.get(sourceField); + private boolean isTargetFieldPresent(int targetField) { + for(FieldSet targetFields : fieldMapping.values()) { + if(targetFields.contains(targetField)) { + return true; + } + } + return false; } - + /** * Adds, to the existing information, field(s) that are read in * the source record(s). 
- * + * * @param readFields the position(s) in the source record(s) */ public void addReadFields(FieldSet readFields) { @@ -149,112 +119,39 @@ public class SingleInputSemanticProperties extends SemanticProperties { this.readFields = this.readFields.addFields(readFields); } } - - /** - * Sets the field(s) that are read in the source record(s). - * - * @param readFields the position(s) in the source record(s) - */ - public void setReadFields(FieldSet readFields) { - this.readFields = readFields; - } - - /** - * Gets the field(s) in the source record(s) that are read. - * - * @return the field(s) in the record, or null if they are not set - */ - public FieldSet getReadFields() { - return this.readFields; - } - - /** - * Clears the object. - */ - @Override - public void clearProperties() { - super.clearProperties(); - init(); - } - - @Override - public boolean isEmpty() { - return super.isEmpty() && - (forwardedFields == null || forwardedFields.isEmpty()) && - (readFields == null || readFields.size() == 0); - } @Override public String toString() { - return "SISP(" + this.forwardedFields + ")"; + return "SISP(" + this.fieldMapping + ")"; } - private void init() { - this.forwardedFields = new HashMap<Integer,FieldSet>(); - this.readFields = null; - } - + // -------------------------------------------------------------------------------------------- - - public static class AllFieldsConstantProperties extends SingleInputSemanticProperties { - - private static final long serialVersionUID = 1L; - @Override - public FieldSet getReadFields() { - return FieldSet.EMPTY_SET; - } - - @Override - public FieldSet getWrittenFields() { - return FieldSet.EMPTY_SET; - } + public static class AllFieldsForwardedProperties extends SingleInputSemanticProperties { + + private static final long serialVersionUID = 1L; @Override - public FieldSet getForwardedField(int sourceField) { + public FieldSet getForwardingTargetFields(int input, int sourceField) { + if(input != 0) { + throw new 
IndexOutOfBoundsException(); + } return new FieldSet(sourceField); } - - // ----- all mutating operations are unsupported ----- - - @Override - public void addForwardedField(int sourceField, FieldSet destinationFields) { - throw new UnsupportedOperationException(); - } - - @Override - public void addForwardedField(int sourceField, int destinationField) { - throw new UnsupportedOperationException(); - } - - @Override - public void setForwardedField(int sourceField, FieldSet destinationFields) { - throw new UnsupportedOperationException(); - } - - @Override - public void addReadFields(FieldSet readFields) { - throw new UnsupportedOperationException(); - } - - @Override - public void setReadFields(FieldSet readFields) { - throw new UnsupportedOperationException(); - } @Override - public void addWrittenFields(FieldSet writtenFields) { - throw new UnsupportedOperationException(); + public int getForwardingSourceField(int input, int targetField) { + if(input != 0) { + throw new IndexOutOfBoundsException(); + } + return targetField; } @Override - public void setWrittenFields(FieldSet writtenFields) { - throw new UnsupportedOperationException(); - } - - @Override - public boolean isEmpty() { - return false; + public void addForwardedField(int sourceField, int targetField) { + throw new UnsupportedOperationException("Cannot modify forwarded fields"); } + } } http://git-wip-us.apache.org/repos/asf/flink/blob/de8e066c/flink-core/src/main/java/org/apache/flink/api/common/operators/base/PartitionOperatorBase.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/PartitionOperatorBase.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/PartitionOperatorBase.java index 6735298..3602a82 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/PartitionOperatorBase.java +++ 
b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/PartitionOperatorBase.java @@ -82,7 +82,7 @@ public class PartitionOperatorBase<IN> extends SingleInputOperator<IN, IN, NoOpF @Override public SingleInputSemanticProperties getSemanticProperties() { - return new SingleInputSemanticProperties.AllFieldsConstantProperties(); + return new SingleInputSemanticProperties.AllFieldsForwardedProperties(); } // -------------------------------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/de8e066c/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeType.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeType.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeType.java index 0a2f1b0..c3ea0e4 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeType.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeType.java @@ -18,6 +18,7 @@ package org.apache.flink.api.common.typeutils; +import java.util.ArrayList; import java.util.List; import org.apache.flink.api.common.typeinfo.AtomicType; @@ -38,10 +39,41 @@ public abstract class CompositeType<T> extends TypeInformation<T> { } /** - * Returns the keyPosition for the given fieldPosition, offsetted by the given offset + * Returns the flat field descriptors for the given field expression. + * + * @param fieldExpression The field expression for which the flat field descriptors are computed. + * @return The list of descriptors for the flat fields which are specified by the field expression. 
+ */ + public List<FlatFieldDescriptor> getFlatFields(String fieldExpression) { + List<FlatFieldDescriptor> result = new ArrayList<FlatFieldDescriptor>(); + this.getFlatFields(fieldExpression, 0, result); + return result; + } + + /** + * Computes the flat field descriptors for the given field expression with the given offset. + * + * @param fieldExpression The field expression for which the FlatFieldDescriptors are computed. + * @param offset The offset to use when computing the positions of the flat fields. + * @param result The list into which all flat field descriptors are inserted. + */ + public abstract void getFlatFields(String fieldExpression, int offset, List<FlatFieldDescriptor> result); + + /** + * Returns the type of the (nested) field at the given field expression position. + * Wildcards are not allowed. + * + * @param fieldExpression The field expression identifying the field whose type is returned. + * @return The type of the field at the given field expression. + */ + public abstract <X> TypeInformation<X> getTypeAt(String fieldExpression); + + /** + * Returns the type of the (unnested) field at the given field position. + * + * @param pos The position of the (unnested) field in this composite type. + * @return The type of the field at the given position. 
*/ - public abstract void getKey(String fieldExpression, int offset, List<FlatFieldDescriptor> result); - public abstract <X> TypeInformation<X> getTypeAt(int pos); /** @@ -105,8 +137,8 @@ public abstract class CompositeType<T> extends TypeInformation<T> { private TypeInformation<?> type; public FlatFieldDescriptor(int keyPosition, TypeInformation<?> type) { - if( !(type instanceof AtomicType)) { - throw new IllegalArgumentException("A flattened field can only be an atomic type"); + if(type instanceof CompositeType) { + throw new IllegalArgumentException("A flattened field can not be a composite type"); } this.keyPosition = keyPosition; this.type = type; @@ -126,4 +158,11 @@ public abstract class CompositeType<T> extends TypeInformation<T> { return "FlatFieldDescriptor [position="+keyPosition+" typeInfo="+type+"]"; } } + + public static class InvalidFieldReferenceException extends IllegalArgumentException { + + public InvalidFieldReferenceException(String s) { + super(s); + } + } }
