Hi,
I noticed that most failures are caused by overly sensitive task monitoring. Please
check whether this change could cause any problems.
+ && (((tip.lastPingedTimestamp == 0 && ((currentTime - tip.startTime) > 10 * monitorPeriod))
    || ((tip.lastPingedTimestamp > 0) && (currentTime - tip.lastPingedTimestamp) > 6 * monitorPeriod)))) {
On Wed, Nov 21, 2012 at 3:04 PM, <[email protected]> wrote:
> Author: edwardyoon
> Date: Wed Nov 21 06:04:26 2012
> New Revision: 1411991
>
> URL: http://svn.apache.org/viewvc?rev=1411991&view=rev
> Log:
> Monitoring of tasks is too sensitive.
>
> Modified:
> hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServer.java
>
> Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServer.java
> URL:
> http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServer.java?rev=1411991&r1=1411990&r2=1411991&view=diff
> ==============================================================================
> --- hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServer.java
> (original)
> +++ hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServer.java Wed
> Nov 21 06:04:26 2012
> @@ -207,10 +207,9 @@ public class GroomServer implements Runn
> try {
> startRecoveryTask(recoverAction);
> } catch (IOException e) {
> - throw new DirectiveException(
> - new StringBuffer().append("Error starting the recovery
> task")
> - .append(t.getTaskID()).toString(),
> - e);
> + throw new DirectiveException(new StringBuffer()
> + .append("Error starting the recovery task")
> + .append(t.getTaskID()).toString(), e);
> }
> }
> }
> @@ -617,17 +616,17 @@ public class GroomServer implements Runn
> }
>
> Iterator<TaskAttemptID> taskIterator = tasks.keySet().iterator();
> - while(taskIterator.hasNext()){
> + while (taskIterator.hasNext()) {
> TaskAttemptID taskAttId = taskIterator.next();
> - if(taskAttId.getTaskID().equals(t.getTaskID().getTaskID())){
> - if(LOG.isDebugEnabled()){
> + if (taskAttId.getTaskID().equals(t.getTaskID().getTaskID())) {
> + if (LOG.isDebugEnabled()) {
> LOG.debug("Removing tasks with id = " +
> t.getTaskID().getTaskID());
> }
> taskIterator.remove();
> runningTasks.remove(taskAttId);
> }
> }
> -
> +
> tasks.put(t.getTaskID(), tip);
> runningTasks.put(t.getTaskID(), tip);
> }
> @@ -637,14 +636,14 @@ public class GroomServer implements Runn
> String msg = ("Error initializing " + tip.getTask().getTaskID() +
> ":\n" + StringUtils
> .stringifyException(e));
> LOG.warn(msg);
> -
> +
> try {
> tip.killAndCleanup(true);
> } catch (IOException ie2) {
> LOG.info("Error cleaning up " + tip.getTask().getTaskID() + ":\n"
> + StringUtils.stringifyException(ie2));
> }
> - throw new IOException("Errro localizing the job.",e);
> + throw new IOException("Errro localizing the job.", e);
> }
> }
>
> @@ -807,20 +806,17 @@ public class GroomServer implements Runn
> + " monitorPeriod = "
> + monitorPeriod
> + " check = "
> - + (tip.taskStatus.getRunState().equals(TaskStatus.State.RUNNING)
> &&
> - (((tip.lastPingedTimestamp == 0 &&
> - ((currentTime - tip.startTime) > 10 * monitorPeriod)) ||
> - ((tip.lastPingedTimestamp > 0) &&
> - (currentTime - tip.lastPingedTimestamp) >
> monitorPeriod)))));
> + + (tip.taskStatus.getRunState().equals(TaskStatus.State.RUNNING)
> && (((tip.lastPingedTimestamp == 0 && ((currentTime - tip.startTime) > 10 *
> monitorPeriod)) || ((tip.lastPingedTimestamp > 0) && (currentTime -
> tip.lastPingedTimestamp) > 6 * monitorPeriod)))));
>
> // Task is out of contact if it has not pinged since more than
> // monitorPeriod. A task is given a leeway of 10 times monitorPeriod
> // to get started.
> +
> + // TODO Please refactor this conditions
> + // NOTE: (currentTime - tip.lastPingedTimestamp) > 6 * monitorPeriod
> +
> if (tip.taskStatus.getRunState().equals(TaskStatus.State.RUNNING)
> - && (((tip.lastPingedTimestamp == 0
> - && ((currentTime - tip.startTime) > 10 * monitorPeriod))
> - || ((tip.lastPingedTimestamp > 0)
> - && (currentTime - tip.lastPingedTimestamp) >
> monitorPeriod)))) {
> + && (((tip.lastPingedTimestamp == 0 && ((currentTime -
> tip.startTime) > 10 * monitorPeriod)) || ((tip.lastPingedTimestamp > 0) &&
> (currentTime - tip.lastPingedTimestamp) > 6 * monitorPeriod)))) {
>
> LOG.info("adding purge task: " + tip.getTask().getTaskID());
>
> @@ -1048,7 +1044,7 @@ public class GroomServer implements Runn
>
> // runner could be null if task-cleanup attempt is not localized yet
> if (runner != null) {
> - if(LOG.isDebugEnabled()){
> + if (LOG.isDebugEnabled()) {
> LOG.debug("Killing process for " + this.task.getTaskID());
> }
> runner.killBsp();
> @@ -1058,7 +1054,7 @@ public class GroomServer implements Runn
>
> public synchronized void killRunner() throws IOException {
> if (runner != null) {
> - if(LOG.isDebugEnabled()){
> + if (LOG.isDebugEnabled()) {
> LOG.debug("Killing process for " + this.task.getTaskID());
> }
> runner.killBsp();
> @@ -1251,12 +1247,11 @@ public class GroomServer implements Runn
> defaultConf.setInt("bsp.checkpoint.port", Integer.parseInt(args[4]));
> }
> defaultConf.setInt(Constants.PEER_PORT, peerPort);
> -
> +
> long superstep = Long.parseLong(args[4]);
> TaskStatus.State state = TaskStatus.State.valueOf(args[5]);
> LOG.debug("Starting peer for sstep " + superstep + " state = " +
> state);
>
> -
> try {
> // use job-specified working directory
> FileSystem.get(job.getConfiguration()).setWorkingDirectory(
>
>
--
Best Regards, Edward J. Yoon
@eddieyoon