Repository: airavata Updated Branches: refs/heads/master c366adeee -> 3f8986881
Fixing ZooKeeper (zk) error caused by completed jobs not being handled properly Project: http://git-wip-us.apache.org/repos/asf/airavata/repo Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/3f898688 Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/3f898688 Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/3f898688 Branch: refs/heads/master Commit: 3f89868814759f62a743a118cec6f4a299954b8e Parents: c366ade Author: lahiru <[email protected]> Authored: Wed Sep 24 14:51:25 2014 -0400 Committer: lahiru <[email protected]> Committed: Wed Sep 24 14:51:25 2014 -0400 ---------------------------------------------------------------------- .../monitor/impl/pull/qstat/HPCPullMonitor.java | 79 ++++++++++---------- 1 file changed, 38 insertions(+), 41 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/airavata/blob/3f898688/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java index 9fbbb85..34a6065 100644 --- a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java +++ b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java @@ -194,6 +194,7 @@ public class HPCPullMonitor extends PullMonitor { completedJobs.add(iMonitorID); iMonitorID.setStatus(JobState.CANCELED); iterator1.remove(); + break; } } iterator1 = cancelJobList.iterator(); @@ -201,67 +202,62 @@ public class HPCPullMonitor extends PullMonitor { synchronized (completedJobsFromPush) { Iterator<String> iterator = completedJobsFromPush.iterator(); for (MonitorID iMonitorID : 
monitorID) { + String completeId = null; while (iterator.hasNext()) { - String cancelMId = iterator.next(); - if (cancelMId.equals(iMonitorID.getUserName() + "," + iMonitorID.getJobName())) { - logger.info("This job is finished because push notification came with <username,jobName> " + cancelMId); + completeId = iterator.next(); + if (completeId.equals(iMonitorID.getUserName() + "," + iMonitorID.getJobName())) { + logger.info("This job is finished because push notification came with <username,jobName> " + completeId); completedJobs.add(iMonitorID); iMonitorID.setStatus(JobState.COMPLETE); + break; } - //we have to make this empty everytime we iterate, otherwise this list will accumilate and will + //we have to make this empty everytime we iterate, otherwise this list will accumulate and will // lead to a memory leak - iterator.remove(); + } + if(completeId!=null) { + completedJobsFromPush.remove(completeId); } iterator = completedJobsFromPush.listIterator(); } } Map<String, JobState> jobStatuses = connection.getJobStatuses(monitorID); - for (MonitorID iMonitorID : monitorID) { + Iterator<MonitorID> iterator = monitorID.iterator(); + while (iterator.hasNext()) { + MonitorID iMonitorID = iterator.next(); currentMonitorID = iMonitorID; if (!JobState.CANCELED.equals(iMonitorID.getStatus())&& !JobState.COMPLETE.equals(iMonitorID.getStatus())) { iMonitorID.setStatus(jobStatuses.get(iMonitorID.getJobID() + "," + iMonitorID.getJobName())); //IMPORTANT this is NOT a simple setter we have a logic - } - - String id = iMonitorID.getUserName() + "," + iMonitorID.getJobName(); - if(completedJobsFromPush.contains(id)){ - iMonitorID.setStatus(JobState.COMPLETE); + }else if(JobState.COMPLETE.equals(iMonitorID.getStatus())){ + completedJobs.add(iMonitorID); } jobStatus = new JobStatusChangeRequest(iMonitorID); - // we have this JobStatus class to handle amqp monitoring + // we have this JobStatus class to handle amqp monitoring - publisher.publish(jobStatus); - // if the job is 
completed we do not have to put the job to the queue again - iMonitorID.setLastMonitored(new Timestamp((new Date()).getTime())); + publisher.publish(jobStatus); + // if the job is completed we do not have to put the job to the queue again + iMonitorID.setLastMonitored(new Timestamp((new Date()).getTime())); - // After successful monitoring perform follow ing actions to cleanup the queue, if necessary - if (jobStatus.getState().equals(JobState.COMPLETE)) { - if(completedJobs.contains(iMonitorID)) { - completedJobs.add(iMonitorID); - } - // we run all the finished jobs in separate threads, because each job doesn't have to wait until - // each one finish transfering files - GFacThreadPoolExecutor.getCachedThreadPool().submit(new OutHandlerWorker(gfac, iMonitorID, publisher)); - } else if (iMonitorID.getFailedCount() > FAILED_COUNT) { - logger.error("Tried to monitor the job with ID " + iMonitorID.getJobID() + " But failed" +iMonitorID.getFailedCount()+ - " 3 times, so skip this Job from Monitor"); - iMonitorID.setLastMonitored(new Timestamp((new Date()).getTime())); - completedJobs.add(iMonitorID); - try { - logger.error("Launching outflow handlers to check output are genereated or not"); - gfac.invokeOutFlowHandlers(iMonitorID.getJobExecutionContext()); - } catch (GFacException e) { - publisher.publish(new TaskStatusChangeRequest(new TaskIdentity(iMonitorID.getExperimentID(), iMonitorID.getWorkflowNodeID(), - iMonitorID.getTaskID()), TaskState.FAILED)); - logger.info(e.getLocalizedMessage(), e); - } - } else { - // Evey - iMonitorID.setLastMonitored(new Timestamp((new Date()).getTime())); - // if the job is complete we remove it from the Map, if any of these maps - // get empty this userMonitorData will get delete from the queue + if (iMonitorID.getFailedCount() > FAILED_COUNT) { + logger.error("Tried to monitor the job with ID " + iMonitorID.getJobID() + " But failed" + iMonitorID.getFailedCount() + + " 3 times, so skip this Job from Monitor"); + 
iMonitorID.setLastMonitored(new Timestamp((new Date()).getTime())); + completedJobs.add(iMonitorID); + try { + logger.error("Launching outflow handlers to check output are genereated or not"); + gfac.invokeOutFlowHandlers(iMonitorID.getJobExecutionContext()); + } catch (GFacException e) { + publisher.publish(new TaskStatusChangeRequest(new TaskIdentity(iMonitorID.getExperimentID(), iMonitorID.getWorkflowNodeID(), + iMonitorID.getTaskID()), TaskState.FAILED)); + logger.info(e.getLocalizedMessage(), e); } + } else { + // Evey + iMonitorID.setLastMonitored(new Timestamp((new Date()).getTime())); + // if the job is complete we remove it from the Map, if any of these maps + // get empty this userMonitorData will get delete from the queue } + } } else { logger.debug("Qstat Monitor doesn't handle non-gsissh hosts"); } @@ -274,6 +270,7 @@ public class HPCPullMonitor extends PullMonitor { Map<String, Integer> jobRemoveCountMap = new HashMap<String, Integer>(); ZooKeeper zk = null; for (MonitorID completedJob : completedJobs) { + GFacThreadPoolExecutor.getCachedThreadPool().submit(new OutHandlerWorker(gfac, completedJob, publisher)); CommonUtils.removeMonitorFromQueue(queue, completedJob); if (zk == null) { zk = completedJob.getJobExecutionContext().getZk();
