Author: tjungblut
Date: Wed Sep 12 16:27:45 2012
New Revision: 1384022
URL: http://svn.apache.org/viewvc?rev=1384022&view=rev
Log:
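Several small fixes: add getTaskId() to BSPPeer, make BSPPeerImpl.initialize() private and call it internally, remove redundant casts in tests, and clean up whitespace.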
Modified:
hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeer.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java
hama/trunk/core/src/test/java/org/apache/hama/bsp/TestCheckpoint.java
hama/trunk/core/src/test/java/org/apache/hama/bsp/sync/TestSyncServiceFactory.java
Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeer.java
URL:
http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeer.java?rev=1384022&r1=1384021&r2=1384022&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeer.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeer.java Wed Sep 12 16:27:45 2012
@@ -27,10 +27,9 @@ import org.apache.hama.bsp.sync.SyncExce
import org.apache.hama.util.KeyValuePair;
/**
- * BSP communication interface.
- * Reads key-value inputs, with K1 typed keys and V1 typed values.
- * Collects key-value outputs, with k2 typed keys and V2 typed values.
- * Exchange messages with other {@link BSPPeer}s via messages of type M.
+ * BSP communication interface. Reads key-value inputs, with K1 typed keys and
+ * V1 typed values. Collects key-value outputs, with k2 typed keys and V2 typed
+ * values. Exchange messages with other {@link BSPPeer}s via messages of type M.
*/
public interface BSPPeer<K1, V1, K2, V2, M extends Writable> extends Constants {
@@ -186,10 +185,15 @@ public interface BSPPeer<K1, V1, K2, V2,
* @return the size of assigned split
*/
public long getSplitSize();
-
+
/**
* @return the current position of the file read pointer
* @throws IOException
*/
public long getPos() throws IOException;
+
+ /**
+ * @return the task id of this task.
+ */
+ public TaskAttemptID getTaskId();
}
Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java
URL:
http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java?rev=1384022&r1=1384021&r2=1384022&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java Wed Sep 12 16:27:45 2012
@@ -96,7 +96,7 @@ public final class BSPPeerImpl<K1, V1, K
private FaultTolerantPeerService<M> faultToleranceService;
private long splitSize = 0L;
-
+
/**
* Protected default constructor for LocalBSPRunner.
*/
@@ -232,6 +232,10 @@ public final class BSPPeerImpl<K1, V1, K
}
}
}
+
+ // init the internal state
+ initialize();
+
doFirstSync(superstep);
if (LOG.isDebugEnabled()) {
@@ -281,7 +285,7 @@ public final class BSPPeerImpl<K1, V1, K
}
@SuppressWarnings("unchecked")
- public final void initialize() throws Exception {
+ private final void initialize() throws Exception {
initInput();
@@ -348,7 +352,7 @@ public final class BSPPeerImpl<K1, V1, K
public long getSplitSize() {
return splitSize;
}
-
+
/**
* @return the position in the input stream.
*/
@@ -356,7 +360,7 @@ public final class BSPPeerImpl<K1, V1, K
public long getPos() throws IOException {
return in.getPos();
}
-
+
public final void initilizeMessaging() throws ClassNotFoundException {
messenger = MessageManagerFactory.getMessageManager(conf);
messenger.init(taskId, this, conf, peerAddress);
@@ -463,7 +467,7 @@ public final class BSPPeerImpl<K1, V1, K
messenger.clearOutgoingQueues();
leaveBarrier();
-
+
incrementCounter(PeerCounter.TIME_IN_SYNC_MS,
(System.currentTimeMillis() - startBarrier));
incrementCounter(PeerCounter.SUPERSTEP_SUM, 1L);
@@ -479,7 +483,7 @@ public final class BSPPeerImpl<K1, V1, K
}
umbilical.statusUpdate(taskId, currentTaskStatus);
-
+
}
private final BSPMessageBundle<M> combineMessages(Iterable<M> messages) {
@@ -702,4 +706,9 @@ public final class BSPPeerImpl<K1, V1, K
}
}
+ @Override
+ public TaskAttemptID getTaskId() {
+ return taskId;
+ }
+
}
Modified: hama/trunk/core/src/test/java/org/apache/hama/bsp/TestCheckpoint.java
URL:
http://svn.apache.org/viewvc/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestCheckpoint.java?rev=1384022&r1=1384021&r2=1384022&view=diff
==============================================================================
--- hama/trunk/core/src/test/java/org/apache/hama/bsp/TestCheckpoint.java (original)
+++ hama/trunk/core/src/test/java/org/apache/hama/bsp/TestCheckpoint.java Wed Sep 12 16:27:45 2012
@@ -169,10 +169,9 @@ public class TestCheckpoint extends Test
superstepCount = 0L;
try {
- fService = (new AsyncRcvdMsgCheckpointImpl<Text>()).constructPeerFaultTolerance(
- job, (BSPPeer<?, ?, ?, ?, Text>) this,
- (BSPPeerSyncClient) syncClient, null, taskId, superstep, conf,
- messenger);
+ fService = (new AsyncRcvdMsgCheckpointImpl<Text>())
+ .constructPeerFaultTolerance(job, this, syncClient, null, taskId,
+ superstep, conf, messenger);
this.fService.onPeerInitialized(state);
} catch (Exception e) {
e.printStackTrace();
@@ -288,16 +287,19 @@ public class TestCheckpoint extends Test
@Override
public long getSplitSize() {
- // TODO Auto-generated method stub
return 0;
}
@Override
public long getPos() throws IOException {
- // TODO Auto-generated method stub
return 0;
}
+ @Override
+ public TaskAttemptID getTaskId() {
+ return null;
+ }
+
}
public static class TempSyncClient extends BSPPeerSyncClient {
@@ -328,13 +330,12 @@ public class TestCheckpoint extends Test
}
@Override
- public boolean getInformation(String key,
- Writable valueHolder) {
+ public boolean getInformation(String key, Writable valueHolder) {
LOG.info("Getting value for key " + key);
- if(!valueMap.containsKey(key)){
+ if (!valueMap.containsKey(key)) {
return false;
}
- Writable value = valueMap.get(key);
+ Writable value = valueMap.get(key);
ByteArrayOutputStream byteStream = new ByteArrayOutputStream();
DataOutputStream outputStream = new DataOutputStream(byteStream);
byte[] data = null;
@@ -440,19 +441,19 @@ public class TestCheckpoint extends Test
}
- private void checkSuperstepMsgCount(PeerSyncClient syncClient,
+ private static void checkSuperstepMsgCount(PeerSyncClient syncClient,
@SuppressWarnings("rawtypes")
BSPPeer bspTask, BSPJob job, long step, long count) {
-
+
ArrayWritable writableVal = new ArrayWritable(LongWritable.class);
-
+
boolean result = syncClient.getInformation(
syncClient.constructKey(job.getJobID(), "checkpoint",
"" + bspTask.getPeerIndex()), writableVal);
-
+
assertTrue(result);
- LongWritable superstepNo = (LongWritable) writableVal.get()[0];
+ LongWritable superstepNo = (LongWritable) writableVal.get()[0];
LongWritable msgCount = (LongWritable) writableVal.get()[1];
assertEquals(step, superstepNo.get());
@@ -477,8 +478,7 @@ public class TestCheckpoint extends Test
TaskAttemptID taskId = new TaskAttemptID(new TaskID(job.getJobID(), 1), 1);
TestMessageManager messenger = new TestMessageManager();
- PeerSyncClient syncClient = (TempSyncClient) SyncServiceFactory
- .getPeerSyncClient(config);
+ PeerSyncClient syncClient = SyncServiceFactory.getPeerSyncClient(config);
@SuppressWarnings("rawtypes")
BSPPeer bspTask = new TestBSPPeer(job, config, taskId, new Counters(), -1L,
(BSPPeerSyncClient) syncClient, messenger, TaskStatus.State.RUNNING);
@@ -489,15 +489,15 @@ public class TestCheckpoint extends Test
int port = BSPNetUtils.getFreePort(12502);
LOG.info("Got port = " + port);
- boolean result = syncClient.getInformation(
- syncClient.constructKey(job.getJobID(), "checkpoint",
- "" + bspTask.getPeerIndex()), new ArrayWritable(LongWritable.class));
+ boolean result = syncClient
+ .getInformation(syncClient.constructKey(job.getJobID(), "checkpoint",
+ "" + bspTask.getPeerIndex()), new ArrayWritable(LongWritable.class));
assertFalse(result);
bspTask.sync();
// Superstep 1
-
+
checkSuperstepMsgCount(syncClient, bspTask, job, 1L, 0L);
Text txtMessage = new Text("data");
@@ -559,8 +559,7 @@ public class TestCheckpoint extends Test
TaskAttemptID taskId = new TaskAttemptID(new TaskID(job.getJobID(), 1), 1);
TestMessageManager messenger = new TestMessageManager();
- PeerSyncClient syncClient = (TempSyncClient) SyncServiceFactory
- .getPeerSyncClient(config);
+ PeerSyncClient syncClient = SyncServiceFactory.getPeerSyncClient(config);
BSPPeer bspTask = new TestBSPPeer(job, config, taskId, new Counters(), -1L,
(BSPPeerSyncClient) syncClient, messenger, TaskStatus.State.RUNNING);
@@ -578,7 +577,7 @@ public class TestCheckpoint extends Test
Text txtMessage = new Text("data");
messenger.addMessage(txtMessage);
-
+
bspTask.sync();
LOG.info("Completed second sync.");
@@ -620,8 +619,7 @@ public class TestCheckpoint extends Test
TaskAttemptID taskId = new TaskAttemptID(new TaskID(job.getJobID(), 1), 1);
TestMessageManager messenger = new TestMessageManager();
- PeerSyncClient syncClient = (TempSyncClient) SyncServiceFactory
- .getPeerSyncClient(config);
+ PeerSyncClient syncClient = SyncServiceFactory.getPeerSyncClient(config);
Text txtMessage = new Text("data");
String writeKey = "job_checkpttest_0001/checkpoint/1/";
Modified: hama/trunk/core/src/test/java/org/apache/hama/bsp/sync/TestSyncServiceFactory.java
URL:
http://svn.apache.org/viewvc/hama/trunk/core/src/test/java/org/apache/hama/bsp/sync/TestSyncServiceFactory.java?rev=1384022&r1=1384021&r2=1384022&view=diff
==============================================================================
--- hama/trunk/core/src/test/java/org/apache/hama/bsp/sync/TestSyncServiceFactory.java (original)
+++ hama/trunk/core/src/test/java/org/apache/hama/bsp/sync/TestSyncServiceFactory.java Wed Sep 12 16:27:45 2012
@@ -122,7 +122,7 @@ public class TestSyncServiceFactory exte
Thread.sleep(1000);
- final PeerSyncClient syncClient = (PeerSyncClient) SyncServiceFactory
+ final PeerSyncClient syncClient = SyncServiceFactory
.getPeerSyncClient(conf);
assertTrue(syncClient instanceof ZooKeeperSyncClientImpl);
BSPJobID jobId = new BSPJobID("abc", 1);