Author: aching Date: Thu Dec 22 06:53:44 2011 New Revision: 1222071 URL: http://svn.apache.org/viewvc?rev=1222071&view=rev Log: GIRAPH-112: Use elements() properly in LongDoubleFloatDoubleVertex. (aching)
Modified: incubator/giraph/trunk/CHANGELOG incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/LongDoubleFloatDoubleVertex.java Modified: incubator/giraph/trunk/CHANGELOG URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/CHANGELOG?rev=1222071&r1=1222070&r2=1222071&view=diff ============================================================================== --- incubator/giraph/trunk/CHANGELOG (original) +++ incubator/giraph/trunk/CHANGELOG Thu Dec 22 06:53:44 2011 @@ -2,6 +2,9 @@ Giraph Change Log Release 0.70.0 - unreleased + GIRAPH-112: Use elements() properly in LongDoubleFloatDoubleVertex. + (aching) + GIRAPH-114: Inconsistent message map handling in BasicRPCCommunications.LargeMessageFlushExecutor. (ssc via aching) Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/LongDoubleFloatDoubleVertex.java URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/LongDoubleFloatDoubleVertex.java?rev=1222071&r1=1222070&r2=1222071&view=diff ============================================================================== --- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/LongDoubleFloatDoubleVertex.java (original) +++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/LongDoubleFloatDoubleVertex.java Thu Dec 22 06:53:44 2011 @@ -22,6 +22,9 @@ import org.apache.hadoop.io.DoubleWritab import org.apache.hadoop.io.FloatWritable; import org.apache.hadoop.io.LongWritable; import org.apache.log4j.Logger; +import org.apache.mahout.math.function.DoubleProcedure; +import org.apache.mahout.math.function.LongFloatProcedure; +import org.apache.mahout.math.function.LongProcedure; import org.apache.mahout.math.list.DoubleArrayList; import org.apache.mahout.math.map.OpenLongFloatHashMap; @@ -33,38 +36,45 @@ import java.util.List; import java.util.Map; public abstract class LongDoubleFloatDoubleVertex extends - MutableVertex<LongWritable, DoubleWritable, FloatWritable, DoubleWritable> { + MutableVertex<LongWritable, DoubleWritable, FloatWritable, + DoubleWritable> { /** Class logger */ - private static final Logger LOG = Logger.getLogger(LongDoubleFloatDoubleVertex.class); + private static final Logger LOG = + Logger.getLogger(LongDoubleFloatDoubleVertex.class); private long vertexId; private double vertexValue; - private OpenLongFloatHashMap verticesWithEdgeValues = new OpenLongFloatHashMap(); + private OpenLongFloatHashMap verticesWithEdgeValues = + new OpenLongFloatHashMap(); private DoubleArrayList messageList = new DoubleArrayList(); @Override public void initialize(LongWritable vertexIdW, DoubleWritable vertexValueW, - Map<LongWritable, FloatWritable> edgesW, List<DoubleWritable> messagesW) { - if (vertexIdW != null ) { - vertexId = vertexIdW.get(); - } - if (vertexValueW != null) { - vertexValue = vertexValueW.get(); - } - if (edgesW != null) { - for(Map.Entry<LongWritable, FloatWritable> entry : edgesW.entrySet()) { - verticesWithEdgeValues.put(entry.getKey().get(), entry.getValue().get()); - } - } - if (messagesW != null) { - for(DoubleWritable m : messagesW) { - messageList.add(m.get()); + Map<LongWritable, FloatWritable> edgesW, + List<DoubleWritable> messagesW) { + if (vertexIdW != null ) { + vertexId = vertexIdW.get(); + } + if (vertexValueW != null) { + vertexValue = vertexValueW.get(); + } + if (edgesW != null) { + for (Map.Entry<LongWritable, FloatWritable> entry : + edgesW.entrySet()) { + verticesWithEdgeValues.put(entry.getKey().get(), + entry.getValue().get()); + } + } + if (messagesW != null) { + for(DoubleWritable m : messagesW) { + messageList.add(m.get()); + } } - } } @Override - public final boolean addEdge(LongWritable targetId, FloatWritable edgeValue) { + public final boolean addEdge(LongWritable targetId, + FloatWritable edgeValue) { if (verticesWithEdgeValues.put(targetId.get(), edgeValue.get())) { if (LOG.isDebugEnabled()) { LOG.debug("addEdge: Vertex=" + vertexId + @@ -96,7 +106,8 @@ public abstract class LongDoubleFloatDou @Override public final LongWritable getVertexId() { - return new LongWritable(vertexId); // TODO: possibly not make new objects every time? + // TODO: possibly not make new objects every time? + return new LongWritable(vertexId); } @Override @@ -119,16 +130,22 @@ public abstract class LongDoubleFloatDou } @Override - public final void sendMsgToAllEdges(DoubleWritable msg) { + public final void sendMsgToAllEdges(final DoubleWritable msg) { if (msg == null) { throw new IllegalArgumentException( - "sendMsgToAllEdges: Cannot send null message to all edges"); - } - LongWritable destVertex = new LongWritable(); - for (long destVertexId : verticesWithEdgeValues.keys().elements()) { - destVertex.set(destVertexId); - sendMsg(destVertex, msg); + "sendMsgToAllEdges: Cannot send null message to all edges"); } + final LongWritable destVertex = new LongWritable(); + final MutableVertex<LongWritable, DoubleWritable, FloatWritable, + DoubleWritable> vertex = this; + verticesWithEdgeValues.forEachKey(new LongProcedure() { + @Override + public boolean apply(long destVertexId) { + destVertex.set(destVertexId); + vertex.sendMsg(destVertex, msg); + return true; + } + }); } @Override @@ -144,10 +161,11 @@ public abstract class LongDoubleFloatDou @Override public Iterator<LongWritable> iterator() { final long[] destVertices = verticesWithEdgeValues.keys().elements(); + final int destVerticesSize = verticesWithEdgeValues.size(); return new Iterator<LongWritable>() { int offset = 0; @Override public boolean hasNext() { - return offset < destVertices.length; + return offset < destVerticesSize; } @Override public LongWritable next() { @@ -156,7 +174,7 @@ public abstract class LongDoubleFloatDou @Override public void remove() { throw new UnsupportedOperationException( - "Mutation disallowed for edge list via iterator"); + "Mutation disallowed for edge list via iterator"); } }; } @@ -183,7 +201,8 @@ public abstract class LongDoubleFloatDou } @Override - public void addVertexRequest(MutableVertex<LongWritable, DoubleWritable, FloatWritable, DoubleWritable> vertex) + public void addVertexRequest(MutableVertex<LongWritable, DoubleWritable, + FloatWritable, DoubleWritable> vertex) throws IOException { getGraphState().getWorkerCommunications().addVertexReq(vertex); } @@ -197,12 +216,14 @@ public abstract class LongDoubleFloatDou public void addEdgeRequest(LongWritable vertexIndex, Edge<LongWritable, FloatWritable> edge) throws IOException { - getGraphState().getWorkerCommunications().addEdgeReq(vertexIndex, edge); + getGraphState().getWorkerCommunications().addEdgeReq(vertexIndex, + edge); } @Override public void removeEdgeRequest(LongWritable sourceVertexId, - LongWritable destVertexId) throws IOException { + LongWritable destVertexId) + throws IOException { getGraphState().getWorkerCommunications().removeEdgeReq( sourceVertexId, destVertexId); } @@ -225,19 +246,36 @@ public abstract class LongDoubleFloatDou } @Override - public final void write(DataOutput out) throws IOException { + public final void write(final DataOutput out) throws IOException { out.writeLong(vertexId); out.writeDouble(vertexValue); out.writeLong(verticesWithEdgeValues.size()); - for(long destVertexId : verticesWithEdgeValues.keys().elements()) { - float edgeValue = verticesWithEdgeValues.get(destVertexId); - out.writeLong(destVertexId); - out.writeFloat(edgeValue); - } + verticesWithEdgeValues.forEachPair(new LongFloatProcedure() { + @Override + public boolean apply(long destVertexId, float edgeValue) { + try { + out.writeLong(destVertexId); + out.writeFloat(edgeValue); + } catch (IOException e) { + throw new IllegalStateException( + "apply: IOException when not allowed", e); + } + return true; + } + }); out.writeLong(messageList.size()); - for(double msg : messageList.elements()) { - out.writeDouble(msg); - } + messageList.forEach(new DoubleProcedure() { + @Override + public boolean apply(double message) { + try { + out.writeDouble(message); + } catch (IOException e) { + throw new IllegalStateException( + "apply: IOException when not allowed", e); + } + return true; + } + }); out.writeBoolean(halt); } @@ -271,36 +309,35 @@ public abstract class LongDoubleFloatDou private final DoubleArrayList elementList; - public UnmodifiableDoubleWritableIterable(DoubleArrayList elementList) { + public UnmodifiableDoubleWritableIterable( + DoubleArrayList elementList) { this.elementList = elementList; } @Override public Iterator<DoubleWritable> iterator() { return new UnmodifiableDoubleWritableIterator( - elementList.elements()); + elementList); } } private class UnmodifiableDoubleWritableIterator extends UnmodifiableIterator<DoubleWritable> { + private final DoubleArrayList elementList; + private int offset = 0; - private final double[] elements; - private int offset; - - UnmodifiableDoubleWritableIterator(double[] elements) { - offset = 0; - this.elements = elements; + UnmodifiableDoubleWritableIterator(DoubleArrayList elementList) { + this.elementList = elementList; } @Override public boolean hasNext() { - return offset < elements.length; + return offset < elementList.size(); } @Override public DoubleWritable next() { - return new DoubleWritable(elements[offset++]); + return new DoubleWritable(elementList.get(offset++)); } } }