Author: claudio
Date: Mon Jan 23 21:33:04 2012
New Revision: 1234997
URL: http://svn.apache.org/viewvc?rev=1234997&view=rev
Log:
GIRAPH-124: Combiner should return Iterable<M> instead of M or null.
Modified:
incubator/giraph/trunk/CHANGELOG
incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/BasicRPCCommunications.java
incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/MinimumIntCombiner.java
incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleSumCombiner.java
incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/VertexCombiner.java
incubator/giraph/trunk/src/test/java/org/apache/giraph/TestVertexTypes.java
incubator/giraph/trunk/src/test/java/org/apache/giraph/examples/MinimumIntCombinerTest.java
Modified: incubator/giraph/trunk/CHANGELOG
URL:
http://svn.apache.org/viewvc/incubator/giraph/trunk/CHANGELOG?rev=1234997&r1=1234996&r2=1234997&view=diff
==============================================================================
--- incubator/giraph/trunk/CHANGELOG (original)
+++ incubator/giraph/trunk/CHANGELOG Mon Jan 23 21:33:04 2012
@@ -2,6 +2,9 @@ Giraph Change Log
Release 0.1.0 - unreleased
+ GIRAPH-124: Combiner should return Iterable<M> instead of M or
+ null. (claudio)
+
GIRAPH-125: Bug in LongDoubleFloatDoubleVertex.sendMsgToAllEdges().
(humming80 via aching)
Modified:
incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/BasicRPCCommunications.java
URL:
http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/BasicRPCCommunications.java?rev=1234997&r1=1234996&r2=1234997&view=diff
==============================================================================
---
incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/BasicRPCCommunications.java
(original)
+++
incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/BasicRPCCommunications.java
Mon Jan 23 21:33:04 2012
@@ -59,6 +59,8 @@ import org.apache.giraph.graph.partition
import org.apache.giraph.graph.partition.PartitionOwner;
import org.apache.giraph.utils.MemoryUtils;
+import com.google.common.collect.Iterables;
+
/*if[HADOOP_FACEBOOK]
import org.apache.hadoop.ipc.ProtocolSignature;
end[HADOOP_FACEBOOK]*/
@@ -235,10 +237,23 @@ public abstract class BasicRPCCommunicat
}
}
if (combiner != null && entry.getValue().size() > 1) {
- M combinedMsg = combiner.combine(entry.getKey(),
- entry.getValue());
+ Iterable<M> messages = combiner.combine(
+ entry.getKey(), entry.getValue());
+ if (messages == null) {
+ throw new IllegalStateException(
+ "run: Combiner cannot return null");
+ }
+ if (Iterables.size(entry.getValue()) <
+ Iterables.size(messages)) {
+ throw new IllegalStateException(
+ "run: The number of combined " +
+ "messages is required to be <= to " +
+ "number of messages to be combined");
+ }
entry.getValue().clear();
- entry.getValue().add(combinedMsg);
+ for (M msg: messages) {
+ entry.getValue().add(msg);
+ }
}
if (entry.getValue().isEmpty()) {
throw new IllegalStateException(
@@ -347,10 +362,21 @@ public abstract class BasicRPCCommunicat
peerConnection.getRPCProxy();
if (combiner != null) {
- M combinedMsg = combiner.combine(destVertex,
- outMessageList);
- if (combinedMsg != null) {
- proxy.putMsg(destVertex, combinedMsg);
+ Iterable<M> messages = combiner.combine(destVertex,
+ outMessageList);
+ if (messages == null) {
+ throw new IllegalStateException(
+ "run: Combiner cannot return null");
+ }
+ if (Iterables.size(outMessageList) <
+ Iterables.size(messages)) {
+ throw new IllegalStateException(
+ "run: The number of combined messages is " +
+ "required to be <= to the number of " +
+ "messages to be combined");
+ }
+ for (M msg: messages) {
+ proxy.putMsg(destVertex, msg);
}
} else {
proxy.putMsgList(destVertex, outMessageList);
@@ -971,10 +997,24 @@ end[HADOOP_FACEBOOK]*/
for (Entry<I, List<M>> entry : transientInMessages.entrySet()) {
if (combiner != null) {
try {
- M combinedMsg = combiner.combine(entry.getKey(),
- entry.getValue());
- if (combinedMsg != null) {
- putMsg(entry.getKey(), combinedMsg);
+ Iterable<M> messages =
+ combiner.combine(entry.getKey(),
+ entry.getValue());
+ if (messages == null) {
+ throw new IllegalStateException(
+ "prepareSuperstep: Combiner cannot " +
+ "return null");
+ }
+ if (Iterables.size(entry.getValue()) <
+ Iterables.size(messages)) {
+ throw new IllegalStateException(
+ "prepareSuperstep: The number of " +
+ "combined messages is " +
+ "required to be <= to the number of " +
+ "messages to be combined");
+ }
+ for (M msg: messages) {
+ putMsg(entry.getKey(), msg);
}
} catch (IOException e) {
// no actual IO -- should never happen
Modified:
incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/MinimumIntCombiner.java
URL:
http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/MinimumIntCombiner.java?rev=1234997&r1=1234996&r2=1234997&view=diff
==============================================================================
---
incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/MinimumIntCombiner.java
(original)
+++
incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/MinimumIntCombiner.java
Mon Jan 23 21:33:04 2012
@@ -22,6 +22,7 @@ import org.apache.giraph.graph.VertexCom
import org.apache.hadoop.io.IntWritable;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.List;
/**
@@ -31,7 +32,7 @@ public class MinimumIntCombiner
extends VertexCombiner<IntWritable, IntWritable> {
@Override
- public IntWritable combine(IntWritable target,
+ public Iterable<IntWritable> combine(IntWritable target,
Iterable<IntWritable> messages) throws IOException {
int minimum = Integer.MAX_VALUE;
for (IntWritable message : messages) {
@@ -39,6 +40,9 @@ public class MinimumIntCombiner
minimum = message.get();
}
}
- return new IntWritable(minimum);
+ List<IntWritable> value = new ArrayList<IntWritable>();
+ value.add(new IntWritable(minimum));
+
+ return value;
}
}
Modified:
incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleSumCombiner.java
URL:
http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleSumCombiner.java?rev=1234997&r1=1234996&r2=1234997&view=diff
==============================================================================
---
incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleSumCombiner.java
(original)
+++
incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleSumCombiner.java
Mon Jan 23 21:33:04 2012
@@ -19,6 +19,7 @@
package org.apache.giraph.examples;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.io.IntWritable;
@@ -33,13 +34,15 @@ public class SimpleSumCombiner
extends VertexCombiner<LongWritable, IntWritable> {
@Override
- public IntWritable combine(LongWritable vertexIndex,
- Iterable<IntWritable> messages)
- throws IOException {
+ public Iterable<IntWritable> combine(LongWritable vertexIndex,
+ Iterable<IntWritable> messages) throws IOException {
int sum = 0;
for (IntWritable msg : messages) {
sum += msg.get();
}
- return new IntWritable(sum);
+ List<IntWritable> value = new ArrayList<IntWritable>();
+ value.add(new IntWritable(sum));
+
+ return value;
}
}
Modified:
incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/VertexCombiner.java
URL:
http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/VertexCombiner.java?rev=1234997&r1=1234996&r2=1234997&view=diff
==============================================================================
---
incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/VertexCombiner.java
(original)
+++
incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/VertexCombiner.java
Mon Jan 23 21:33:04 2012
@@ -33,15 +33,16 @@ import org.apache.hadoop.io.WritableComp
public abstract class VertexCombiner<I extends WritableComparable,
M extends Writable> {
- /**
- * Combines message values for a particular vertex index.
- *
- * @param vertexIndex Index of the vertex getting these messages
- * @param messages Iterable of the messages to be combined
- * @return Message that is combined from {@link messages} or null if no
- * message it to be sent
- * @throws IOException
- */
- public abstract M combine(I vertexIndex,
- Iterable<M> messages) throws IOException;
+ /**
+ * Combines message values for a particular vertex index.
+ *
+ * @param vertexIndex Index of the vertex getting these messages
+ * @param messages Iterable of the messages to be combined
+ * @return Iterable of the combined messages. The returned value cannot
+ * be null and its size is required to be smaller or equal to
+ * the size of {@link messages}.
+ * @throws IOException
+ */
+ public abstract Iterable<M> combine(I vertexIndex,
+ Iterable<M> messages) throws IOException;
}
Modified:
incubator/giraph/trunk/src/test/java/org/apache/giraph/TestVertexTypes.java
URL:
http://svn.apache.org/viewvc/incubator/giraph/trunk/src/test/java/org/apache/giraph/TestVertexTypes.java?rev=1234997&r1=1234996&r2=1234997&view=diff
==============================================================================
--- incubator/giraph/trunk/src/test/java/org/apache/giraph/TestVertexTypes.java
(original)
+++ incubator/giraph/trunk/src/test/java/org/apache/giraph/TestVertexTypes.java
Mon Jan 23 21:33:04 2012
@@ -35,11 +35,13 @@ import org.apache.giraph.graph.GraphMapp
import org.apache.giraph.graph.VertexOutputFormat;
import org.apache.giraph.lib.JsonBase64VertexInputFormat;
import org.apache.giraph.lib.JsonBase64VertexOutputFormat;
+import org.apache.giraph.utils.EmptyIterable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Writable;
public class TestVertexTypes
@@ -82,10 +84,9 @@ public class TestVertexTypes
VertexCombiner<LongWritable, FloatWritable> {
@Override
- public FloatWritable combine(LongWritable vertexIndex,
- Iterable<FloatWritable> msgList)
- throws IOException {
- return null;
+ public Iterable<FloatWritable> combine(LongWritable vertexIndex,
+ Iterable<FloatWritable> msgList) throws IOException {
+ return new EmptyIterable<FloatWritable>();
}
}
@@ -96,10 +97,10 @@ public class TestVertexTypes
VertexCombiner<LongWritable, DoubleWritable> {
@Override
- public DoubleWritable combine(LongWritable vertexIndex,
- Iterable<DoubleWritable> msgList)
+ public Iterable<DoubleWritable> combine(LongWritable vertexIndex,
+ Iterable<DoubleWritable> msgList)
throws IOException {
- return null;
+ return new EmptyIterable<DoubleWritable>();
}
}
Modified:
incubator/giraph/trunk/src/test/java/org/apache/giraph/examples/MinimumIntCombinerTest.java
URL:
http://svn.apache.org/viewvc/incubator/giraph/trunk/src/test/java/org/apache/giraph/examples/MinimumIntCombinerTest.java?rev=1234997&r1=1234996&r2=1234997&view=diff
==============================================================================
---
incubator/giraph/trunk/src/test/java/org/apache/giraph/examples/MinimumIntCombinerTest.java
(original)
+++
incubator/giraph/trunk/src/test/java/org/apache/giraph/examples/MinimumIntCombinerTest.java
Mon Jan 23 21:33:04 2012
@@ -22,6 +22,9 @@ import junit.framework.TestCase;
import org.apache.giraph.graph.VertexCombiner;
import org.apache.hadoop.io.IntWritable;
+import com.google.common.collect.Iterables;
+
+import java.io.IOException;
import java.util.Arrays;
public class MinimumIntCombinerTest extends TestCase {
@@ -31,10 +34,11 @@ public class MinimumIntCombinerTest exte
VertexCombiner<IntWritable, IntWritable> combiner =
new MinimumIntCombiner();
- IntWritable result = combiner.combine(new IntWritable(1),
Arrays.asList(
+ Iterable<IntWritable> result = combiner.combine(
+ new IntWritable(1), Arrays.asList(
new IntWritable(39947466), new IntWritable(199),
new IntWritable(19998888), new IntWritable(42)));
-
- assertEquals(42, result.get());
+ assertTrue(result.iterator().hasNext());
+ assertEquals(42, result.iterator().next().get());
}
}