Author: edwardyoon
Date: Tue May 21 08:50:18 2013
New Revision: 1484701
URL: http://svn.apache.org/r1484701
Log:
HAMA-758: Send message to non-exist vertex makes the job fail (MaoYuan Xian via
edwardyoon)
Modified:
hama/trunk/CHANGES.txt
hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
Modified: hama/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/hama/trunk/CHANGES.txt?rev=1484701&r1=1484700&r2=1484701&view=diff
==============================================================================
--- hama/trunk/CHANGES.txt (original)
+++ hama/trunk/CHANGES.txt Tue May 21 08:50:18 2013
@@ -14,6 +14,7 @@ Release 0.7 (unreleased changes)
IMPROVEMENTS
+ HAMA-758: Send message to non-exist vertex makes the job fail (MaoYuan Xian
via edwardyoon)
HAMA-757: The partitioning job output should be un-splitable (MaoYuan Xian
via edwardyoon)
HAMA-754: PartitioningRunner should write raw records to partition files
(edwardyoon)
HAMA-707: BSPMessageBundle should be able to encapsulate messages
serialized in ByteBuffer (surajsmenon)
Modified:
hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
URL:
http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java?rev=1484701&r1=1484700&r2=1484701&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
(original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
Tue May 21 08:50:18 2013
@@ -279,11 +279,22 @@ public final class GraphJobRunner<V exte
BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer) {
@SuppressWarnings("unchecked")
int comparision = firstMessageId.compareTo(vertex.getVertexID());
- if (comparision < 0) {
- throw new IllegalArgumentException(
+ if (conf.getBoolean("hama.check.missing.vertex", true)) {
+ if (comparision < 0) {
+ throw new IllegalArgumentException(
"Messages must never be behind the vertex in ID! Current Message ID:
"
+ firstMessageId + " vs. " + vertex.getVertexID());
- } else if (comparision == 0) {
+ }
+ } else {
+ while (comparision < 0) {
+ VertexMessageIterable<V, M> messageIterable = new
VertexMessageIterable<V, M>(currentMessage,
+ firstMessageId, peer);
+ currentMessage = messageIterable.getOverflowMessage();
+ firstMessageId = (V)currentMessage.getVertexId();
+ comparision = firstMessageId.compareTo(vertex.getVertexID());
+ }
+ }
+ if (comparision == 0) {
// vertex id matches with the vertex, return an iterator with newest
// message
return new VertexMessageIterable<V, M>(currentMessage,