Repository: flink
Updated Branches:
  refs/heads/master 82c420022 -> 805ea6943


http://git-wip-us.apache.org/repos/asf/flink/blob/78f41e9c/flink-core/src/main/java/org/apache/flink/api/common/operators/Ordering.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/Ordering.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/Ordering.java
index e99cac7..7b90c8e 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/Ordering.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/Ordering.java
@@ -144,7 +144,7 @@ public class Ordering {
                }
                
                for (int i = 0; i < this.indexes.size(); i++) {
-                       if (this.indexes.get(i) != otherOrdering.indexes.get(i)) {
+                       if (this.indexes.get(i).intValue() != otherOrdering.indexes.get(i).intValue()) {
                                return false;
                        }
                        

http://git-wip-us.apache.org/repos/asf/flink/blob/78f41e9c/flink-core/src/main/java/org/apache/flink/api/common/operators/SemanticProperties.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/SemanticProperties.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/SemanticProperties.java
index ba801ec..da99018 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/SemanticProperties.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/SemanticProperties.java
@@ -27,7 +27,8 @@ import org.apache.flink.api.common.operators.util.FieldSet;
  * Container for the semantic properties associated to an operator.
  */
 public abstract class SemanticProperties implements Serializable {
-       
+       private boolean allFieldsConstant;
+
        private static final long serialVersionUID = 1L;
 
        /** Set of fields that are written in the destination record(s).*/
@@ -47,7 +48,19 @@ public abstract class SemanticProperties implements Serializable {
                        this.writtenFields = this.writtenFields.addFields(writtenFields);
                }
        }
-       
+
+       public void setAllFieldsConstant(boolean constant) {
+               this.allFieldsConstant = constant;
+       }
+
+       public boolean isAllFieldsConstant() {
+               return this.allFieldsConstant;
+       }
+
+       public abstract FieldSet getForwardFields(int input, int field);
+
+       public abstract FieldSet getSourceField(int input, int field);
+
        /**
         * Sets the field(s) that are written in the destination record(s).
         * 

http://git-wip-us.apache.org/repos/asf/flink/blob/78f41e9c/flink-core/src/main/java/org/apache/flink/api/common/operators/SingleInputSemanticProperties.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/SingleInputSemanticProperties.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/SingleInputSemanticProperties.java
index 77ed1bc..abe995b 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/SingleInputSemanticProperties.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/SingleInputSemanticProperties.java
@@ -36,7 +36,41 @@ public class SingleInputSemanticProperties extends SemanticProperties {
        /** Set of fields that are read in the source record(s).*/
        private FieldSet readFields;
 
-       
+       @Override
+       public FieldSet getForwardFields(int input, int field) {
+               if (input != 0) {
+                       throw new IndexOutOfBoundsException();
+               }
+               return this.getForwardedField(field);
+       }
+
+       @Override
+       public FieldSet getSourceField(int input, int field) {
+               if (input != 0) {
+                       throw new IndexOutOfBoundsException();
+               }
+
+               if (isAllFieldsConstant()) {
+                       return new FieldSet(field);
+               }
+
+               return this.forwardedFrom(field);
+       }
+
+       public FieldSet forwardedFrom(int dest) {
+               FieldSet fs = null;
+               for (Map.Entry<Integer, FieldSet> entry : forwardedFields.entrySet()) {
+                       if (entry.getValue().contains(dest)) {
+                               if (fs == null) {
+                                       fs = new FieldSet();
+                               }
+
+                               fs = fs.addField(entry.getKey());
+                       }
+               }
+               return fs;
+       }
+
        public SingleInputSemanticProperties() {
                init();
        }
@@ -95,6 +129,10 @@ public class SingleInputSemanticProperties extends SemanticProperties {
         * @return the destination fields, or null if they do not exist
         */
        public FieldSet getForwardedField(int sourceField) {
+               if (isAllFieldsConstant()) {
+                       return new FieldSet(sourceField);
+               }
+
                return this.forwardedFields.get(sourceField);
        }
        
@@ -145,7 +183,12 @@ public class SingleInputSemanticProperties extends SemanticProperties {
                                (forwardedFields == null || forwardedFields.isEmpty()) &&
                                (readFields == null || readFields.size() == 0);
        }
-       
+
+       @Override
+       public String toString() {
+               return "SISP(" + this.forwardedFields + ")";
+       }
+
        private void init() {
                this.forwardedFields = new HashMap<Integer,FieldSet>();
                this.readFields = null;

http://git-wip-us.apache.org/repos/asf/flink/blob/78f41e9c/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/ConnectedComponents.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/ConnectedComponents.java b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/ConnectedComponents.java
index cb99e23..cb3b8d5 100644
--- a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/ConnectedComponents.java
+++ b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/ConnectedComponents.java
@@ -155,8 +155,8 @@ public class ConnectedComponents implements ProgramDescription {
         * a vertex is associated with, with a (Source-Vertex-ID, Target-VertexID) edge. The function
         * produces a (Target-vertex-ID, Component-ID) pair.
         */
-       @ConstantFieldsFirst("1 -> 0")
-       @ConstantFieldsSecond("1 -> 1")
+       @ConstantFieldsFirst("1 -> 1")
+       @ConstantFieldsSecond("1 -> 0")
        public static final class NeighborWithComponentIDJoin implements JoinFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>> {
 
                @Override
@@ -178,8 +178,6 @@ public class ConnectedComponents implements ProgramDescription {
                }
        }
 
-
-
        @Override
        public String getDescription() {
                return "Parameters: <vertices-path> <edges-path> <result-path> <max-number-of-iterations>";

http://git-wip-us.apache.org/repos/asf/flink/blob/78f41e9c/flink-java/src/main/java/org/apache/flink/api/java/record/functions/FunctionAnnotation.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/record/functions/FunctionAnnotation.java b/flink-java/src/main/java/org/apache/flink/api/java/record/functions/FunctionAnnotation.java
index 069678a..1e73b92 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/record/functions/FunctionAnnotation.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/record/functions/FunctionAnnotation.java
@@ -341,19 +341,19 @@ public class FunctionAnnotation {
                
                return semanticProperties;
        }
-       
-       
+
+
        private static final class ImplicitlyForwardingSingleInputSemanticProperties extends SingleInputSemanticProperties {
                private static final long serialVersionUID = 1L;
-               
+
                private FieldSet nonForwardedFields;
-               
+
                private ImplicitlyForwardingSingleInputSemanticProperties(FieldSet nonForwardedFields) {
                        this.nonForwardedFields = nonForwardedFields;
                        addWrittenFields(nonForwardedFields);
                }
-               
-               
+
+
                /**
                 * Returns the logical position where the given field is written to.
                 * In this variant of the semantic properties, all fields are assumed implicitly forwarded,
@@ -362,52 +362,81 @@ public class FunctionAnnotation {
                 */
                @Override
                public FieldSet getForwardedField(int sourceField) {
+                       if (isAllFieldsConstant()) {
+                               return new FieldSet(sourceField);
+                       }
+
                        if (this.nonForwardedFields.contains(sourceField)) {
                                return null;
                        } else {
                                return new FieldSet(sourceField);
                        }
                }
-               
+
+               @Override
+               public FieldSet getSourceField(int input, int field) {
+                       if (input != 0) {
+                               throw new IndexOutOfBoundsException();
+                       }
+
+                       if (isAllFieldsConstant()) {
+                               return new FieldSet(field);
+                       }
+
+                       if (this.nonForwardedFields == null) {
+                               return super.getSourceField(input, field);
+                       }
+
+                       if (this.nonForwardedFields.contains(field)) {
+                               return null;
+                       } else {
+                               return new FieldSet(field);
+                       }
+               }
+
                @Override
                public void addForwardedField(int sourceField, int destinationField) {
                        throw new UnsupportedOperationException("When defining fields as implicitly constant " +
                                        "(such as through the ConstantFieldsExcept annotation), you cannot manually add forwarded fields.");
                }
-               
+
                @Override
                public void addForwardedField(int sourceField, FieldSet destinationFields) {
                        throw new UnsupportedOperationException("When defining fields as implicitly constant " +
                                        "(such as through the ConstantFieldsExcept annotation), you cannot manually add forwarded fields.");
                }
-               
+
                @Override
                public void setForwardedField(int sourceField, FieldSet destinationFields) {
                        throw new UnsupportedOperationException("When defining fields as implicitly constant " +
                                        "(such as through the ConstantFieldsExcept annotation), you cannot manually add forwarded fields.");
                }
        }
-       
+
        private static final class ImplicitlyForwardingTwoInputSemanticProperties extends DualInputSemanticProperties {
                private static final long serialVersionUID = 1L;
-               
+
                private FieldSet nonForwardedFields1;
                private FieldSet nonForwardedFields2;
-               
+
                private ImplicitlyForwardingTwoInputSemanticProperties() {}
-               
-               
+
+
                public void setImplicitlyForwardingFirstExcept(FieldSet nonForwardedFields) {
                        this.nonForwardedFields1 = nonForwardedFields;
                }
-               
+
                public void setImplicitlyForwardingSecondExcept(FieldSet nonForwardedFields) {
                        this.nonForwardedFields2 = nonForwardedFields;
                }
-               
+
 
                @Override
                public FieldSet getForwardedField1(int sourceField) {
+                       if (isAllFieldsConstant()) {
+                               return new FieldSet(sourceField);
+                       }
+
                        if (this.nonForwardedFields1 == null) {
                                return super.getForwardedField1(sourceField);
                        }
@@ -419,9 +448,13 @@ public class FunctionAnnotation {
                                }
                        }
                }
-               
+
                @Override
                public FieldSet getForwardedField2(int sourceField) {
+                       if (isAllFieldsConstant()) {
+                               return new FieldSet(sourceField);
+                       }
+
                        if (this.nonForwardedFields2 == null) {
                                return super.getForwardedField2(sourceField);
                        }
@@ -433,7 +466,36 @@ public class FunctionAnnotation {
                                }
                        }
                }
-               
+
+               @Override
+               public FieldSet getSourceField(int input, int field) {
+                       if (input != 0 && input != 1) {
+                               throw new IndexOutOfBoundsException();
+                       }
+
+                       if (isAllFieldsConstant()) {
+                               return new FieldSet(field);
+                       }
+
+                       if (this.nonForwardedFields1 == null && this.nonForwardedFields2 == null) {
+                               return super.getSourceField(input, field);
+                       }
+
+                       if (input == 0 && this.nonForwardedFields1 != null && this.nonForwardedFields1.contains(field)) {
+                               return null;
+                       } else if (input == 0) {
+                               return new FieldSet(field);
+                       }
+
+                       if (input == 1 && this.nonForwardedFields2 != null && this.nonForwardedFields2.contains(field)) {
+                               return null;
+                       } else if (input == 1) {
+                               return new FieldSet(field);
+                       }
+
+                       return null;
+               }
+
                @Override
                public void addForwardedField1(int sourceField, int destinationField) {
                        if (this.nonForwardedFields1 == null) {
@@ -444,7 +506,7 @@ public class FunctionAnnotation {
                                                "(such as through the ConstantFieldsFirstExcept annotation), you cannot manually add forwarded fields.");
                        }
                }
-               
+
                @Override
                public void addForwardedField1(int sourceField, FieldSet destinationFields) {
                        if (this.nonForwardedFields1 == null) {
@@ -455,7 +517,7 @@ public class FunctionAnnotation {
                                                "(such as through the ConstantFieldsFirstExcept annotation), you cannot manually add forwarded fields.");
                        }
                }
-               
+
                @Override
                public void setForwardedField1(int sourceField, FieldSet destinationFields) {
                        if (this.nonForwardedFields1 == null) {
@@ -466,7 +528,7 @@ public class FunctionAnnotation {
                                                "(such as through the ConstantFieldsFirstExcept annotation), you cannot manually add forwarded fields.");
                        }
                }
-               
+
                @Override
                public void addForwardedField2(int sourceField, int destinationField) {
                        if (this.nonForwardedFields2 == null) {
@@ -477,7 +539,7 @@ public class FunctionAnnotation {
                                                "(such as through the ConstantFieldsSecondExcept annotation), you cannot manually add forwarded fields.");
                        }
                }
-               
+
                @Override
                public void addForwardedField2(int sourceField, FieldSet destinationFields) {
                        if (this.nonForwardedFields2 == null) {
@@ -488,7 +550,7 @@ public class FunctionAnnotation {
                                                "(such as through the ConstantFieldsSecondExcept annotation), you cannot manually add forwarded fields.");
                        }
                }
-               
+
                @Override
                public void setForwardedField2(int sourceField, FieldSet destinationFields) {
                        if (this.nonForwardedFields2 == null) {

Reply via email to