Repository: flink Updated Branches: refs/heads/master 82c420022 -> 805ea6943
http://git-wip-us.apache.org/repos/asf/flink/blob/78f41e9c/flink-core/src/main/java/org/apache/flink/api/common/operators/Ordering.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/Ordering.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/Ordering.java index e99cac7..7b90c8e 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/operators/Ordering.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/Ordering.java @@ -144,7 +144,7 @@ public class Ordering { } for (int i = 0; i < this.indexes.size(); i++) { - if (this.indexes.get(i) != otherOrdering.indexes.get(i)) { + if (this.indexes.get(i).intValue() != otherOrdering.indexes.get(i).intValue()) { return false; } http://git-wip-us.apache.org/repos/asf/flink/blob/78f41e9c/flink-core/src/main/java/org/apache/flink/api/common/operators/SemanticProperties.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/SemanticProperties.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/SemanticProperties.java index ba801ec..da99018 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/operators/SemanticProperties.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/SemanticProperties.java @@ -27,7 +27,8 @@ import org.apache.flink.api.common.operators.util.FieldSet; * Container for the semantic properties associated to an operator. */ public abstract class SemanticProperties implements Serializable { - + private boolean allFieldsConstant; + private static final long serialVersionUID = 1L; /** Set of fields that are written in the destination record(s).*/ @@ -47,7 +48,19 @@ public abstract class SemanticProperties implements Serializable { this.writtenFields = this.writtenFields.addFields(writtenFields); } } - + + public void setAllFieldsConstant(boolean constant) { + this.allFieldsConstant = constant; + } + + public boolean isAllFieldsConstant() { + return this.allFieldsConstant; + } + + public abstract FieldSet getForwardFields(int input, int field); + + public abstract FieldSet getSourceField(int input, int field); + /** * Sets the field(s) that are written in the destination record(s). * http://git-wip-us.apache.org/repos/asf/flink/blob/78f41e9c/flink-core/src/main/java/org/apache/flink/api/common/operators/SingleInputSemanticProperties.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/SingleInputSemanticProperties.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/SingleInputSemanticProperties.java index 77ed1bc..abe995b 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/operators/SingleInputSemanticProperties.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/SingleInputSemanticProperties.java @@ -36,7 +36,41 @@ public class SingleInputSemanticProperties extends SemanticProperties { /** Set of fields that are read in the source record(s).*/ private FieldSet readFields; - + @Override + public FieldSet getForwardFields(int input, int field) { + if (input != 0) { + throw new IndexOutOfBoundsException(); + } + return this.getForwardedField(field); + } + + @Override + public FieldSet getSourceField(int input, int field) { + if (input != 0) { + throw new IndexOutOfBoundsException(); + } + + if (isAllFieldsConstant()) { + return new FieldSet(field); + } + + return this.forwardedFrom(field); + } + + public FieldSet forwardedFrom(int dest) { + FieldSet fs = null; + for (Map.Entry<Integer, FieldSet> entry : forwardedFields.entrySet()) { + if (entry.getValue().contains(dest)) { + if (fs == null) { + fs = new FieldSet(); + } + + fs = fs.addField(entry.getKey()); + } + } + return fs; + } + public SingleInputSemanticProperties() { init(); } @@ -95,6 +129,10 @@ public class SingleInputSemanticProperties extends SemanticProperties { * @return the destination fields, or null if they do not exist */ public FieldSet getForwardedField(int sourceField) { + if (isAllFieldsConstant()) { + return new FieldSet(sourceField); + } + return this.forwardedFields.get(sourceField); } @@ -145,7 +183,12 @@ public class SingleInputSemanticProperties extends SemanticProperties { (forwardedFields == null || forwardedFields.isEmpty()) && (readFields == null || readFields.size() == 0); } - + + @Override + public String toString() { + return "SISP(" + this.forwardedFields + ")"; + } + private void init() { this.forwardedFields = new HashMap<Integer,FieldSet>(); this.readFields = null; http://git-wip-us.apache.org/repos/asf/flink/blob/78f41e9c/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/ConnectedComponents.java ---------------------------------------------------------------------- diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/ConnectedComponents.java b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/ConnectedComponents.java index cb99e23..cb3b8d5 100644 --- a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/ConnectedComponents.java +++ b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/ConnectedComponents.java @@ -155,8 +155,8 @@ public class ConnectedComponents implements ProgramDescription { * a vertex is associated with, with a (Source-Vertex-ID, Target-VertexID) edge. The function * produces a (Target-vertex-ID, Component-ID) pair. */ - @ConstantFieldsFirst("1 -> 0") - @ConstantFieldsSecond("1 -> 1") + @ConstantFieldsFirst("1 -> 1") + @ConstantFieldsSecond("1 -> 0") public static final class NeighborWithComponentIDJoin implements JoinFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>> { @Override @@ -178,8 +178,6 @@ public class ConnectedComponents implements ProgramDescription { } } - - @Override public String getDescription() { return "Parameters: <vertices-path> <edges-path> <result-path> <max-number-of-iterations>"; http://git-wip-us.apache.org/repos/asf/flink/blob/78f41e9c/flink-java/src/main/java/org/apache/flink/api/java/record/functions/FunctionAnnotation.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/record/functions/FunctionAnnotation.java b/flink-java/src/main/java/org/apache/flink/api/java/record/functions/FunctionAnnotation.java index 069678a..1e73b92 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/record/functions/FunctionAnnotation.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/record/functions/FunctionAnnotation.java @@ -341,19 +341,19 @@ public class FunctionAnnotation { return semanticProperties; } - - + + private static final class ImplicitlyForwardingSingleInputSemanticProperties extends SingleInputSemanticProperties { private static final long serialVersionUID = 1L; - + private FieldSet nonForwardedFields; - + private ImplicitlyForwardingSingleInputSemanticProperties(FieldSet nonForwardedFields) { this.nonForwardedFields = nonForwardedFields; addWrittenFields(nonForwardedFields); } - - + + /** * Returns the logical position where the given field is written to. * In this variant of the semantic properties, all fields are assumed implicitly forwarded, @@ -362,52 +362,81 @@ public class FunctionAnnotation { */ @Override public FieldSet getForwardedField(int sourceField) { + if (isAllFieldsConstant()) { + return new FieldSet(sourceField); + } + if (this.nonForwardedFields.contains(sourceField)) { return null; } else { return new FieldSet(sourceField); } } - + + @Override + public FieldSet getSourceField(int input, int field) { + if (input != 0) { + throw new IndexOutOfBoundsException(); + } + + if (isAllFieldsConstant()) { + return new FieldSet(field); + } + + if (this.nonForwardedFields == null) { + return super.getSourceField(input, field); + } + + if (this.nonForwardedFields.contains(field)) { + return null; + } else { + return new FieldSet(field); + } + } + @Override public void addForwardedField(int sourceField, int destinationField) { throw new UnsupportedOperationException("When defining fields as implicitly constant " + "(such as through the ConstantFieldsExcept annotation), you cannot manually add forwarded fields."); } - + @Override public void addForwardedField(int sourceField, FieldSet destinationFields) { throw new UnsupportedOperationException("When defining fields as implicitly constant " + "(such as through the ConstantFieldsExcept annotation), you cannot manually add forwarded fields."); } - + @Override public void setForwardedField(int sourceField, FieldSet destinationFields) { throw new UnsupportedOperationException("When defining fields as implicitly constant " + "(such as through the ConstantFieldsExcept annotation), you cannot manually add forwarded fields."); } } - + private static final class ImplicitlyForwardingTwoInputSemanticProperties extends DualInputSemanticProperties { private static final long serialVersionUID = 1L; - + private FieldSet nonForwardedFields1; private FieldSet nonForwardedFields2; - + private ImplicitlyForwardingTwoInputSemanticProperties() {} - - + + public void setImplicitlyForwardingFirstExcept(FieldSet nonForwardedFields) { this.nonForwardedFields1 = nonForwardedFields; } - + public void setImplicitlyForwardingSecondExcept(FieldSet nonForwardedFields) { this.nonForwardedFields2 = nonForwardedFields; } - + @Override public FieldSet getForwardedField1(int sourceField) { + if (isAllFieldsConstant()) { + return new FieldSet(sourceField); + } + if (this.nonForwardedFields1 == null) { return super.getForwardedField1(sourceField); } @@ -419,9 +448,13 @@ public class FunctionAnnotation { } } } - + @Override public FieldSet getForwardedField2(int sourceField) { + if (isAllFieldsConstant()) { + return new FieldSet(sourceField); + } + if (this.nonForwardedFields2 == null) { return super.getForwardedField2(sourceField); } @@ -433,7 +466,36 @@ public class FunctionAnnotation { } } } - + + @Override + public FieldSet getSourceField(int input, int field) { + if (input != 0 && input != 1) { + throw new IndexOutOfBoundsException(); + } + + if (isAllFieldsConstant()) { + return new FieldSet(field); + } + + if (this.nonForwardedFields1 == null && this.nonForwardedFields2 == null) { + return super.getSourceField(input, field); + } + + if (input == 0 && this.nonForwardedFields1 != null && this.nonForwardedFields1.contains(field)) { + return null; + } else if (input == 0) { + return new FieldSet(field); + } + + if (input == 1 && this.nonForwardedFields2 != null && this.nonForwardedFields2.contains(field)) { + return null; + } else if (input == 1) { + return new FieldSet(field); + } + + return null; + } + @Override public void addForwardedField1(int sourceField, int destinationField) { if (this.nonForwardedFields1 == null) { @@ -444,7 +506,7 @@ public class FunctionAnnotation { "(such as through the ConstantFieldsFirstExcept annotation), you cannot manually add forwarded fields."); } } - + @Override public void addForwardedField1(int sourceField, FieldSet destinationFields) { if (this.nonForwardedFields1 == null) { @@ -455,7 +517,7 @@ public class FunctionAnnotation { "(such as through the ConstantFieldsFirstExcept annotation), you cannot manually add forwarded fields."); } } - + @Override public void setForwardedField1(int sourceField, FieldSet destinationFields) { if (this.nonForwardedFields1 == null) { @@ -466,7 +528,7 @@ public class FunctionAnnotation { "(such as through the ConstantFieldsFirstExcept annotation), you cannot manually add forwarded fields."); } } - + @Override public void addForwardedField2(int sourceField, int destinationField) { if (this.nonForwardedFields2 == null) { @@ -477,7 +539,7 @@ public class FunctionAnnotation { "(such as through the ConstantFieldsSecondExcept annotation), you cannot manually add forwarded fields."); } } - + @Override public void addForwardedField2(int sourceField, FieldSet destinationFields) { if (this.nonForwardedFields2 == null) { @@ -488,7 +550,7 @@ public class FunctionAnnotation { "(such as through the ConstantFieldsSecondExcept annotation), you cannot manually add forwarded fields."); } } - + @Override public void setForwardedField2(int sourceField, FieldSet destinationFields) { if (this.nonForwardedFields2 == null) {
