http://git-wip-us.apache.org/repos/asf/flink/blob/de8e066c/flink-core/src/test/java/org/apache/flink/api/common/operators/DualInputSemanticPropertiesTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/operators/DualInputSemanticPropertiesTest.java b/flink-core/src/test/java/org/apache/flink/api/common/operators/DualInputSemanticPropertiesTest.java
new file mode 100644
index 0000000..7ea7930
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/api/common/operators/DualInputSemanticPropertiesTest.java
@@ -0,0 +1,271 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.operators;
+
+import org.apache.flink.api.common.operators.util.FieldSet;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for {@link DualInputSemanticProperties}: forwarded-field lookups in both directions,
+ * read-field accumulation, and rejection of conflicting forwarding targets, each exercised
+ * for the first and the second input.
+ */
+public class DualInputSemanticPropertiesTest {
+
+	/**
+	 * Source fields must map to every target field they were forwarded to; source fields that
+	 * were not forwarded must yield an empty, non-null set.
+	 */
+	@Test
+	public void testGetTargetFields() {
+
+		// first input
+		DualInputSemanticProperties sp = new DualInputSemanticProperties();
+		sp.addForwardedField(0, 0,1);
+		sp.addForwardedField(0, 1,4);
+		sp.addForwardedField(0, 2,3);
+		sp.addForwardedField(0, 3,2);
+
+		assertEquals(1, sp.getForwardingTargetFields(0, 0).size());
+		assertEquals(1, sp.getForwardingTargetFields(0, 1).size());
+		assertEquals(1, sp.getForwardingTargetFields(0, 2).size());
+		assertEquals(1, sp.getForwardingTargetFields(0, 3).size());
+		assertTrue(sp.getForwardingTargetFields(0, 0).contains(1));
+		assertTrue(sp.getForwardingTargetFields(0, 1).contains(4));
+		assertTrue(sp.getForwardingTargetFields(0, 2).contains(3));
+		assertTrue(sp.getForwardingTargetFields(0, 3).contains(2));
+		assertNotNull(sp.getForwardingTargetFields(0, 4));
+		assertEquals(0, sp.getForwardingTargetFields(0, 4).size());
+
+		sp = new DualInputSemanticProperties();
+		sp.addForwardedField(0, 0,0);
+		sp.addForwardedField(0, 0,4);
+		sp.addForwardedField(0, 1,1);
+		sp.addForwardedField(0, 1,2);
+		sp.addForwardedField(0, 1,3);
+
+		assertEquals(2, sp.getForwardingTargetFields(0, 0).size());
+		assertEquals(3, sp.getForwardingTargetFields(0, 1).size());
+		assertTrue(sp.getForwardingTargetFields(0, 0).contains(0));
+		assertTrue(sp.getForwardingTargetFields(0, 0).contains(4));
+		assertTrue(sp.getForwardingTargetFields(0, 1).contains(1));
+		assertTrue(sp.getForwardingTargetFields(0, 1).contains(2));
+		assertTrue(sp.getForwardingTargetFields(0, 1).contains(3));
+		assertNotNull(sp.getForwardingTargetFields(0, 2));
+		assertEquals(0, sp.getForwardingTargetFields(0, 2).size());
+
+		// second input
+		sp = new DualInputSemanticProperties();
+		sp.addForwardedField(1, 0,1);
+		sp.addForwardedField(1, 1,4);
+		sp.addForwardedField(1, 2,3);
+		sp.addForwardedField(1, 3,2);
+
+		assertEquals(1, sp.getForwardingTargetFields(1, 0).size());
+		assertEquals(1, sp.getForwardingTargetFields(1, 1).size());
+		assertEquals(1, sp.getForwardingTargetFields(1, 2).size());
+		assertEquals(1, sp.getForwardingTargetFields(1, 3).size());
+		assertTrue(sp.getForwardingTargetFields(1, 0).contains(1));
+		assertTrue(sp.getForwardingTargetFields(1, 1).contains(4));
+		assertTrue(sp.getForwardingTargetFields(1, 2).contains(3));
+		assertTrue(sp.getForwardingTargetFields(1, 3).contains(2));
+		assertNotNull(sp.getForwardingTargetFields(1, 4));
+		assertEquals(0, sp.getForwardingTargetFields(1, 4).size());
+
+		sp = new DualInputSemanticProperties();
+		sp.addForwardedField(1, 0,0);
+		sp.addForwardedField(1, 0,4);
+		sp.addForwardedField(1, 1,1);
+		sp.addForwardedField(1, 1,2);
+		sp.addForwardedField(1, 1,3);
+
+		assertEquals(2, sp.getForwardingTargetFields(1, 0).size());
+		assertEquals(3, sp.getForwardingTargetFields(1, 1).size());
+		assertTrue(sp.getForwardingTargetFields(1, 0).contains(0));
+		assertTrue(sp.getForwardingTargetFields(1, 0).contains(4));
+		assertTrue(sp.getForwardingTargetFields(1, 1).contains(1));
+		assertTrue(sp.getForwardingTargetFields(1, 1).contains(2));
+		assertTrue(sp.getForwardingTargetFields(1, 1).contains(3));
+		assertNotNull(sp.getForwardingTargetFields(1, 2));
+		assertEquals(0, sp.getForwardingTargetFields(1, 2).size());
+
+		// both inputs
+		sp = new DualInputSemanticProperties();
+		sp.addForwardedField(0, 2,6);
+		sp.addForwardedField(0, 7,8);
+		sp.addForwardedField(1, 0,1);
+		sp.addForwardedField(1, 1,4);
+
+		assertEquals(1, sp.getForwardingTargetFields(0, 2).size());
+		assertEquals(1, sp.getForwardingTargetFields(0, 7).size());
+		assertEquals(1, sp.getForwardingTargetFields(1, 0).size());
+		assertEquals(1, sp.getForwardingTargetFields(1, 1).size());
+		assertTrue(sp.getForwardingTargetFields(0, 2).contains(6));
+		assertTrue(sp.getForwardingTargetFields(0, 7).contains(8));
+		assertTrue(sp.getForwardingTargetFields(1, 0).contains(1));
+		assertTrue(sp.getForwardingTargetFields(1, 1).contains(4));
+		assertNotNull(sp.getForwardingTargetFields(0, 1));
+		assertNotNull(sp.getForwardingTargetFields(1, 4));
+		assertEquals(0, sp.getForwardingTargetFields(0, 1).size());
+		assertEquals(0, sp.getForwardingTargetFields(1, 4).size());
+
+		sp = new DualInputSemanticProperties();
+		sp.addForwardedField(0, 0,0);
+		sp.addForwardedField(0, 0,4);
+		sp.addForwardedField(0, 3,8);
+		sp.addForwardedField(1, 1,1);
+		sp.addForwardedField(1, 1,2);
+		sp.addForwardedField(1, 4,8);
+
+		assertEquals(2, sp.getForwardingTargetFields(0, 0).size());
+		assertEquals(1, sp.getForwardingTargetFields(0, 3).size());
+		assertEquals(2, sp.getForwardingTargetFields(1, 1).size());
+		assertEquals(1, sp.getForwardingTargetFields(1, 4).size());
+		assertTrue(sp.getForwardingTargetFields(0, 0).contains(0));
+		assertTrue(sp.getForwardingTargetFields(0, 0).contains(4));
+		assertTrue(sp.getForwardingTargetFields(0, 3).contains(8));
+		assertTrue(sp.getForwardingTargetFields(1, 1).contains(1));
+		assertTrue(sp.getForwardingTargetFields(1, 1).contains(2));
+		assertTrue(sp.getForwardingTargetFields(1, 4).contains(8));
+
+	}
+
+	/**
+	 * Target fields must map back to the source field forwarded to them; target fields that
+	 * are not forwarded must yield a negative index.
+	 */
+	@Test
+	public void testGetSourceField() {
+
+		// first input
+		DualInputSemanticProperties sp = new DualInputSemanticProperties();
+		sp.addForwardedField(0, 0,1);
+		sp.addForwardedField(0, 1,4);
+		sp.addForwardedField(0, 2,3);
+		sp.addForwardedField(0, 3,2);
+
+		assertEquals(0, sp.getForwardingSourceField(0, 1));
+		assertEquals(1, sp.getForwardingSourceField(0, 4));
+		assertEquals(2, sp.getForwardingSourceField(0, 3));
+		assertEquals(3, sp.getForwardingSourceField(0, 2));
+		assertTrue(sp.getForwardingSourceField(0, 0) < 0);
+		assertTrue(sp.getForwardingSourceField(0, 5) < 0);
+
+		sp = new DualInputSemanticProperties();
+		sp.addForwardedField(0, 0,0);
+		sp.addForwardedField(0, 0,4);
+		sp.addForwardedField(0, 1,1);
+		sp.addForwardedField(0, 1,2);
+		sp.addForwardedField(0, 1,3);
+
+		assertEquals(0, sp.getForwardingSourceField(0, 0));
+		assertEquals(0, sp.getForwardingSourceField(0, 4));
+		assertEquals(1, sp.getForwardingSourceField(0, 1));
+		assertEquals(1, sp.getForwardingSourceField(0, 2));
+		assertEquals(1, sp.getForwardingSourceField(0, 3));
+		assertTrue(sp.getForwardingSourceField(0, 5) < 0);
+
+		// second input
+		sp = new DualInputSemanticProperties();
+		sp.addForwardedField(1, 0,1);
+		sp.addForwardedField(1, 1,4);
+		sp.addForwardedField(1, 2,3);
+		sp.addForwardedField(1, 3,2);
+
+		assertEquals(0, sp.getForwardingSourceField(1, 1));
+		assertEquals(1, sp.getForwardingSourceField(1, 4));
+		assertEquals(2, sp.getForwardingSourceField(1, 3));
+		assertEquals(3, sp.getForwardingSourceField(1, 2));
+		assertTrue(sp.getForwardingSourceField(1, 0) < 0);
+		assertTrue(sp.getForwardingSourceField(1, 5) < 0);
+
+		sp = new DualInputSemanticProperties();
+		sp.addForwardedField(1, 0,0);
+		sp.addForwardedField(1, 0,4);
+		sp.addForwardedField(1, 1,1);
+		sp.addForwardedField(1, 1,2);
+		sp.addForwardedField(1, 1,3);
+
+		assertEquals(0, sp.getForwardingSourceField(1, 0));
+		assertEquals(0, sp.getForwardingSourceField(1, 4));
+		assertEquals(1, sp.getForwardingSourceField(1, 1));
+		assertEquals(1, sp.getForwardingSourceField(1, 2));
+		assertEquals(1, sp.getForwardingSourceField(1, 3));
+		assertTrue(sp.getForwardingSourceField(1, 5) < 0);
+	}
+
+	/** Read fields added to an input in successive calls must accumulate. */
+	@Test
+	public void testGetReadSet() {
+
+		// first input
+		DualInputSemanticProperties sp = new DualInputSemanticProperties();
+		sp.addReadFields(0, new FieldSet(0, 1));
+
+		assertEquals(2, sp.getReadFields(0).size());
+		assertTrue(sp.getReadFields(0).contains(0));
+		assertTrue(sp.getReadFields(0).contains(1));
+
+		sp.addReadFields(0, new FieldSet(3));
+
+		assertEquals(3, sp.getReadFields(0).size());
+		assertTrue(sp.getReadFields(0).contains(0));
+		assertTrue(sp.getReadFields(0).contains(1));
+		assertTrue(sp.getReadFields(0).contains(3));
+
+		// second input
+		sp = new DualInputSemanticProperties();
+		sp.addReadFields(1, new FieldSet(0, 1));
+
+		assertEquals(2, sp.getReadFields(1).size());
+		assertTrue(sp.getReadFields(1).contains(0));
+		assertTrue(sp.getReadFields(1).contains(1));
+
+		sp.addReadFields(1, new FieldSet(3));
+
+		assertEquals(3, sp.getReadFields(1).size());
+		assertTrue(sp.getReadFields(1).contains(0));
+		assertTrue(sp.getReadFields(1).contains(1));
+		assertTrue(sp.getReadFields(1).contains(3));
+	}
+
+	/** Forwarding two fields of the first input to the same target field must be rejected. */
+	@Test(expected = SemanticProperties.InvalidSemanticAnnotationException.class)
+	public void testAddForwardedFieldsTargetTwice1() {
+
+		DualInputSemanticProperties sp = new DualInputSemanticProperties();
+		sp.addForwardedField(0, 0, 2);
+		sp.addForwardedField(0, 1, 2);
+	}
+
+	/** Forwarding two fields of the second input to the same target field must be rejected. */
+	@Test(expected = SemanticProperties.InvalidSemanticAnnotationException.class)
+	public void testAddForwardedFieldsTargetTwice2() {
+
+		DualInputSemanticProperties sp = new DualInputSemanticProperties();
+		sp.addForwardedField(1, 0, 2);
+		sp.addForwardedField(1, 1, 2);
+	}
+
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/de8e066c/flink-core/src/test/java/org/apache/flink/api/common/operators/SingleInputSemanticPropertiesTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/operators/SingleInputSemanticPropertiesTest.java b/flink-core/src/test/java/org/apache/flink/api/common/operators/SingleInputSemanticPropertiesTest.java
new file mode 100644
index 0000000..5348653
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/api/common/operators/SingleInputSemanticPropertiesTest.java
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.operators;
+
+import org.apache.flink.api.common.operators.SemanticProperties.InvalidSemanticAnnotationException;
+import org.apache.flink.api.common.operators.util.FieldSet;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for {@link SingleInputSemanticProperties}: forwarded-field lookups in both directions,
+ * read-field accumulation, rejection of invalid input indices, and the
+ * {@link SingleInputSemanticProperties.AllFieldsForwardedProperties} variant.
+ */
+public class SingleInputSemanticPropertiesTest {
+
+	/**
+	 * Source fields must map to every target field they were forwarded to; source fields that
+	 * were not forwarded must yield an empty, non-null set.
+	 */
+	@Test
+	public void testGetTargetFields() {
+
+		SingleInputSemanticProperties sp = new SingleInputSemanticProperties();
+		sp.addForwardedField(0,1);
+		sp.addForwardedField(1,4);
+		sp.addForwardedField(2,3);
+		sp.addForwardedField(3,2);
+
+		assertEquals(1, sp.getForwardingTargetFields(0, 0).size());
+		assertEquals(1, sp.getForwardingTargetFields(0, 1).size());
+		assertEquals(1, sp.getForwardingTargetFields(0, 2).size());
+		assertEquals(1, sp.getForwardingTargetFields(0, 3).size());
+		assertTrue(sp.getForwardingTargetFields(0, 0).contains(1));
+		assertTrue(sp.getForwardingTargetFields(0, 1).contains(4));
+		assertTrue(sp.getForwardingTargetFields(0, 2).contains(3));
+		assertTrue(sp.getForwardingTargetFields(0, 3).contains(2));
+		assertNotNull(sp.getForwardingTargetFields(0, 4));
+		assertEquals(0, sp.getForwardingTargetFields(0, 4).size());
+
+		sp = new SingleInputSemanticProperties();
+		sp.addForwardedField(0,0);
+		sp.addForwardedField(0,4);
+		sp.addForwardedField(1,1);
+		sp.addForwardedField(1,2);
+		sp.addForwardedField(1,3);
+
+		assertEquals(2, sp.getForwardingTargetFields(0, 0).size());
+		assertEquals(3, sp.getForwardingTargetFields(0, 1).size());
+		assertTrue(sp.getForwardingTargetFields(0, 0).contains(0));
+		assertTrue(sp.getForwardingTargetFields(0, 0).contains(4));
+		assertTrue(sp.getForwardingTargetFields(0, 1).contains(1));
+		assertTrue(sp.getForwardingTargetFields(0, 1).contains(2));
+		assertTrue(sp.getForwardingTargetFields(0, 1).contains(3));
+		assertNotNull(sp.getForwardingTargetFields(0, 2));
+		assertEquals(0, sp.getForwardingTargetFields(0, 2).size());
+
+	}
+
+	/**
+	 * Target fields must map back to the source field forwarded to them; target fields that
+	 * are not forwarded must yield a negative index.
+	 */
+	@Test
+	public void testGetSourceField() {
+
+		SingleInputSemanticProperties sp = new SingleInputSemanticProperties();
+		sp.addForwardedField(0,1);
+		sp.addForwardedField(1,4);
+		sp.addForwardedField(2,3);
+		sp.addForwardedField(3,2);
+
+		assertEquals(0, sp.getForwardingSourceField(0, 1));
+		assertEquals(1, sp.getForwardingSourceField(0, 4));
+		assertEquals(2, sp.getForwardingSourceField(0, 3));
+		assertEquals(3, sp.getForwardingSourceField(0, 2));
+		assertTrue(sp.getForwardingSourceField(0, 0) < 0);
+		assertTrue(sp.getForwardingSourceField(0, 5) < 0);
+
+		sp = new SingleInputSemanticProperties();
+		sp.addForwardedField(0,0);
+		sp.addForwardedField(0,4);
+		sp.addForwardedField(1,1);
+		sp.addForwardedField(1,2);
+		sp.addForwardedField(1,3);
+
+		assertEquals(0, sp.getForwardingSourceField(0, 0));
+		assertEquals(0, sp.getForwardingSourceField(0, 4));
+		assertEquals(1, sp.getForwardingSourceField(0, 1));
+		assertEquals(1, sp.getForwardingSourceField(0, 2));
+		assertEquals(1, sp.getForwardingSourceField(0, 3));
+		assertTrue(sp.getForwardingSourceField(0, 5) < 0);
+
+	}
+
+	/** Read fields added in successive calls must accumulate. */
+	@Test
+	public void testGetReadSet() {
+
+		SingleInputSemanticProperties sp = new SingleInputSemanticProperties();
+		sp.addReadFields(new FieldSet(0, 1));
+
+		assertEquals(2, sp.getReadFields(0).size());
+		assertTrue(sp.getReadFields(0).contains(0));
+		assertTrue(sp.getReadFields(0).contains(1));
+
+		sp.addReadFields(new FieldSet(3));
+
+		assertEquals(3, sp.getReadFields(0).size());
+		assertTrue(sp.getReadFields(0).contains(0));
+		assertTrue(sp.getReadFields(0).contains(1));
+		assertTrue(sp.getReadFields(0).contains(3));
+	}
+
+	/** Forwarding two source fields to the same target field must be rejected. */
+	@Test(expected = InvalidSemanticAnnotationException.class)
+	public void testAddForwardedFieldsTargetTwice() {
+
+		SingleInputSemanticProperties sp = new SingleInputSemanticProperties();
+		sp.addForwardedField(0, 2);
+		sp.addForwardedField(1, 2);
+	}
+
+	/** Only input 0 exists, so querying target fields of input 1 must fail. */
+	@Test(expected = IndexOutOfBoundsException.class)
+	public void testGetTargetFieldInvalidIndex() {
+
+		SingleInputSemanticProperties sp = new SingleInputSemanticProperties();
+		sp.addForwardedField(0,0);
+
+		sp.getForwardingTargetFields(1, 0);
+	}
+
+	/** Only input 0 exists, so querying the source field of input 1 must fail. */
+	@Test(expected = IndexOutOfBoundsException.class)
+	public void testGetSourceFieldInvalidIndex() {
+
+		SingleInputSemanticProperties sp = new SingleInputSemanticProperties();
+		sp.addForwardedField(0,0);
+
+		sp.getForwardingSourceField(1, 0);
+	}
+
+	/** Only input 0 exists, so querying read fields of input 1 must fail. */
+	@Test(expected = IndexOutOfBoundsException.class)
+	public void testGetReadFieldsInvalidIndex() {
+
+		SingleInputSemanticProperties sp = new SingleInputSemanticProperties();
+		sp.addReadFields(new FieldSet(0, 1));
+
+		sp.getReadFields(1);
+	}
+
+	/** With all fields forwarded, every field index must map to itself in both directions. */
+	@Test
+	public void testAllForwardedSingleInputSemProps() {
+
+		SingleInputSemanticProperties sp = new SingleInputSemanticProperties.AllFieldsForwardedProperties();
+
+		assertEquals(1, sp.getForwardingTargetFields(0, 0).size());
+		assertEquals(1, sp.getForwardingTargetFields(0, 1).size());
+		assertEquals(1, sp.getForwardingTargetFields(0, 123).size());
+		assertTrue(sp.getForwardingTargetFields(0, 0).contains(0));
+		assertTrue(sp.getForwardingTargetFields(0, 1).contains(1));
+		assertTrue(sp.getForwardingTargetFields(0, 123).contains(123));
+
+		assertEquals(0, sp.getForwardingSourceField(0, 0));
+		assertEquals(2, sp.getForwardingSourceField(0, 2));
+		assertEquals(123, sp.getForwardingSourceField(0, 123));
+	}
+
+	/** The all-fields-forwarded variant must also reject input 1 for source-field lookups. */
+	@Test(expected = IndexOutOfBoundsException.class)
+	public void testAllForwardedSingleInputSemPropsInvalidIndex1() {
+
+		SingleInputSemanticProperties sp = new SingleInputSemanticProperties.AllFieldsForwardedProperties();
+		sp.getForwardingSourceField(1, 0);
+	}
+
+	/** The all-fields-forwarded variant must also reject input 1 for target-field lookups. */
+	@Test(expected = IndexOutOfBoundsException.class)
+	public void testAllForwardedSingleInputSemPropsInvalidIndex2() {
+
+		SingleInputSemanticProperties sp = new SingleInputSemanticProperties.AllFieldsForwardedProperties();
+		sp.getForwardingTargetFields(1, 0);
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/de8e066c/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/clustering/KMeans.java ---------------------------------------------------------------------- diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/clustering/KMeans.java b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/clustering/KMeans.java index 73733fd..7383457 100644 --- a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/clustering/KMeans.java +++ b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/clustering/KMeans.java @@ -24,6 +24,7 @@ import java.util.Collection; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.functions.ReduceFunction; import org.apache.flink.api.common.functions.RichMapFunction; +import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.configuration.Configuration; @@ -198,6 +199,7 @@ public class KMeans { // ************************************************************************* /** Converts a Tuple2<Double,Double> into a Point. */ + @ForwardedFields("0->x; 1->y") public static final class TuplePointConverter implements MapFunction<Tuple2<Double, Double>, Point> { @Override @@ -207,6 +209,7 @@ public class KMeans { } /** Converts a Tuple3<Integer, Double,Double> into a Centroid. */ + @ForwardedFields("0->id; 1->x; 2->y") public static final class TupleCentroidConverter implements MapFunction<Tuple3<Integer, Double, Double>, Centroid> { @Override @@ -216,6 +219,7 @@ public class KMeans { } /** Determines the closest cluster center for a data point. 
*/ + @ForwardedFields("*->1") public static final class SelectNearestCenter extends RichMapFunction<Point, Tuple2<Integer, Point>> { private Collection<Centroid> centroids; @@ -248,7 +252,8 @@ public class KMeans { } } - /** Appends a count variable to the tuple. */ + /** Appends a count variable to the tuple. */ + @ForwardedFields("f0;f1") public static final class CountAppender implements MapFunction<Tuple2<Integer, Point>, Tuple3<Integer, Point, Long>> { @Override @@ -258,6 +263,7 @@ public class KMeans { } /** Sums and counts point coordinates. */ + @ForwardedFields("0") public static final class CentroidAccumulator implements ReduceFunction<Tuple3<Integer, Point, Long>> { @Override @@ -267,6 +273,7 @@ public class KMeans { } /** Computes new centroid from coordinate sum and count of points. */ + @ForwardedFields("0->id") public static final class CentroidAverager implements MapFunction<Tuple3<Integer, Point, Long>, Centroid> { @Override http://git-wip-us.apache.org/repos/asf/flink/blob/de8e066c/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/ConnectedComponents.java ---------------------------------------------------------------------- diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/ConnectedComponents.java b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/ConnectedComponents.java index cb3b8d5..bd74b20 100644 --- a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/ConnectedComponents.java +++ b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/ConnectedComponents.java @@ -25,9 +25,9 @@ import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.common.functions.JoinFunction; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.java.aggregation.Aggregations; -import 
org.apache.flink.api.java.functions.FunctionAnnotation.ConstantFields; -import org.apache.flink.api.java.functions.FunctionAnnotation.ConstantFieldsFirst; -import org.apache.flink.api.java.functions.FunctionAnnotation.ConstantFieldsSecond; +import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields; +import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsFirst; +import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsSecond; import org.apache.flink.api.java.tuple.Tuple1; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.util.Collector; @@ -126,7 +126,7 @@ public class ConnectedComponents implements ProgramDescription { /** * Function that turns a value into a 2-tuple where both fields are that value. */ - @ConstantFields("0 -> 0,1") + @ForwardedFields("*->f0") public static final class DuplicateValue<T> implements MapFunction<T, Tuple2<T, T>> { @Override @@ -155,8 +155,8 @@ public class ConnectedComponents implements ProgramDescription { * a vertex is associated with, with a (Source-Vertex-ID, Target-VertexID) edge. The function * produces a (Target-vertex-ID, Component-ID) pair. 
*/ - @ConstantFieldsFirst("1 -> 1") - @ConstantFieldsSecond("1 -> 0") + @ForwardedFieldsFirst("f1->f1") + @ForwardedFieldsSecond("f1->f0") public static final class NeighborWithComponentIDJoin implements JoinFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>> { @Override @@ -167,7 +167,7 @@ public class ConnectedComponents implements ProgramDescription { - @ConstantFieldsFirst("0") + @ForwardedFieldsFirst("*") public static final class ComponentIdFilter implements FlatJoinFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>> { @Override @@ -178,6 +178,8 @@ public class ConnectedComponents implements ProgramDescription { } } + + @Override public String getDescription() { return "Parameters: <vertices-path> <edges-path> <result-path> <max-number-of-iterations>"; http://git-wip-us.apache.org/repos/asf/flink/blob/de8e066c/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/EnumTrianglesBasic.java ---------------------------------------------------------------------- diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/EnumTrianglesBasic.java b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/EnumTrianglesBasic.java index 2b65ad6..5af60be 100644 --- a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/EnumTrianglesBasic.java +++ b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/EnumTrianglesBasic.java @@ -26,6 +26,7 @@ import org.apache.flink.api.common.functions.GroupReduceFunction; import org.apache.flink.api.common.functions.JoinFunction; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.operators.Order; +import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.util.Collector; import 
org.apache.flink.api.java.DataSet; @@ -119,6 +120,7 @@ public class EnumTrianglesBasic { // ************************************************************************* /** Converts a Tuple2 into an Edge */ + @ForwardedFields("0;1") public static class TupleEdgeConverter implements MapFunction<Tuple2<Integer, Integer>, Edge> { private final Edge outEdge = new Edge(); @@ -149,6 +151,7 @@ public class EnumTrianglesBasic { * The first vertex of a triad is the shared vertex, the second and third vertex are ordered by vertexId. * Assumes that input edges share the first vertex and are in ascending order of the second vertex. */ + @ForwardedFields("0") private static class TriadBuilder implements GroupReduceFunction<Edge, Triad> { private final List<Integer> vertices = new ArrayList<Integer>(); private final Triad outTriad = new Triad(); http://git-wip-us.apache.org/repos/asf/flink/blob/de8e066c/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/EnumTrianglesOpt.java ---------------------------------------------------------------------- diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/EnumTrianglesOpt.java b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/EnumTrianglesOpt.java index 3f05a2b..fb1e6f5 100644 --- a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/EnumTrianglesOpt.java +++ b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/EnumTrianglesOpt.java @@ -28,6 +28,7 @@ import org.apache.flink.api.common.functions.JoinFunction; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.functions.ReduceFunction; import org.apache.flink.api.common.operators.Order; +import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.util.Collector; import 
org.apache.flink.api.java.DataSet; @@ -134,6 +135,7 @@ public class EnumTrianglesOpt { // ************************************************************************* /** Converts a Tuple2 into an Edge */ + @ForwardedFields("0;1") public static class TupleEdgeConverter implements MapFunction<Tuple2<Integer, Integer>, Edge> { private final Edge outEdge = new Edge(); @@ -209,6 +211,7 @@ public class EnumTrianglesOpt { * Builds an edge with degree annotation from two edges that have the same vertices and only one * degree annotation. */ + @ForwardedFields("0;1") private static class DegreeJoiner implements ReduceFunction<EdgeWithDegrees> { private final EdgeWithDegrees outEdge = new EdgeWithDegrees(); @@ -269,6 +272,7 @@ public class EnumTrianglesOpt { * The first vertex of a triad is the shared vertex, the second and third vertex are ordered by vertexId. * Assumes that input edges share the first vertex and are in ascending order of the second vertex. */ + @ForwardedFields("0") private static class TriadBuilder implements GroupReduceFunction<Edge, Triad> { private final List<Integer> vertices = new ArrayList<Integer>(); http://git-wip-us.apache.org/repos/asf/flink/blob/de8e066c/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/PageRankBasic.java ---------------------------------------------------------------------- diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/PageRankBasic.java b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/PageRankBasic.java index 4e4d9ad..d622799 100644 --- a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/PageRankBasic.java +++ b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/PageRankBasic.java @@ -26,7 +26,7 @@ import org.apache.flink.api.common.functions.FilterFunction; import org.apache.flink.api.common.functions.FlatMapFunction; import 
org.apache.flink.api.common.functions.GroupReduceFunction; import org.apache.flink.api.common.functions.MapFunction; -import org.apache.flink.api.java.functions.FunctionAnnotation.ConstantFields; +import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields; import org.apache.flink.api.java.tuple.Tuple1; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.util.Collector; @@ -156,7 +156,7 @@ public class PageRankBasic { * A reduce function that takes a sequence of edges and builds the adjacency list for the vertex where the edges * originate. Run as a pre-processing step. */ - @ConstantFields("0") + @ForwardedFields("0") public static final class BuildOutgoingEdgeList implements GroupReduceFunction<Tuple2<Long, Long>, Tuple2<Long, Long[]>> { private final ArrayList<Long> neighbors = new ArrayList<Long>(); @@ -194,7 +194,7 @@ public class PageRankBasic { /** * The function that applies the page rank dampening formula */ - @ConstantFields("0") + @ForwardedFields("0") public static final class Dampener implements MapFunction<Tuple2<Long,Double>, Tuple2<Long,Double>> { private final double dampening; http://git-wip-us.apache.org/repos/asf/flink/blob/de8e066c/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/TransitiveClosureNaive.java ---------------------------------------------------------------------- diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/TransitiveClosureNaive.java b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/TransitiveClosureNaive.java index abe96c8..d0fefeb 100644 --- a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/TransitiveClosureNaive.java +++ b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/TransitiveClosureNaive.java @@ -64,7 +64,7 @@ public class TransitiveClosureNaive implements ProgramDescription { public 
Tuple2<Long, Long> join(Tuple2<Long, Long> left, Tuple2<Long, Long> right) throws Exception { return new Tuple2<Long, Long>(left.f0, right.f1); } - }) + }).withForwardedFieldsFirst("0").withForwardedFieldsSecond("1") .union(paths) .groupBy(0, 1) .reduceGroup(new GroupReduceFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>() { @@ -72,7 +72,7 @@ public class TransitiveClosureNaive implements ProgramDescription { public void reduce(Iterable<Tuple2<Long, Long>> values, Collector<Tuple2<Long, Long>> out) throws Exception { out.collect(values.iterator().next()); } - }); + }).withForwardedFields("0;1"); DataSet<Tuple2<Long,Long>> newPaths = paths .coGroup(nextPaths) @@ -90,7 +90,7 @@ public class TransitiveClosureNaive implements ProgramDescription { } } } - }); + }).withForwardedFieldsFirst("0").withForwardedFieldsSecond("0"); DataSet<Tuple2<Long, Long>> transitiveClosure = paths.closeWith(nextPaths, newPaths); http://git-wip-us.apache.org/repos/asf/flink/blob/de8e066c/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/ml/LinearRegression.java ---------------------------------------------------------------------- diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/ml/LinearRegression.java b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/ml/LinearRegression.java index 9694647..86b7de2 100644 --- a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/ml/LinearRegression.java +++ b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/ml/LinearRegression.java @@ -25,6 +25,7 @@ import java.util.Collection; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.functions.ReduceFunction; import org.apache.flink.api.common.functions.RichMapFunction; +import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields; import org.apache.flink.api.java.tuple.Tuple2; import 
org.apache.flink.configuration.Configuration; import org.apache.flink.examples.java.ml.util.LinearRegressionData; @@ -184,6 +185,7 @@ public class LinearRegression { // ************************************************************************* /** Converts a Tuple2<Double,Double> into a Data. */ + @ForwardedFields("0->x; 1->y") public static final class TupleDataConverter implements MapFunction<Tuple2<Double, Double>, Data> { @Override @@ -193,6 +195,7 @@ public class LinearRegression { } /** Converts a Tuple2<Double,Double> into a Params. */ + @ForwardedFields("0->theta0; 1->theta1") public static final class TupleParamsConverter implements MapFunction<Tuple2<Double, Double>,Params> { @Override http://git-wip-us.apache.org/repos/asf/flink/blob/de8e066c/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/relational/WebLogAnalysis.java ---------------------------------------------------------------------- diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/relational/WebLogAnalysis.java b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/relational/WebLogAnalysis.java index 44f134a..890af65 100644 --- a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/relational/WebLogAnalysis.java +++ b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/relational/WebLogAnalysis.java @@ -21,6 +21,7 @@ package org.apache.flink.examples.java.relational; import org.apache.flink.api.common.functions.CoGroupFunction; import org.apache.flink.api.common.functions.FilterFunction; +import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsFirst; import org.apache.flink.api.java.tuple.Tuple1; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple3; @@ -234,6 +235,7 @@ public class WebLogAnalysis { * If the first input does not provide any pairs, all pairs of the second input are 
emitted. * Otherwise, no pair is emitted. */ + @ForwardedFieldsFirst("*") public static class AntiJoinVisits implements CoGroupFunction<Tuple3<Integer, String, Integer>, Tuple1<String>, Tuple3<Integer, String, Integer>> { /** http://git-wip-us.apache.org/repos/asf/flink/blob/de8e066c/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/clustering/KMeans.scala ---------------------------------------------------------------------- diff --git a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/clustering/KMeans.scala b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/clustering/KMeans.scala index 269ba7f..11430e9 100644 --- a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/clustering/KMeans.scala +++ b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/clustering/KMeans.scala @@ -18,6 +18,7 @@ package org.apache.flink.examples.scala.clustering import org.apache.flink.api.common.functions._ +import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields import org.apache.flink.api.scala._ import org.apache.flink.configuration.Configuration import org.apache.flink.examples.java.clustering.util.KMeansData @@ -83,10 +84,10 @@ object KMeans { val finalCentroids = centroids.iterate(numIterations) { currentCentroids => val newCentroids = points .map(new SelectNearestCenter).withBroadcastSet(currentCentroids, "centroids") - .map { x => (x._1, x._2, 1L) } + .map { x => (x._1, x._2, 1L) }.withForwardedFields("_1; _2") .groupBy(0) - .reduce { (p1, p2) => (p1._1, p1._2.add(p2._2), p1._3 + p2._3) } - .map { x => new Centroid(x._1, x._2.div(x._3)) } + .reduce { (p1, p2) => (p1._1, p1._2.add(p2._2), p1._3 + p2._3) }.withForwardedFields("_1") + .map { x => new Centroid(x._1, x._2.div(x._3)) }.withForwardedFields("_1->id") newCentroids } @@ -222,6 +223,7 @@ object KMeans { } /** Determines the closest cluster 
center for a data point. */ + @ForwardedFields(Array("*->_2")) final class SelectNearestCenter extends RichMapFunction[Point, (Int, Point)] { private var centroids: Traversable[Centroid] = null http://git-wip-us.apache.org/repos/asf/flink/blob/de8e066c/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/ConnectedComponents.scala ---------------------------------------------------------------------- diff --git a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/ConnectedComponents.scala b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/ConnectedComponents.scala index dfebfe9..2bb6916 100644 --- a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/ConnectedComponents.scala +++ b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/ConnectedComponents.scala @@ -69,7 +69,7 @@ object ConnectedComponents { // read vertex and edge data // assign the initial components (equal to the vertex id) - val vertices = getVerticesDataSet(env).map { id => (id, id) } + val vertices = getVerticesDataSet(env).map { id => (id, id) }.withForwardedFields("*->_1;*->_2") // undirected edges by emitting for each input edge the input edges itself and an inverted // version @@ -82,7 +82,7 @@ object ConnectedComponents { // apply the step logic: join with the edges val allNeighbors = ws.join(edges).where(0).equalTo(0) { (vertex, edge) => (edge._2, vertex._2) - } + }.withForwardedFieldsFirst("_2->_2").withForwardedFieldsSecond("_2->_1") // select the minimum neighbor val minNeighbors = allNeighbors.groupBy(0).min(1) @@ -91,7 +91,7 @@ object ConnectedComponents { val updatedComponents = minNeighbors.join(s).where(0).equalTo(0) { (newVertex, oldVertex, out: Collector[(Long, Long)]) => if (newVertex._2 < oldVertex._2) out.collect(newVertex) - } + }.withForwardedFieldsFirst("*") // delta and new workset are 
identical (updatedComponents, updatedComponents) http://git-wip-us.apache.org/repos/asf/flink/blob/de8e066c/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/DeltaPageRank.scala ---------------------------------------------------------------------- diff --git a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/DeltaPageRank.scala b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/DeltaPageRank.scala index d111890..b4955ed 100644 --- a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/DeltaPageRank.scala +++ b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/DeltaPageRank.scala @@ -70,6 +70,7 @@ object DeltaPageRank { .groupBy(0).sum(1) val initialDeltas = initialRanks.map { (page) => (page._1, page._2 - INITIAL_RANK) } + .withForwardedFields("_1") val iteration = initialRanks.iterateDelta(initialDeltas, maxIterations, Array(0)) { @@ -91,7 +92,7 @@ object DeltaPageRank { val rankUpdates = solutionSet.join(deltas).where(0).equalTo(0) { (current, delta) => (current._1, current._2 + delta._2) - } + }.withForwardedFieldsFirst("_1") (rankUpdates, deltas) } http://git-wip-us.apache.org/repos/asf/flink/blob/de8e066c/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/EnumTrianglesBasic.scala ---------------------------------------------------------------------- diff --git a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/EnumTrianglesBasic.scala b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/EnumTrianglesBasic.scala index 6ebacee..2623c0c 100644 --- a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/EnumTrianglesBasic.scala +++ 
b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/EnumTrianglesBasic.scala @@ -17,6 +17,7 @@ */ package org.apache.flink.examples.scala.graph +import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields import org.apache.flink.api.scala._ import scala.collection.JavaConverters._ import org.apache.flink.api.scala.ExecutionEnvironment @@ -85,6 +86,7 @@ object EnumTrianglesBasic { .groupBy("v1").sortGroup("v2", Order.ASCENDING).reduceGroup(new TriadBuilder()) // filter triads .join(edgesById).where("v2", "v3").equalTo("v1", "v2") { (t, _) => t } + .withForwardedFieldsFirst("*") // emit result if (fileOutput) { @@ -114,6 +116,7 @@ object EnumTrianglesBasic { * of a triad is the shared vertex, the second and third vertex are ordered by vertexId. Assumes * that input edges share the first vertex and are in ascending order of the second vertex. */ + @ForwardedFields(Array("v1->v1")) class TriadBuilder extends GroupReduceFunction[Edge, Triad] { val vertices = mutable.MutableList[Integer]() http://git-wip-us.apache.org/repos/asf/flink/blob/de8e066c/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/EnumTrianglesOpt.scala ---------------------------------------------------------------------- diff --git a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/EnumTrianglesOpt.scala b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/EnumTrianglesOpt.scala index bde1bf3..64aeb77 100644 --- a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/EnumTrianglesOpt.scala +++ b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/EnumTrianglesOpt.scala @@ -17,6 +17,7 @@ */ package org.apache.flink.examples.scala.graph +import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields import org.apache.flink.api.scala._ import 
scala.collection.JavaConverters._ import org.apache.flink.api.scala.ExecutionEnvironment @@ -97,7 +98,7 @@ object EnumTrianglesOpt { } else { new EdgeWithDegrees(e1.v1, e2.d1, e1.v2, e1.d2) } - } + }.withForwardedFields("v1;v2") // project edges by degrees, vertex with smaller degree comes first val edgesByDegree = edgesWithDegrees @@ -111,6 +112,7 @@ object EnumTrianglesOpt { .groupBy("v1").sortGroup("v2", Order.ASCENDING).reduceGroup(new TriadBuilder()) // filter triads .join(edgesById).where("v2", "v3").equalTo("v1", "v2") { (t, _) => t} + .withForwardedFieldsFirst("*") // emit result if (fileOutput) { @@ -182,6 +184,7 @@ object EnumTrianglesOpt { * Assumes that input edges share the first vertex and are in ascending order of the second * vertex. */ + @ForwardedFields(Array("v1")) class TriadBuilder extends GroupReduceFunction[Edge, Triad] { val vertices = mutable.MutableList[Integer]() http://git-wip-us.apache.org/repos/asf/flink/blob/de8e066c/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/PageRankBasic.scala ---------------------------------------------------------------------- diff --git a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/PageRankBasic.scala b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/PageRankBasic.scala index 1d70ae9..dc42ed1 100644 --- a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/PageRankBasic.scala +++ b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/PageRankBasic.scala @@ -84,7 +84,7 @@ object PageRankBasic { val links = getLinksDataSet(env) // assign initial ranks to pages - val pagesWithRanks = pages.map(p => Page(p, 1.0 / numPages)) + val pagesWithRanks = pages.map(p => Page(p, 1.0 / numPages)).withForwardedFields("*->pageId") // build adjacency list from link input val adjacencyLists = links @@ -112,7 +112,7 @@ object 
PageRankBasic { // apply dampening factor .map { p => Page(p.pageId, (p.rank * DAMPENING_FACTOR) + ((1 - DAMPENING_FACTOR) / numPages)) - } + }.withForwardedFields("pageId") // terminate if no rank update was significant val termination = currentRanks.join(newRanks).where("pageId").equalTo("pageId") { http://git-wip-us.apache.org/repos/asf/flink/blob/de8e066c/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/TransitiveClosureNaive.scala ---------------------------------------------------------------------- diff --git a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/TransitiveClosureNaive.scala b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/TransitiveClosureNaive.scala index b281b98..d171611 100644 --- a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/TransitiveClosureNaive.scala +++ b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/TransitiveClosureNaive.scala @@ -38,10 +38,10 @@ object TransitiveClosureNaive { .join(edges) .where(1).equalTo(0) { (left, right) => (left._1,right._2) - } + }.withForwardedFieldsFirst("_1").withForwardedFieldsSecond("_2") .union(prevPaths) .groupBy(0, 1) - .reduce((l, r) => l) + .reduce((l, r) => l).withForwardedFields("_1; _2") val terminate = prevPaths .coGroup(nextPaths) @@ -51,7 +51,7 @@ object TransitiveClosureNaive { for (n <- next) if (!prevPaths.contains(n)) out.collect(n) } - } + }.withForwardedFieldsSecond("*") (nextPaths, terminate) } http://git-wip-us.apache.org/repos/asf/flink/blob/de8e066c/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/relational/WebLogAnalysis.scala ---------------------------------------------------------------------- diff --git a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/relational/WebLogAnalysis.scala 
b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/relational/WebLogAnalysis.scala index 38b6415..950e3c8 100644 --- a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/relational/WebLogAnalysis.scala +++ b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/relational/WebLogAnalysis.scala @@ -110,12 +110,12 @@ object WebLogAnalysis { val joinDocsRanks = filteredDocs.join(filteredRanks).where(0).equalTo(1) { (doc, rank) => rank - } + }.withForwardedFieldsSecond("*") val result = joinDocsRanks.coGroup(filteredVisits).where(1).equalTo(0) { (ranks, visits, out: Collector[(Int, String, Int)]) => if (visits.isEmpty) for (rank <- ranks) out.collect(rank) - } + }.withForwardedFieldsFirst("*") http://git-wip-us.apache.org/repos/asf/flink/blob/de8e066c/flink-java/src/main/java/org/apache/flink/api/java/functions/FunctionAnnotation.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/functions/FunctionAnnotation.java b/flink-java/src/main/java/org/apache/flink/api/java/functions/FunctionAnnotation.java index 25875a0..09678fd 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/functions/FunctionAnnotation.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/functions/FunctionAnnotation.java @@ -29,251 +29,280 @@ import java.util.Set; import org.apache.flink.api.common.InvalidProgramException; /** - * This class defines the semantic assertions that can be added to functions. - * The assertions are realized as java annotations, to be added to the class declaration of - * the class that implements the functions. For example, to declare the <i>ConstantFields</i> - * annotation for a map-type function that simply copies some fields, use it the following way: + * This class defines Java annotations for semantic assertions that can be added to Flink functions. 
+ * Semantic annotations can help the Flink optimizer to generate more efficient execution plans for Flink programs. + * For example, a <i>ForwardedFields</i> assertion for a map-type function can be declared as: * * <pre><blockquote> - * \@ConstantFields({"0->0,1", "1->2"}) - * public class MyMapper extends FlatMapFunction<Tuple3<String, Integer, Integer>, Tuple3<String, String, Integer>> + * \@ForwardedFields({"f0; f2->f1"}) + * public class MyMapper extends MapFunction<Tuple3<String, String, Integer>, Tuple3<String, Integer, Integer>> * { - * public void flatMap(Tuple3<String, Integer, Integer> value, Collector<Tuple3<String, String, Integer>> out) { - * value.f2 = value.f1 - * value.f1 = value.f0; - * out.collect(value); + * public Tuple3<String, Integer, Integer> map(Tuple3<String, String, Integer> val) { + * + * return new Tuple3<String, Integer, Integer>(val.f0, val.f2, 1); * } * } * </blockquote></pre> + * * <p> - * All annotations takes String arrays. The Strings represent the source and destination fields. - * The transition is represented by the arrow "->". - * Fields are described by their tuple position (and later also the names of the fields in the objects). - * The left hand side of the arrow always describes the fields in the input value(s), i.e. the value that - * is passed as a parameter to the function call, or the values obtained from the input iterator. The right - * hand side of the arrow describes the field in the value returned from the function. If the right hand side - * is omitted, the a field is assumed to stay exactly the same, i.e. the field itself is unmodified, rather - * than that the value is placed into another field. + * All annotations take Strings with expressions that refer to (nested) value fields of the input and output types of a function. + * Field expressions of composite data types (tuples, POJOs, Scala case classes) can be expressed in + * different ways, depending on the data type they refer to.
+ *</p> + * + * <ul> + * <li>Java tuple data types (such as {@link org.apache.flink.api.java.tuple.Tuple3}): A tuple field can be addressed using + * its 0-offset index or name, e.g., the second field of a Java tuple is addressed by <code>"1"</code> or <code>"f1"</code>.</li> + * <li>Java POJO data types: A POJO field is addressed using its name, e.g., <code>"xValue"</code> for the member field + * <code>xValue</code> of a POJO type that describes a 2d-coordinate.</li> + * <li>Scala tuple data types (such as {@link scala.Tuple3}): A tuple field can be addressed using its 1-offset name + * (following Scala conventions) or 0-offset index, e.g., the second field of a Scala tuple is addressed by + * <code>"_2"</code> or <code>"1"</code>.</li> + * <li>Scala case classes: A case class field is addressed using its name, e.g., <code>"xValue"</code> for the field <code>xValue</code> + * of a case class that describes a 2d-coordinate.</li> + * </ul> + * * <p> - * <b> - * It is very important to follow a conservative strategy when specifying constant fields. - * Only fields that are always constant (regardless of value, stub call, etc.) to the output may be - * declared as such! Otherwise, the correct execution of a program can not be guaranteed. So if in doubt, - * do not add a field to this set. + * Nested fields are addressed by navigation, e.g., <code>"f1.xValue"</code> addresses the field <code>xValue</code> of a POJO type + * that is stored at the second field of a Java tuple. In order to refer to all fields of a composite type (or the composite type itself) + * such as a tuple, POJO, or case class type, a <code>"*"</code> wildcard can be used, e.g., <code>"f2.*"</code> or <code>"f2"</code> reference all fields + * of a composite type at the third position of a Java tuple. + * </p> + * + * <b>NOTE: The use of semantic annotations is optional! + * If used correctly, semantic annotations can help the Flink optimizer to generate more efficient execution plans.
+ * However, incorrect semantic annotations can cause the optimizer to generate incorrect execution plans which compute wrong results! + * So be careful when adding semantic annotations. * </b> - * <p> - * Be aware that some annotations should only be used for functions with as single input - * ({@link org.apache.flink.api.common.functions.RichMapFunction}, {@link org.apache.flink.api.common.functions.RichReduceFunction}) and some only for stubs with two inputs - * ({@link org.apache.flink.api.common.functions.RichCrossFunction}, {@link org.apache.flink.api.common.functions.RichFlatJoinFunction}, {@link org.apache.flink.api.common.functions.RichCoGroupFunction}). + * */ public class FunctionAnnotation { /** - * This annotation declares that a function leaves certain fields of its input values unmodified and - * only "forwards" or "copies" them to the return value. The annotation is applicable to unary - * functions, like for example {@link org.apache.flink.api.common.functions.RichMapFunction}, {@link org.apache.flink.api.common.functions.RichReduceFunction}, or {@link org.apache.flink.api.common.functions.RichFlatMapFunction}. - * <p> - * The following example illustrates a function that keeps the tuple's field zero constant: - * <pre><blockquote> - * \@ConstantFields("0") - * public class MyMapper extends MapFunction<Tuple3<String, Integer, Integer>, Tuple2<String, Double>> - * { - * public Tuple2<String, Double> map(Tuple3<String, Integer, Integer> value) { - * return new Tuple2<String, Double>(value.f0, value.f1 * 0.5); - * } - * } - * </blockquote></pre> - * <p> - * (Note that you could equivalently write {@code @ConstantFields("0 -> 0")}. - * <p> - * This annotation is mutually exclusive with the {@link ConstantFieldsExcept} annotation. - * <p> - * If neither this annotation, nor the {@link ConstantFieldsExcept} annotation are set, it is - * assumed that <i>no</i> field in the input is forwarded/copied unmodified. 
- * - * @see ConstantFieldsExcept + * The ForwardedFields annotation declares fields which are never modified by the annotated function and + * which are forwarded at the same position to the output or unchanged copied to another position in the output. + * + * Fields that are forwarded at the same position can be specified by their position. + * The specified position must be valid for the input and output data type and have the same type. + * For example <code>\@ForwardedFields({"f2"})</code> declares that the third field of a Java input tuple is + * copied to the third field of an output tuple. + * + * Fields which are unchanged copied to another position in the output are declared by specifying the + * source field expression in the input and the target field expression in the output. + * <code>\@ForwardedFields({"f0->f2"})</code> denotes that the first field of the Java input tuple is + * unchanged copied to the third field of the Java output tuple. When using the wildcard ("*") ensure that + * the number of declared fields and their types in input and output type match. + * + * Multiple forwarded fields can be annotated in one (<code>\@ForwardedFields({"f2; f3->f0; f4"})</code>) + * or separate Strings (<code>\@ForwardedFields({"f2", "f3->f0", "f4"})</code>). + * + * <b>NOTE: The use of the ForwardedFields annotation is optional. + * If used correctly, it can help the Flink optimizer to generate more efficient execution plans. + * However if used incorrectly, it can cause invalid plan choices and the computation of wrong results! + * It is NOT required that all forwarded fields are declared, but all declarations must be correct. + * </b> + * + * Please refer to the JavaDoc of {@link org.apache.flink.api.common.functions.Function} or Flink's documentation for + * details on field expressions such as nested fields and wildcard. 
+ * */ @Target(ElementType.TYPE) @Retention(RetentionPolicy.RUNTIME) - public @interface ConstantFields { + public @interface ForwardedFields { String[] value(); } /** - * This annotation declares that a function leaves certain fields of its first input values unmodified and - * only "forwards" or "copies" them to the return value. The annotation is applicable to binary - * functions, like for example {@link org.apache.flink.api.common.functions.RichFlatJoinFunction}, {@link org.apache.flink.api.common.functions.RichCoGroupFunction}, or {@link org.apache.flink.api.common.functions.RichCrossFunction}. - * <p> - * The following example illustrates a join function that copies fields from the first and second input to the - * return value: - * <pre><blockquote> - * \@ConstantFieldsFirst("1 -> 0") - * \@ConstantFieldsFirst("1 -> 1") - * public class MyJoin extends JoinFunction<Tuple2<String, Integer>, Tuple2<String, String>, Tuple2<Integer, String>> - * { - * public Tuple2<Integer, String> map(Tuple2<String, Integer> first, Tuple2<String, String> second) { - * return new Tuple2<String, Double>(first.f1, second.f1); - * } - * } - * </blockquote></pre> - * <p> - * This annotation is mutually exclusive with the {@link ConstantFieldsFirstExcept} annotation. - * <p> - * If neither this annotation, nor the {@link ConstantFieldsFirstExcept} annotation are set, it is - * assumed that <i>no</i> field in the first input is forwarded/copied unmodified. - * - * @see ConstantFieldsSecond - * @see ConstantFields + * The ForwardedFieldsFirst annotation declares fields of the first input of a function which are + * never modified by the annotated function and which are forwarded at the same position to the + * output or unchanged copied to another position in the output. + * + * Fields that are forwarded from the first input at the same position in the output can be + * specified by their position. 
The specified position must be valid for the input and output data type and have the same type. + * For example <code>\@ForwardedFieldsFirst({"f2"})</code> declares that the third field of a Java input tuple at the first input is + * copied to the third field of an output tuple. + * + * Fields which are unchanged copied to another position in the output are declared by specifying the + * source field expression in the input and the target field expression in the output. + * <code>\@ForwardedFieldsFirst({"f0->f2"})</code> denotes that the first field of the Java input tuple at the first input is + * unchanged copied to the third field of the Java output tuple. When using the wildcard ("*") ensure that + * the number of declared fields and their types in input and output type match. + * + * Multiple forwarded fields can be annotated in one (<code>\@ForwardedFieldsFirst({"f2; f3->f0; f4"})</code>) + * or separate Strings (<code>\@ForwardedFieldsFirst({"f2", "f3->f0", "f4"})</code>). + * + * <b>NOTE: The use of the ForwardedFieldsFirst annotation is optional. + * If used correctly, it can help the Flink optimizer to generate more efficient execution plans. + * However if used incorrectly, it can cause invalid plan choices and the computation of wrong results! + * It is NOT required that all forwarded fields are declared, but all declarations must be correct. + * </b> + * + * Please refer to the JavaDoc of {@link org.apache.flink.api.common.functions.Function} or Flink's documentation for + * details on field expressions such as nested fields and wildcard. + * + * Forwarded fields from the second input can be specified using the + * {@link org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsSecond} annotation. 
+ * */ @Target(ElementType.TYPE) @Retention(RetentionPolicy.RUNTIME) - public @interface ConstantFieldsFirst { + public @interface ForwardedFieldsFirst { String[] value(); } /** - * This annotation declares that a function leaves certain fields of its second input values unmodified and - * only "forwards" or "copies" them to the return value. The annotation is applicable to binary - * functions, like for example {@link org.apache.flink.api.common.functions.RichFlatJoinFunction}, {@link org.apache.flink.api.common.functions.RichCoGroupFunction}, or {@link org.apache.flink.api.common.functions.RichCrossFunction}. - * <p> - * The following example illustrates a join function that copies fields from the first and second input to the - * return value: - * <pre><blockquote> - * \@ConstantFieldsFirst("1 -> 0") - * \@ConstantFieldsFirst("1 -> 1") - * public class MyJoin extends JoinFunction<Tuple2<String, Integer>, Tuple2<String, String>, Tuple2<Integer, String>> - * { - * public Tuple2<Integer, String> map(Tuple2<String, Integer> first, Tuple2<String, String> second) { - * return new Tuple2<String, Double>(first.f1, second.f1); - * } - * } - * </blockquote></pre> - * <p> - * This annotation is mutually exclusive with the {@link ConstantFieldsSecond} annotation. - * <p> - * If neither this annotation, nor the {@link ConstantFieldsSecondExcept} annotation are set, it is - * assumed that <i>no</i> field in the second input is forwarded/copied unmodified. - * - * @see ConstantFieldsFirst - * @see ConstantFields + * The ForwardedFieldsSecond annotation declares fields of the second input of a function which are + * never modified by the annotated function and which are forwarded at the same position to the + * output or unchanged copied to another position in the output. + * + * Fields that are forwarded from the second input at the same position in the output can be + * specified by their position. 
The specified position must be valid for the input and output data type and have the same type. + * For example <code>\@ForwardedFieldsSecond({"f2"})</code> declares that the third field of a Java input tuple at the second input is + * copied to the third field of an output tuple. + * + * Fields which are unchanged copied to another position in the output are declared by specifying the + * source field expression in the input and the target field expression in the output. + * <code>\@ForwardedFieldsSecond({"f0->f2"})</code> denotes that the first field of the Java input tuple at the second input is + * unchanged copied to the third field of the Java output tuple. When using the wildcard ("*") ensure that + * the number of declared fields and their types in input and output type match. + * + * Multiple forwarded fields can be annotated in one (<code>\@ForwardedFieldsSecond({"f2; f3->f0; f4"})</code>) + * or separate Strings (<code>\@ForwardedFieldsSecond({"f2", "f3->f0", "f4"})</code>). + * + * <b>NOTE: The use of the ForwardedFieldsSecond annotation is optional. + * If used correctly, it can help the Flink optimizer to generate more efficient execution plans. + * However if used incorrectly, it can cause invalid plan choices and the computation of wrong results! + * It is NOT required that all forwarded fields are declared, but all declarations must be correct. + * </b> + * + * Please refer to the JavaDoc of {@link org.apache.flink.api.common.functions.Function} or Flink's documentation for + * details on field expressions such as nested fields and wildcard. + * + * Forwarded fields from the first input can be specified using the + * {@link org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsFirst} annotation. 
+ * */ @Target(ElementType.TYPE) @Retention(RetentionPolicy.RUNTIME) - public @interface ConstantFieldsSecond { + public @interface ForwardedFieldsSecond { String[] value(); } /** - * This annotation declares that a function changes certain fields of its input values, while leaving all - * others unmodified and in place in the return value. The annotation is applicable to unary - * functions, like for example {@link org.apache.flink.api.common.functions.RichMapFunction}, {@link org.apache.flink.api.common.functions.RichReduceFunction}, or {@link org.apache.flink.api.common.functions.RichFlatMapFunction}. - * <p> - * The following example illustrates that at the example of a Map function: - * - * <pre><blockquote> - * \@ConstantFieldsExcept("1") - * public class MyMapper extends MapFunction<Tuple3<String, Integer, Double>, Tuple3<String, Double, Double>> - * { - * public Tuple3<String, String, Double> map(Tuple3<String, Integer, Double> value) { - * return new Tuple3<String, String, Double>(value.f0, value.f2 / 2, value.f2); - * } - * } - * </blockquote></pre> - * <p> - * The annotation takes one String array specifying the positions of the input types that do not remain constant. - * When this annotation is used, it is assumed that all other values remain at the same position in input and output. - * To model more complex situations use the {@link ConstantFields}s annotation. - * <p> - * This annotation is mutually exclusive with the {@link ConstantFields} annotation. - * <p> - * If neither this annotation, nor the {@link ConstantFields} annotation are set, it is - * assumed that <i>no</i> field in the input is forwarded/copied unmodified. - * - * @see ConstantFieldsExcept + * The NonForwardedFields annotation declares ALL fields which are not preserved at the same position in a function's output. + * ALL other fields are considered to be unmodified at the same position.
+ * Hence, the NonForwardedFields annotation is inverse to the {@link org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields} annotation. + * + * <b>NOTE: The use of the NonForwardedFields annotation is optional. + * If used correctly, it can help the Flink optimizer to generate more efficient execution plans. + * However, if used incorrectly, it can cause invalid plan choices and the computation of wrong results! + * Since all undeclared fields are considered to be forwarded, it is required that ALL non-forwarded fields are declared. + * </b> + * + * Non-forwarded fields are declared as a list of field expressions, e.g., <code>\@NonForwardedFields({"f1; f3"})</code> + * declares that the second and fourth field of a Java tuple are modified and all other fields are not changed and remain + * on their position. A NonForwardedFields annotation can only be used on functions where the type of the input and output are identical. + * + * Multiple non-forwarded fields can be annotated in one (<code>\@NonForwardedFields({"f1; f3"})</code>) + * or separate Strings (<code>\@NonForwardedFields({"f1", "f3"})</code>). + * + * Please refer to the JavaDoc of {@link org.apache.flink.api.common.functions.Function} or Flink's documentation for + * details on field expressions such as nested fields and wildcard. + * + * @see org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields */ @Target(ElementType.TYPE) @Retention(RetentionPolicy.RUNTIME) - public @interface ConstantFieldsExcept { + public @interface NonForwardedFields { String[] value(); } /** - * This annotation declares that a function changes certain fields of its first input value, while leaving all - * others unmodified and in place in the return value.
The annotation is applicable to binary - * functions, like for example {@link org.apache.flink.api.common.functions.RichFlatJoinFunction}, {@link org.apache.flink.api.common.functions.RichCoGroupFunction}, or {@link org.apache.flink.api.common.functions.RichCrossFunction}. - * <p> - * The following example illustrates a join function that copies fields from the first and second input to the - * return value: - * - * <pre><blockquote> - * \@ConstantFieldsFirstExcept("1") - * public class MyJoin extends JoinFunction<Tuple3<String, Integer, Double>, Tuple2<String, Double>, Tuple3<String, Double, Double>> - * { - * public Tuple3<String, Double, Double> map(Tuple3<String, Integer, Double> first, Tuple2<String, Double> second) { - * return Tuple3<String, Double, Double>(first.f0, second.f1, first.f2); - * } - * } - * </blockquote></pre> - * <p> - * The annotation takes one String array specifying the positions of the input types that do not remain constant. - * When this annotation is used, it is assumed that all other values remain at the same position in input and output. - * To model more complex situations use the {@link ConstantFields}s annotation. - * <p> - * This annotation is mutually exclusive with the {@link ConstantFieldsFirst} - * <p> - * If neither this annotation, nor the {@link ConstantFieldsFirst} annotation are set, it is - * assumed that <i>no</i> field in the first input is forwarded/copied unmodified. - * - * @see ConstantFieldsFirst - * @see ConstantFieldsSecond - * @see ConstantFieldsSecondExcept + * The NonForwardedFieldsFirst annotation declares for a function ALL fields of its first input + * which are not preserved on the same position in its output. + * ALL other fields are considered to be unmodified at the same position. + * Hence, the NonForwardedFieldsFirst annotation is inverse to the {@link org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsFirst} annotation. 
+ * + * <b>NOTE: The use of the NonForwardedFieldsFirst annotation is optional. + * If used correctly, it can help the Flink optimizer to generate more efficient execution plans. + * However, if used incorrectly, it can cause invalid plan choices and the computation of wrong results! + * Since all undeclared fields are considered to be forwarded, it is required that ALL non-forwarded fields of the first input are declared. + * </b> + * + * Non-forwarded fields are declared as a list of field expressions, e.g., <code>\@NonForwardedFieldsFirst({"f1; f3"})</code> + * declares that the second and fourth field of a Java tuple from the first input are modified and + * all other fields of the first input are not changed and remain on their position. + * A NonForwardedFieldsFirst annotation can only be used on functions where the type of the first input and the output are identical. + * + * Multiple non-forwarded fields can be annotated in one (<code>\@NonForwardedFieldsFirst({"f1; f3"})</code>) + * or separate Strings (<code>\@NonForwardedFieldsFirst({"f1", "f3"})</code>). + * + * Please refer to the JavaDoc of {@link org.apache.flink.api.common.functions.Function} or Flink's documentation for + * details on field expressions such as nested fields and wildcard. + * + * @see org.apache.flink.api.java.functions.FunctionAnnotation.NonForwardedFields + * @see org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsFirst */ @Target(ElementType.TYPE) @Retention(RetentionPolicy.RUNTIME) - public @interface ConstantFieldsFirstExcept { + public @interface NonForwardedFieldsFirst { String[] value(); } /** - * This annotation declares that a function changes certain fields of its second input value, while leaving all - * others unmodified and in place in the return value.
The annotation is applicable to binary - * functions, like for example {@link org.apache.flink.api.common.functions.RichFlatJoinFunction}, {@link org.apache.flink.api.common.functions.RichCoGroupFunction}, or {@link org.apache.flink.api.common.functions.RichCrossFunction}. - * <p> - * The following example illustrates a join function that copies fields from the first and second input to the - * return value: - * - * <pre><blockquote> - * \@ConstantFieldsSecondExcept("1") - * public class MyJoin extends JoinFunction<Tuple2<String, Double>, Tuple3<String, Integer, Double>, Tuple3<String, Double, Double>> - * { - * public Tuple3<String, Double, Double> map(Tuple2<String, Double> first, Tuple3<String, Integer, Double> second) { - * return Tuple3<String, Double, Double>(second.f0, first.f1, second.f2); - * } - * } - * </blockquote></pre> - * <p> - * The annotation takes one String array specifying the positions of the input types that do not remain constant. - * When this annotation is used, it is assumed that all other values remain at the same position in input and output. - * To model more complex situations use the {@link ConstantFields}s annotation. - * <p> - * This annotation is mutually exclusive with the {@link ConstantFieldsSecond} - * <p> - * If neither this annotation, nor the {@link ConstantFieldsSecond} annotation are set, it is - * assumed that <i>no</i> field in the second input is forwarded/copied unmodified. - * - * @see ConstantFieldsFirst - * @see ConstantFieldsFirstExcept - * @see ConstantFieldsSecond + * The NonForwardedFieldsSecond annotation declares for a function ALL fields of its second input + * which are not preserved on the same position in its output. + * ALL other fields are considered to be unmodified at the same position. + * Hence, the NonForwardedFieldsSecond annotation is inverse to the {@link org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsSecond} annotation. 
+ * + * <b>NOTE: The use of the NonForwardedFieldsSecond annotation is optional. + * If used correctly, it can help the Flink optimizer to generate more efficient execution plans. + * However, if used incorrectly, it can cause invalid plan choices and the computation of wrong results! + * Since all undeclared fields are considered to be forwarded, it is required that ALL non-forwarded fields of the second input are declared. + * </b> + * + * Non-forwarded fields are declared as a list of field expressions, e.g., <code>\@NonForwardedFieldsSecond({"f1; f3"})</code> + * declares that the second and fourth field of a Java tuple from the second input are modified and + * all other fields of the second input are not changed and remain on their position. + * A NonForwardedFieldsSecond annotation can only be used on functions where the type of the second input and the output are identical. + * + * Multiple non-forwarded fields can be annotated in one (<code>\@NonForwardedFieldsSecond({"f1; f3"})</code>) + * or separate Strings (<code>\@NonForwardedFieldsSecond({"f1", "f3"})</code>). + * + * Please refer to the JavaDoc of {@link org.apache.flink.api.common.functions.Function} or Flink's documentation for + * details on field expressions such as nested fields and wildcard. + * + * @see org.apache.flink.api.java.functions.FunctionAnnotation.NonForwardedFields + * @see org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsSecond */ @Target(ElementType.TYPE) @Retention(RetentionPolicy.RUNTIME) - public @interface ConstantFieldsSecondExcept { + public @interface NonForwardedFieldsSecond { String[] value(); } /** - * Specifies the fields of the input value of a user-defined that are accessed in the code. - * This annotation can only be used with user-defined functions with one input (Map, Reduce, ...).
+ * The ReadFields annotation declares for a function all fields which it accesses and evaluates, i.e., + * all fields that are used by the function to compute its result. + * For example, fields which are evaluated in conditional statements or used for computations are considered to be read. + * Fields which are only copied unmodified to the output without evaluating their values are NOT considered to be read. + * + * <b>NOTE: The use of the ReadFields annotation is optional. + * If used correctly, it can help the Flink optimizer to generate more efficient execution plans. + * The ReadFields annotation requires that ALL read fields are declared. + * Otherwise, it can cause invalid plan choices and the computation of wrong results! + * Declaring a non-read field as read is not harmful but might reduce optimization potential. + * </b> + * + * Read fields are declared as a list of field expressions, e.g., <code>\@ReadFields({"f0; f2"})</code> declares the first and third + * field of a Java input tuple to be read. All other fields are considered to not influence the behavior of the function. + * + * Multiple read fields can be declared in one <code>\@ReadFields({"f0; f2"})</code> or + * multiple separate Strings <code>\@ReadFields({"f0", "f2"})</code>. + * + * Please refer to the JavaDoc of {@link org.apache.flink.api.common.functions.Function} or Flink's documentation for + * details on field expressions such as nested fields and wildcard. + * */ @Target(ElementType.TYPE) @Retention(RetentionPolicy.RUNTIME) @@ -282,22 +311,62 @@ public class FunctionAnnotation { } /** - * Specifies the fields of the first input value of a user-defined that are accessed in the code. - * This annotation can only be used with user-defined functions with two inputs (Join, Cross, ...).
+ * The ReadFieldsFirst annotation declares for a function all fields of the first input which it accesses and evaluates, i.e., + * all fields of the first input that are used by the function to compute its result. + * For example, fields which are evaluated in conditional statements or used for computations are considered to be read. + * Fields which are only copied unmodified to the output without evaluating their values are NOT considered to be read. + * + * <b>NOTE: The use of the ReadFieldsFirst annotation is optional. + * If used correctly, it can help the Flink optimizer to generate more efficient execution plans. + * The ReadFieldsFirst annotation requires that ALL read fields of the first input are declared. + * Otherwise, it can cause invalid plan choices and the computation of wrong results! + * Declaring a non-read field as read is not harmful but might reduce optimization potential. + * </b> + * + * Read fields are declared as a list of field expressions, e.g., <code>\@ReadFieldsFirst({"f0; f2"})</code> declares the first and third + * field of a Java input tuple of the first input to be read. + * All other fields of the first input are considered to not influence the behavior of the function. + * + * Multiple read fields can be declared in one <code>\@ReadFieldsFirst({"f0; f2"})</code> or + * multiple separate Strings <code>\@ReadFieldsFirst({"f0", "f2"})</code>. + * + * Please refer to the JavaDoc of {@link org.apache.flink.api.common.functions.Function} or Flink's documentation for + * details on field expressions such as nested fields and wildcard. + * */ @Target(ElementType.TYPE) @Retention(RetentionPolicy.RUNTIME) - public @interface ReadFieldsSecond { + public @interface ReadFieldsFirst { String[] value(); } /** - * Specifies the fields of the second input value of a user-defined that are accessed in the code. - * This annotation can only be used with user-defined functions with two inputs (Join, Cross, ...).
+ * The ReadFieldsSecond annotation declares for a function all fields of the second input which it accesses and evaluates, i.e., + * all fields of the second input that are used by the function to compute its result. + * For example, fields which are evaluated in conditional statements or used for computations are considered to be read. + * Fields which are only copied unmodified to the output without evaluating their values are NOT considered to be read. + * + * <b>NOTE: The use of the ReadFieldsSecond annotation is optional. + * If used correctly, it can help the Flink optimizer to generate more efficient execution plans. + * The ReadFieldsSecond annotation requires that ALL read fields of the second input are declared. + * Otherwise, it can cause invalid plan choices and the computation of wrong results! + * Declaring a non-read field as read is not harmful but might reduce optimization potential. + * </b> + * + * Read fields are declared as a list of field expressions, e.g., <code>\@ReadFieldsSecond({"f0; f2"})</code> declares the first and third + * field of a Java input tuple of the second input to be read. + * All other fields of the second input are considered to not influence the behavior of the function. + * + * Multiple read fields can be declared in one <code>\@ReadFieldsSecond({"f0; f2"})</code> or + * multiple separate Strings <code>\@ReadFieldsSecond({"f0", "f2"})</code>. + * + * Please refer to the JavaDoc of {@link org.apache.flink.api.common.functions.Function} or Flink's documentation for + * details on field expressions such as nested fields and wildcard.
+ * */ @Target(ElementType.TYPE) @Retention(RetentionPolicy.RUNTIME) - public @interface ReadFieldsFirst { + public @interface ReadFieldsSecond { String[] value(); } @@ -311,111 +380,82 @@ public class FunctionAnnotation { // -------------------------------------------------------------------------------------------- /** - * Reads the annotations of a user defined function with one input and returns semantic properties according to the constant fields annotated. + * Reads the annotations of a user defined function with one input and returns semantic properties according to the forwarded fields annotated. * * @param udfClass The user defined function, represented by its class. - * @return The DualInputSemanticProperties containing the constant fields. + * @return The DualInputSemanticProperties containing the forwarded fields. */ - public static Set<Annotation> readSingleConstantAnnotations(Class<?> udfClass) { - ConstantFields constantSet = udfClass.getAnnotation(ConstantFields.class); - ConstantFieldsExcept notConstantSet = udfClass.getAnnotation(ConstantFieldsExcept.class); - ReadFields readfieldSet = udfClass.getAnnotation(ReadFields.class); - - Set<Annotation> result = null; - - if (notConstantSet != null && constantSet != null) { - throw new InvalidProgramException("Either " + ConstantFields.class.getSimpleName() + " or " + - ConstantFieldsExcept.class.getSimpleName() + " can be annotated to a function, not both."); - } - - if (notConstantSet != null) { - result = new HashSet<Annotation>(); - result.add(notConstantSet); + public static Set<Annotation> readSingleForwardAnnotations(Class<?> udfClass) { + ForwardedFields forwardedFields = udfClass.getAnnotation(ForwardedFields.class); + NonForwardedFields nonForwardedFields = udfClass.getAnnotation(NonForwardedFields.class); + ReadFields readSet = udfClass.getAnnotation(ReadFields.class); + + Set<Annotation> annotations = new HashSet<Annotation>(); + if(forwardedFields != null) { + 
annotations.add(forwardedFields); } - if (constantSet != null) { - result = new HashSet<Annotation>(); - result.add(constantSet); - } - - if (readfieldSet != null) { - if (result == null) { - result = new HashSet<Annotation>(); + if(nonForwardedFields != null) { + if(!annotations.isEmpty()) { + throw new InvalidProgramException("Either " + ForwardedFields.class.getSimpleName() + " or " + + NonForwardedFields.class.getSimpleName() + " can be annotated to a function, not both."); } - result.add(readfieldSet); + annotations.add(nonForwardedFields); + } + if(readSet != null) { + annotations.add(readSet); } - return result; + return !annotations.isEmpty() ? annotations : null; } // -------------------------------------------------------------------------------------------- /** - * Reads the annotations of a user defined function with two inputs and returns semantic properties according to the constant fields annotated. + * Reads the annotations of a user defined function with two inputs and returns semantic properties according to the forwarded fields annotated. * @param udfClass The user defined function, represented by its class. - * @return The DualInputSemanticProperties containing the constant fields. + * @return The DualInputSemanticProperties containing the forwarded fields. 
*/ - public static Set<Annotation> readDualConstantAnnotations(Class<?> udfClass) { + public static Set<Annotation> readDualForwardAnnotations(Class<?> udfClass) { // get readSet annotation from stub - ConstantFieldsFirst constantSet1 = udfClass.getAnnotation(ConstantFieldsFirst.class); - ConstantFieldsSecond constantSet2= udfClass.getAnnotation(ConstantFieldsSecond.class); + ForwardedFieldsFirst forwardedFields1 = udfClass.getAnnotation(ForwardedFieldsFirst.class); + ForwardedFieldsSecond forwardedFields2= udfClass.getAnnotation(ForwardedFieldsSecond.class); // get readSet annotation from stub - ConstantFieldsFirstExcept notConstantSet1 = udfClass.getAnnotation(ConstantFieldsFirstExcept.class); - ConstantFieldsSecondExcept notConstantSet2 = udfClass.getAnnotation(ConstantFieldsSecondExcept.class); + NonForwardedFieldsFirst nonForwardedFields1 = udfClass.getAnnotation(NonForwardedFieldsFirst.class); + NonForwardedFieldsSecond nonForwardedFields2 = udfClass.getAnnotation(NonForwardedFieldsSecond.class); - ReadFieldsFirst readfieldSet1 = udfClass.getAnnotation(ReadFieldsFirst.class); - ReadFieldsSecond readfieldSet2 = udfClass.getAnnotation(ReadFieldsSecond.class); + ReadFieldsFirst readSet1 = udfClass.getAnnotation(ReadFieldsFirst.class); + ReadFieldsSecond readSet2 = udfClass.getAnnotation(ReadFieldsSecond.class); - if (notConstantSet1 != null && constantSet1 != null) { - throw new InvalidProgramException("Either " + ConstantFieldsFirst.class.getSimpleName() + " or " + - ConstantFieldsFirstExcept.class.getSimpleName() + " can be annotated to a function, not both."); - } + Set<Annotation> annotations = new HashSet<Annotation>(); - if (constantSet2 != null && notConstantSet2 != null) { - throw new InvalidProgramException("Either " + ConstantFieldsSecond.class.getSimpleName() + " or " + - ConstantFieldsSecondExcept.class.getSimpleName() + " can be annotated to a function, not both."); + if (nonForwardedFields1 != null && forwardedFields1 != null) { + throw new 
InvalidProgramException("Either " + ForwardedFieldsFirst.class.getSimpleName() + " or " + + NonForwardedFieldsFirst.class.getSimpleName() + " can be annotated to a function, not both."); + } else if (forwardedFields1 != null) { + annotations.add(forwardedFields1); + } else if (nonForwardedFields1 != null) { + annotations.add(nonForwardedFields1); } - Set<Annotation> result = null; - - if (notConstantSet2 != null) { - result = new HashSet<Annotation>(); - result.add(notConstantSet2); - } - if (constantSet2 != null) { - result = new HashSet<Annotation>(); - result.add(constantSet2); + if (forwardedFields2 != null && nonForwardedFields2 != null) { + throw new InvalidProgramException("Either " + ForwardedFieldsSecond.class.getSimpleName() + " or " + + NonForwardedFieldsSecond.class.getSimpleName() + " can be annotated to a function, not both."); + } else if (forwardedFields2 != null) { + annotations.add(forwardedFields2); + } else if (nonForwardedFields2 != null) { + annotations.add(nonForwardedFields2); } - if (readfieldSet2 != null) { - if (result == null) { - result = new HashSet<Annotation>(); - } - result.add(readfieldSet2); - } - - if (notConstantSet1 != null) { - if (result == null) { - result = new HashSet<Annotation>(); - } - result.add(notConstantSet1); + if (readSet1 != null) { + annotations.add(readSet1); } - if (constantSet1 != null) { - if (result == null) { - result = new HashSet<Annotation>(); - } - result.add(constantSet1); - } - - if (readfieldSet1 != null) { - if (result == null) { - result = new HashSet<Annotation>(); - } - result.add(readfieldSet1); + if (readSet2 != null) { + annotations.add(readSet2); } - return result; + return !annotations.isEmpty() ? annotations : null; } }
