Author: edwardyoon
Date: Tue Sep 8 01:53:53 2015
New Revision: 1701715
URL: http://svn.apache.org/r1701715
Log:
HAMA-973: Fix FT bugs
Modified:
hama/trunk/core/src/main/java/org/apache/hama/bsp/ft/AsyncRcvdMsgCheckpointImpl.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/MessageEventListener.java
hama/trunk/core/src/test/java/org/apache/hama/bsp/TestPersistQueue.java
hama/trunk/examples/src/main/java/org/apache/hama/examples/RandBench.java
hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/ft/AsyncRcvdMsgCheckpointImpl.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/ft/AsyncRcvdMsgCheckpointImpl.java?rev=1701715&r1=1701714&r2=1701715&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/ft/AsyncRcvdMsgCheckpointImpl.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/ft/AsyncRcvdMsgCheckpointImpl.java Tue Sep 8 01:53:53 2015
@@ -390,16 +390,26 @@ public class AsyncRcvdMsgCheckpointImpl<
Path path = new Path(checkpointPath(superstepProgress));
FSDataInputStream in = this.fs.open(path);
BSPMessageBundle<M> bundle = new BSPMessageBundle<M>();
+
try {
for (int i = 0; i < numMessages; ++i) {
String className = in.readUTF();
- @SuppressWarnings("unchecked")
- M message = (M) ReflectionUtils.newInstance(
- Class.forName(className), conf);
- message.readFields(in);
- bundle.addMessage(message);
+ if (className.equals(BSPMessageBundle.class.getCanonicalName()))
{
+ BSPMessageBundle<M> tmp = new BSPMessageBundle<M>();
+ tmp.readFields(in);
+ messenger.loopBackBundle(tmp);
+ } else {
+ @SuppressWarnings("unchecked")
+ M message = (M) ReflectionUtils.newInstance(
+ Class.forName(className), conf);
+ message.readFields(in);
+ bundle.addMessage(message);
+ }
+ }
+
+ if (bundle.size() > 0) {
+ messenger.loopBackBundle(bundle);
}
- messenger.loopBackBundle(bundle);
} catch (EOFException e) {
LOG.error("Error recovering from checkpointing", e);
throw new IOException(e);
@@ -514,6 +524,7 @@ public class AsyncRcvdMsgCheckpointImpl<
+ checkpointedPath, ioe);
}
}
+
try {
++checkpointMessageCount;
checkpointStream.writeUTF(message.getClass().getCanonicalName());
@@ -537,6 +548,49 @@ public class AsyncRcvdMsgCheckpointImpl<
}
+ @Override
+ public void onBundleReceived(BSPMessageBundle<M> bundle) {
+ String checkpointedPath = null;
+
+ if (bundle == null) {
+ LOG.error("bundle is found to be null");
+ }
+
+ synchronized (this) {
+ if (checkpointState) {
+ if (this.checkpointStream == null) {
+ checkpointedPath = checkpointPath(peer.getSuperstepCount() + 1);
+ try {
+ LOG.info("Creating path " + checkpointedPath);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Creating path " + checkpointedPath);
+ }
+ checkpointStream = this.fs.create(new Path(checkpointedPath));
+ } catch (IOException ioe) {
+ LOG.error("Fail checkpointing messages to " + checkpointedPath,
+ ioe);
+ throw new RuntimeException("Failed opening HDFS file "
+ + checkpointedPath, ioe);
+ }
+ }
+
+ try {
+ ++checkpointMessageCount;
+ checkpointStream.writeUTF(bundle.getClass().getCanonicalName());
+ bundle.write(checkpointStream);
+ } catch (IOException ioe) {
+ LOG.error("Fail checkpointing messages to " + checkpointedPath,
ioe);
+ throw new RuntimeException("Failed writing to HDFS file "
+ + checkpointedPath, ioe);
+ }
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("message count = " + checkpointMessageCount);
+ }
+ }
+ }
+ }
+
}
}
Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java?rev=1701715&r1=1701714&r2=1701715&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java Tue Sep 8 01:53:53 2015
@@ -129,7 +129,7 @@ public abstract class AbstractMessageMan
public final int getNumCurrentMessages() {
return localQueue.size();
}
-
+
public void clearIncomingMessages() {
localQueue.clear();
}
@@ -145,13 +145,13 @@ public abstract class AbstractMessageMan
if (conf.getBoolean(MessageQueue.PERSISTENT_QUEUE, false)
&& localQueue.size() > 0) {
- // To reduce the number of element additions
- if (localQueue.size() > localQueueForNextIteration.size()) {
- localQueue.addAll(localQueueForNextIteration);
- } else {
- localQueueForNextIteration.addAll(localQueue);
- localQueue = localQueueForNextIteration.getMessageQueue();
- }
+ // To reduce the number of element additions
+ if (localQueue.size() > localQueueForNextIteration.size()) {
+ localQueue.addAll(localQueueForNextIteration);
+ } else {
+ localQueueForNextIteration.addAll(localQueue);
+ localQueue = localQueueForNextIteration.getMessageQueue();
+ }
} else {
if (localQueue != null) {
@@ -178,7 +178,7 @@ public abstract class AbstractMessageMan
notifySentMessage(peerName, msg);
}
-
+
/*
* (non-Javadoc)
* @see org.apache.hama.bsp.message.MessageManager#getMessageIterator()
@@ -239,6 +239,13 @@ public abstract class AbstractMessageMan
}
}
+ private void notifyReceivedMessage(BSPMessageBundle<M> bundle)
+ throws IOException {
+ for (MessageEventListener<M> aMessageListenerQueue :
this.messageListenerQueue) {
+ aMessageListenerQueue.onBundleReceived(bundle);
+ }
+ }
+
private void notifyInit() {
for (MessageEventListener<M> aMessageListenerQueue :
this.messageListenerQueue) {
aMessageListenerQueue.onInitialized();
@@ -261,11 +268,10 @@ public abstract class AbstractMessageMan
@Override
public void loopBackBundle(BSPMessageBundle<M> bundle) throws IOException {
- peer.incrementCounter(BSPPeerImpl.PeerCounter.TOTAL_MESSAGES_RECEIVED,
bundle.size());
+ peer.incrementCounter(BSPPeerImpl.PeerCounter.TOTAL_MESSAGES_RECEIVED,
+ bundle.size());
this.localQueueForNextIteration.addBundle(bundle);
-
- // TODO checkpoint bundle itself instead of unpacked messages. --
edwardyoon
- // notifyReceivedMessage(bundle);
+ notifyReceivedMessage(bundle);
}
@SuppressWarnings("unchecked")
Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/MessageEventListener.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/MessageEventListener.java?rev=1701715&r1=1701714&r2=1701715&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/MessageEventListener.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/MessageEventListener.java Tue Sep 8 01:53:53 2015
@@ -17,7 +17,10 @@
*/
package org.apache.hama.bsp.message;
-public interface MessageEventListener<M> {
+import org.apache.hadoop.io.Writable;
+import org.apache.hama.bsp.BSPMessageBundle;
+
+public interface MessageEventListener<M extends Writable> {
/**
*
@@ -48,6 +51,8 @@ public interface MessageEventListener<M>
* @param message The message received.
*/
void onMessageReceived(final M message);
+
+ void onBundleReceived(final BSPMessageBundle<M> bundle);
/**
* The function to handle the event when the queue is closed.
Modified: hama/trunk/core/src/test/java/org/apache/hama/bsp/TestPersistQueue.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestPersistQueue.java?rev=1701715&r1=1701714&r2=1701715&view=diff
==============================================================================
--- hama/trunk/core/src/test/java/org/apache/hama/bsp/TestPersistQueue.java (original)
+++ hama/trunk/core/src/test/java/org/apache/hama/bsp/TestPersistQueue.java Tue Sep 8 01:53:53 2015
@@ -38,6 +38,8 @@ public class TestPersistQueue extends Te
public static final Log LOG = LogFactory.getLog(TestPartitioning.class);
public void testMemoryQueue() throws Exception {
+ BSPMessageBundle<IntWritable> x = new BSPMessageBundle<IntWritable>();
+ System.out.println(x.getClass().getCanonicalName() + ", " +
BSPMessageBundle.class.getCanonicalName());
BSPJob bsp = getNewJobConf();
bsp.set(MessageManager.RECEIVE_QUEUE_TYPE_CLASS,
"org.apache.hama.bsp.message.queue.MemoryQueue");
Modified: hama/trunk/examples/src/main/java/org/apache/hama/examples/RandBench.java
URL: http://svn.apache.org/viewvc/hama/trunk/examples/src/main/java/org/apache/hama/examples/RandBench.java?rev=1701715&r1=1701714&r2=1701715&view=diff
==============================================================================
--- hama/trunk/examples/src/main/java/org/apache/hama/examples/RandBench.java (original)
+++ hama/trunk/examples/src/main/java/org/apache/hama/examples/RandBench.java Tue Sep 8 01:53:53 2015
@@ -55,7 +55,7 @@ public class RandBench {
byte[] dummyData = new byte[sizeOfMsg];
String[] peers = peer.getAllPeerNames();
- for (int i = 0; i < nSupersteps; i++) {
+ for (int i = (int) peer.getSuperstepCount(); i < nSupersteps; i++) {
for (int j = 0; j < nCommunications; j++) {
String tPeer = peers[r.nextInt(peers.length)];
Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java?rev=1701715&r1=1701714&r2=1701715&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java Tue Sep 8 01:53:53 2015
@@ -145,10 +145,12 @@ public final class GraphJobRunner<V exte
LOG.info("Total time spent for broadcasting global vertex count: "
+ (System.currentTimeMillis() - startTime) + " ms");
- startTime = System.currentTimeMillis();
- doInitialSuperstep(peer);
- LOG.info("Total time spent for initial superstep: "
- + (System.currentTimeMillis() - startTime) + " ms");
+ if (peer.getSuperstepCount() == 2) {
+ startTime = System.currentTimeMillis();
+ doInitialSuperstep(peer);
+ LOG.info("Total time spent for initial superstep: "
+ + (System.currentTimeMillis() - startTime) + " ms");
+ }
}
@Override
@@ -760,10 +762,10 @@ public final class GraphJobRunner<V exte
public Iterable<Writable> getIterableMessages(final byte[] valuesBytes,
final int numOfValues) {
-
+
return new Iterable<Writable>() {
DataInputStream dis;
-
+
@Override
public Iterator<Writable> iterator() {
if (!conf.getBoolean("hama.use.unsafeserialization", false)) {
@@ -771,7 +773,7 @@ public final class GraphJobRunner<V exte
} else {
dis = new DataInputStream(new
UnsafeByteArrayInputStream(valuesBytes));
}
-
+
return new Iterator<Writable>() {
int index = 0;