Resolved merge conflicts
Project: http://git-wip-us.apache.org/repos/asf/airavata/repo Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/075744e5 Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/075744e5 Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/075744e5 Branch: refs/heads/master Commit: 075744e51b8433fc9ef6d7d3a338b2c1512226e7 Parents: 595be55 a256f35 Author: shamrath <[email protected]> Authored: Thu Oct 2 21:10:47 2014 -0400 Committer: shamrath <[email protected]> Committed: Thu Oct 2 21:10:47 2014 -0400 ---------------------------------------------------------------------- .../airavata/gfac/server/GfacServerHandler.java | 23 ++++++-- .../monitor/impl/pull/qstat/HPCPullMonitor.java | 61 ++++++-------------- .../gsi/ssh/impl/GSISSHAbstractCluster.java | 13 ++--- 3 files changed, 39 insertions(+), 58 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/airavata/blob/075744e5/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java ---------------------------------------------------------------------- diff --cc modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java index 2cadfc7,1acc9d6..11c33ec --- a/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java +++ b/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java @@@ -193,13 -194,15 +194,23 @@@ public class GfacServerHandler implemen * @param gatewayId */ public boolean submitJob(String experimentId, String taskId, String gatewayId) throws TException { - logger.infoId(experimentId, "GFac Received the Experiment: {} TaskId: {}", experimentId, taskId); - logger.info("GFac Recieved the Experiment: " + experimentId + " TaskId: " + taskId); ++ logger.infoId(experimentId, "GFac Received submit jog request for the Experiment: {} TaskId: {}", 
experimentId, taskId); GFac gfac = getGfac(); - InputHandlerWorker inputHandlerWorker = new InputHandlerWorker(gfac, experimentId, taskId, gatewayId); - inHandlerFutures.add(GFacThreadPoolExecutor.getCachedThreadPool().submit(inputHandlerWorker)); - logger.infoId(experimentId, "Invoked in handle worker for the experiment {} , task {} and gateway {}", - experimentId, taskId, gatewayId); - return true; + // InputHandlerWorker inputHandlerWorker = new InputHandlerWorker(gfac, experimentId, taskId, gatewayId); + try { - return gfac.submitJob(experimentId, taskId, gatewayId); ++ if( gfac.submitJob(experimentId, taskId, gatewayId)){ ++ logger.debugId(experimentId, "Submitted jog to the Gfac Implementation, experiment {}, task {}, gateway " + ++ "{}", experimentId, taskId, gatewayId); ++ return true; ++ }else{ ++ logger.error(experimentId, "Failed to submit job to the GFac implementation, experiment {}, task {}, " + ++ "gateway {}", experimentId, taskId, gatewayId); ++ return false; ++ } + } catch (GFacException e) { + throw new TException("Error launching the experiment : " + e.getMessage(), e); + } + // inHandlerFutures.add(GFacThreadPoolExecutor.getCachedThreadPool().submit(inputHandlerWorker)); } public boolean cancelJob(String experimentId, String taskId) throws TException { http://git-wip-us.apache.org/repos/asf/airavata/blob/075744e5/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java ---------------------------------------------------------------------- diff --cc modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java index 4cc36c5,e066669..96edf3c --- a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java +++ b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java @@@ -54,8 -61,8 +54,9 @@@ import java.util.concurrent.LinkedBlock * in grid 
resources and retrieve the job status. */ public class HPCPullMonitor extends PullMonitor { - private final static Logger logger = LoggerFactory.getLogger(HPCPullMonitor.class); ++ + private final static AiravataLogger logger = AiravataLoggerFactory.getLogger(HPCPullMonitor.class); - public static final int FAILED_COUNT = 3; + public static final int FAILED_COUNT = 1; // I think this should use DelayedBlocking Queue to do the monitoring*/ private BlockingQueue<UserMonitorData> queue; @@@ -204,16 -210,9 +205,12 @@@ logger.info("This job is finished because push notification came with <username,jobName> " + completeId); completedJobs.put(iMonitorID.getJobName(), iMonitorID); iMonitorID.setStatus(JobState.COMPLETE); + iterator.remove();//we have to make this empty everytime we iterate, otherwise this list will accumulate and will lead to a memory leak + logger.debugId(completeId, "Push notification updated job {} status to {}. " + + "experiment {} , task {}.", iMonitorID.getJobID(), JobState.COMPLETE.toString(), + iMonitorID.getExperimentID(), iMonitorID.getTaskID()); break; } - //we have to make this empty everytime we iterate, otherwise this list will accumulate and will - // lead to a memory leak - } - if(completeId!=null) { - completedJobsFromPush.remove(completeId); } iterator = completedJobsFromPush.listIterator(); } @@@ -242,16 -236,17 +239,17 @@@ iMonitorID.setLastMonitored(new Timestamp((new Date()).getTime())); if (iMonitorID.getFailedCount() > FAILED_COUNT) { - logger.error("Tried to monitor the job with ID " + iMonitorID.getJobID() + " But failed" + iMonitorID.getFailedCount() + - " 3 times, so skip this Job from Monitor"); iMonitorID.setLastMonitored(new Timestamp((new Date()).getTime())); JobDescriptor jobDescriptor = JobDescriptor.fromXML(iMonitorID.getJobExecutionContext().getJobDetails().getJobDescription()); - List<String> stdErr = connection.getCluster().listDirectory(jobDescriptor.getStandardErrorFile()); - List<String> stdOut = 
connection.getCluster().listDirectory(jobDescriptor.getStandardOutFile()); - if (stdErr.size() > 0 && stdOut.size() > 0) { + List<String> stdOut = connection.getCluster().listDirectory(jobDescriptor.getOutputDirectory()); // check the outputs directory + if (stdOut.size() > 0) { // have to be careful with this + completedJobs.put(iMonitorID.getJobName(), iMonitorID); + logger.errorId(iMonitorID.getJobID(), "Job monitoring failed {} times, removed job {} from " + + "monitor queue. Experiment {} , task {}", iMonitorID.getFailedCount(), + iMonitorID.getExperimentID(), iMonitorID.getTaskID()); - completedJobs.put(iMonitorID.getJobName(), iMonitorID); + } else { + iMonitorID.setFailedCount(0); } - } else { // Evey iMonitorID.setLastMonitored(new Timestamp((new Date()).getTime()));
