[FLINK-4734] [gelly] Remove use of Tuple setField for fixed position

This closes #2590
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1577e898 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1577e898 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1577e898 Branch: refs/heads/master Commit: 1577e898d97970ad815b2e431a9cb980bd52b660 Parents: bb34133 Author: Greg Hogan <[email protected]> Authored: Tue Oct 4 09:50:42 2016 -0400 Committer: Greg Hogan <[email protected]> Committed: Wed Oct 5 12:40:06 2016 -0400 ---------------------------------------------------------------------- .../src/main/java/org/apache/flink/graph/Graph.java | 4 ++-- .../org/apache/flink/graph/pregel/ComputeFunction.java | 12 ++++++------ .../org/apache/flink/graph/pregel/MessageCombiner.java | 4 ++-- .../flink/graph/pregel/VertexCentricIteration.java | 12 ++++++------ .../flink/graph/spargel/ScatterGatherIteration.java | 4 ++-- 5 files changed, 18 insertions(+), 18 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/1577e898/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java index 02d1eeb..7854c54 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java @@ -2206,7 +2206,7 @@ public class Graph<K, VV, EV> { @Override public Tuple2<K, VV> reduce(Tuple2<K, VV> first, Tuple2<K, VV> second) throws Exception { - first.setField(function.reduceNeighbors(first.f1, second.f1), 1); + first.f1 = function.reduceNeighbors(first.f1, second.f1); return first; } } @@ -2256,7 +2256,7 @@ public class Graph<K, VV, EV> { @Override public Tuple2<K, EV> 
reduce(Tuple2<K, EV> first, Tuple2<K, EV> second) throws Exception { - first.setField(function.reduceEdges(first.f1, second.f1), 1); + first.f1 = function.reduceEdges(first.f1, second.f1); return first; } } http://git-wip-us.apache.org/repos/asf/flink/blob/1577e898/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/ComputeFunction.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/ComputeFunction.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/ComputeFunction.java index 08c15e9..af25377 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/ComputeFunction.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/ComputeFunction.java @@ -98,10 +98,10 @@ public abstract class ComputeFunction<K, VV, EV, Message> implements Serializabl */ public final void sendMessageToAllNeighbors(Message m) { verifyEdgeUsage(); - outMsg.setField(m, 1); + outMsg.f1 = m; while (edges.hasNext()) { Tuple next = edges.next(); - outMsg.setField(next.getField(1), 0); + outMsg.f0 = next.getField(1); out.collect(Either.Right(outMsg)); } } @@ -115,8 +115,8 @@ public abstract class ComputeFunction<K, VV, EV, Message> implements Serializabl */ public final void sendMessageTo(K target, Message m) { - outMsg.setField(target, 0); - outMsg.setField(m, 1); + outMsg.f0 = target; + outMsg.f1 = m; out.collect(Either.Right(outMsg)); } @@ -134,7 +134,7 @@ public abstract class ComputeFunction<K, VV, EV, Message> implements Serializabl } setNewVertexValueCalled = true; - outVertex.setField(newValue, 1); + outVertex.f1 = newValue; out.collect(Either.Left(outVertex)); } @@ -213,7 +213,7 @@ public abstract class ComputeFunction<K, VV, EV, Message> implements Serializabl void set(K vertexId, Iterator<Edge<K, EV>> edges, Collector<Either<Vertex<K, VV>, Tuple2<K, Message>>> out) { - 
this.outVertex.setField(vertexId, 0); + this.outVertex.f0 = vertexId; this.edges = edges; this.out = (Collector<Either<?, ?>>) (Collector<?>) out; this.edgesUsed = false; http://git-wip-us.apache.org/repos/asf/flink/blob/1577e898/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/MessageCombiner.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/MessageCombiner.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/MessageCombiner.java index 9398d8d..6e51a3a 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/MessageCombiner.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/MessageCombiner.java @@ -42,7 +42,7 @@ public abstract class MessageCombiner<K, Message> implements Serializable { void set(K target, Collector<Tuple2<K, Either<NullValue, Message>>> collector) { this.out = collector; this.outValue = new Tuple2<>(); - outValue.setField(target, 0); + outValue.f0 = target; } /** @@ -62,7 +62,7 @@ public abstract class MessageCombiner<K, Message> implements Serializable { * @throws Exception */ public final void sendCombinedMessage(Message combinedMessage) { - outValue.setField(Either.Right(combinedMessage), 1); + outValue.f1 = Either.Right(combinedMessage); out.collect(outValue); } } http://git-wip-us.apache.org/repos/asf/flink/blob/1577e898/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/VertexCentricIteration.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/VertexCentricIteration.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/VertexCentricIteration.java index ebf2e8d..5b2502e 100644 --- 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/VertexCentricIteration.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/VertexCentricIteration.java @@ -299,11 +299,11 @@ public class VertexCentricIteration<K, VV, EV, Message> public void open(Configuration parameters) { outTuple = new Tuple2<>(); nullMessage = Either.Left(NullValue.getInstance()); - outTuple.setField(nullMessage, 1); + outTuple.f1 = nullMessage; } public Tuple2<K, Either<NullValue, Message>> map(Vertex<K, VV> vertex) { - outTuple.setField(vertex.getId(), 0); + outTuple.f0 = vertex.getId(); return outTuple; } } @@ -474,8 +474,8 @@ public class VertexCentricIteration<K, VV, EV, Message> public Tuple2<Vertex<K, VV>, Either<NullValue, Message>> join( Vertex<K, VV> vertex, Tuple2<K, Either<NullValue, Message>> message) { - outTuple.setField(vertex, 0); - outTuple.setField(message.f1, 1); + outTuple.f0 = vertex; + outTuple.f1 = message.f1; return outTuple; } } @@ -504,8 +504,8 @@ public class VertexCentricIteration<K, VV, EV, Message> if (value.isRight()) { Tuple2<K, Message> message = value.right(); - outTuple.setField(message.f0, 0); - outTuple.setField(Either.Right(message.f1), 1); + outTuple.f0 = message.f0; + outTuple.f1 = Either.Right(message.f1); out.collect(outTuple); } } http://git-wip-us.apache.org/repos/asf/flink/blob/1577e898/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/ScatterGatherIteration.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/ScatterGatherIteration.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/ScatterGatherIteration.java index 8049932..a378ab1 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/ScatterGatherIteration.java +++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/ScatterGatherIteration.java @@ -310,8 +310,8 @@ public class ScatterGatherIteration<K, VV, Message, EV> if (stateIter.hasNext()) { Vertex<K, Tuple3<VV, LongValue, LongValue>> vertexWithDegrees = stateIter.next(); - nextVertex.setField(vertexWithDegrees.f0, 0); - nextVertex.setField(vertexWithDegrees.f1.f0, 1); + nextVertex.f0 = vertexWithDegrees.f0; + nextVertex.f1 = vertexWithDegrees.f1.f0; scatterFunction.setInDegree(vertexWithDegrees.f1.f1.getValue()); scatterFunction.setOutDegree(vertexWithDegrees.f1.f2.getValue());
