Author: edwardyoon
Date: Wed Nov 21 06:04:26 2012
New Revision: 1411991
URL: http://svn.apache.org/viewvc?rev=1411991&view=rev
Log:
Monitoring of tasks is too sensitive.
Modified:
hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServer.java
Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServer.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServer.java?rev=1411991&r1=1411990&r2=1411991&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServer.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServer.java Wed Nov 21 06:04:26 2012
@@ -207,10 +207,9 @@ public class GroomServer implements Runn
try {
startRecoveryTask(recoverAction);
} catch (IOException e) {
- throw new DirectiveException(
- new StringBuffer().append("Error starting the recovery task")
- .append(t.getTaskID()).toString(),
- e);
+ throw new DirectiveException(new StringBuffer()
+ .append("Error starting the recovery task")
+ .append(t.getTaskID()).toString(), e);
}
}
}
@@ -617,17 +616,17 @@ public class GroomServer implements Runn
}
Iterator<TaskAttemptID> taskIterator = tasks.keySet().iterator();
- while(taskIterator.hasNext()){
+ while (taskIterator.hasNext()) {
TaskAttemptID taskAttId = taskIterator.next();
- if(taskAttId.getTaskID().equals(t.getTaskID().getTaskID())){
- if(LOG.isDebugEnabled()){
+ if (taskAttId.getTaskID().equals(t.getTaskID().getTaskID())) {
+ if (LOG.isDebugEnabled()) {
LOG.debug("Removing tasks with id = " + t.getTaskID().getTaskID());
}
taskIterator.remove();
runningTasks.remove(taskAttId);
}
}
-
+
tasks.put(t.getTaskID(), tip);
runningTasks.put(t.getTaskID(), tip);
}
@@ -637,14 +636,14 @@ public class GroomServer implements Runn
String msg = ("Error initializing " + tip.getTask().getTaskID() + ":\n"
+ StringUtils
.stringifyException(e));
LOG.warn(msg);
-
+
try {
tip.killAndCleanup(true);
} catch (IOException ie2) {
LOG.info("Error cleaning up " + tip.getTask().getTaskID() + ":\n"
+ StringUtils.stringifyException(ie2));
}
- throw new IOException("Errro localizing the job.",e);
+ throw new IOException("Errro localizing the job.", e);
}
}
@@ -807,20 +806,17 @@ public class GroomServer implements Runn
+ " monitorPeriod = "
+ monitorPeriod
+ " check = "
- + (tip.taskStatus.getRunState().equals(TaskStatus.State.RUNNING) &&
- (((tip.lastPingedTimestamp == 0 &&
- ((currentTime - tip.startTime) > 10 * monitorPeriod)) ||
- ((tip.lastPingedTimestamp > 0) &&
- (currentTime - tip.lastPingedTimestamp) > monitorPeriod)))));
+ + (tip.taskStatus.getRunState().equals(TaskStatus.State.RUNNING) && (((tip.lastPingedTimestamp == 0 && ((currentTime - tip.startTime) > 10 * monitorPeriod)) || ((tip.lastPingedTimestamp > 0) && (currentTime - tip.lastPingedTimestamp) > 6 * monitorPeriod)))));
// Task is out of contact if it has not pinged since more than
// monitorPeriod. A task is given a leeway of 10 times monitorPeriod
// to get started.
+
+ // TODO Please refactor this conditions
+ // NOTE: (currentTime - tip.lastPingedTimestamp) > 6 * monitorPeriod
+
if (tip.taskStatus.getRunState().equals(TaskStatus.State.RUNNING)
- && (((tip.lastPingedTimestamp == 0
- && ((currentTime - tip.startTime) > 10 * monitorPeriod))
- || ((tip.lastPingedTimestamp > 0)
- && (currentTime - tip.lastPingedTimestamp) > monitorPeriod)))) {
+ && (((tip.lastPingedTimestamp == 0 && ((currentTime - tip.startTime) > 10 * monitorPeriod)) || ((tip.lastPingedTimestamp > 0) && (currentTime - tip.lastPingedTimestamp) > 6 * monitorPeriod)))) {
LOG.info("adding purge task: " + tip.getTask().getTaskID());
@@ -1048,7 +1044,7 @@ public class GroomServer implements Runn
// runner could be null if task-cleanup attempt is not localized yet
if (runner != null) {
- if(LOG.isDebugEnabled()){
+ if (LOG.isDebugEnabled()) {
LOG.debug("Killing process for " + this.task.getTaskID());
}
runner.killBsp();
@@ -1058,7 +1054,7 @@ public class GroomServer implements Runn
public synchronized void killRunner() throws IOException {
if (runner != null) {
- if(LOG.isDebugEnabled()){
+ if (LOG.isDebugEnabled()) {
LOG.debug("Killing process for " + this.task.getTaskID());
}
runner.killBsp();
@@ -1251,12 +1247,11 @@ public class GroomServer implements Runn
defaultConf.setInt("bsp.checkpoint.port", Integer.parseInt(args[4]));
}
defaultConf.setInt(Constants.PEER_PORT, peerPort);
-
+
long superstep = Long.parseLong(args[4]);
TaskStatus.State state = TaskStatus.State.valueOf(args[5]);
LOG.debug("Starting peer for sstep " + superstep + " state = " + state);
-
try {
// use job-specified working directory
FileSystem.get(job.getConfiguration()).setWorkingDirectory(