Repository: airavata
Updated Branches:
  refs/heads/master 5413e6718 -> f7de359dc


fixing concurrent modification exception in monitoring


Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/af73c444
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/af73c444
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/af73c444

Branch: refs/heads/master
Commit: af73c44468149b23053da8dc3247e78894fd751d
Parents: 97b384b
Author: Ginnaliya Gamathige <[email protected]>
Authored: Tue Nov 11 10:50:02 2014 -0500
Committer: Ginnaliya Gamathige <[email protected]>
Committed: Tue Nov 11 10:50:02 2014 -0500

----------------------------------------------------------------------
 .../monitor/impl/pull/qstat/HPCPullMonitor.java | 66 +++++++++++---------
 .../airavata/gfac/monitor/util/CommonUtils.java |  6 +-
 2 files changed, 36 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/airavata/blob/af73c444/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java
----------------------------------------------------------------------
diff --git 
a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java
 
b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java
index 8e5f758..66cc5f7 100644
--- 
a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java
+++ 
b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java
@@ -158,21 +158,22 @@ public class HPCPullMonitor extends PullMonitor {
         try {
             take = this.queue.take();
             List<HostMonitorData> hostMonitorData = take.getHostMonitorData();
-            for (HostMonitorData iHostMonitorData : hostMonitorData) {
+            for (ListIterator<HostMonitorData> hostIterator = 
hostMonitorData.listIterator(); hostIterator.hasNext();) {
+                HostMonitorData iHostMonitorData = hostIterator.next();
                 if (iHostMonitorData.getHost().getType() instanceof 
GsisshHostType
                         || iHostMonitorData.getHost().getType() instanceof 
SSHHostType) {
-                    String hostName =  
iHostMonitorData.getHost().getType().getHostAddress();
+                    String hostName = 
iHostMonitorData.getHost().getType().getHostAddress();
                     ResourceConnection connection = null;
                     if (connections.containsKey(hostName)) {
-                        if(!connections.get(hostName).isConnected()){
-                            connection = new 
ResourceConnection(iHostMonitorData,getAuthenticationInfo());
+                        if (!connections.get(hostName).isConnected()) {
+                            connection = new 
ResourceConnection(iHostMonitorData, getAuthenticationInfo());
                             connections.put(hostName, connection);
-                        }else{
+                        } else {
                             logger.debug("We already have this connection so 
not going to create one");
                             connection = connections.get(hostName);
                         }
                     } else {
-                        connection = new 
ResourceConnection(iHostMonitorData,getAuthenticationInfo());
+                        connection = new ResourceConnection(iHostMonitorData, 
getAuthenticationInfo());
                         connections.put(hostName, connection);
                     }
 
@@ -180,18 +181,18 @@ public class HPCPullMonitor extends PullMonitor {
                     List<MonitorID> monitorID = 
iHostMonitorData.getMonitorIDs();
                     Iterator<String> iterator1 = cancelJobList.iterator();
                     ListIterator<MonitorID> monitorIDListIterator = 
monitorID.listIterator();
-                    while (monitorIDListIterator.hasNext()){
+                    while (monitorIDListIterator.hasNext()) {
                         MonitorID iMonitorID = monitorIDListIterator.next();
-                        while(iterator1.hasNext()) {
+                        while (iterator1.hasNext()) {
                             String cancelMId = iterator1.next();
                             if (cancelMId.equals(iMonitorID.getExperimentID() 
+ "+" + iMonitorID.getTaskID())) {
                                 iMonitorID.setStatus(JobState.CANCELED);
-                                
CommonUtils.removeMonitorFromQueue(take,iMonitorID);
+                                CommonUtils.removeMonitorFromQueue(take, 
iMonitorID);
                                 logger.debugId(cancelMId, "Found a match in 
cancel monitor queue, hence moved to the " +
                                                 "completed job queue, 
experiment {}, task {} , job {}",
                                         iMonitorID.getExperimentID(), 
iMonitorID.getTaskID(), iMonitorID.getJobID());
                                 logger.info("Job cancelled: marking the Job as 
************CANCELLED************ experiment {}, task {}, job name {} .",
-                                        
iMonitorID.getExperimentID(),iMonitorID.getTaskID(),iMonitorID.getJobName());
+                                        iMonitorID.getExperimentID(), 
iMonitorID.getTaskID(), iMonitorID.getJobName());
                                 sendNotification(iMonitorID);
                                 monitorIDListIterator.remove();
                                 
GFacThreadPoolExecutor.getFixedThreadPool().submit(new OutHandlerWorker(gfac, 
iMonitorID, publisher));
@@ -201,22 +202,19 @@ public class HPCPullMonitor extends PullMonitor {
                         iterator1 = cancelJobList.iterator();
                     }
                     synchronized (completedJobsFromPush) {
-                        ListIterator<String> iterator = 
completedJobsFromPush.listIterator();
-                        monitorIDListIterator = monitorID.listIterator();
-                        while (monitorIDListIterator.hasNext()) {
-                            MonitorID iMonitorID = 
monitorIDListIterator.next();
-                            String completeId = null;
-                            while (iterator.hasNext()) {
-                                 completeId = iterator.next();
+                        for (ListIterator<String> iterator = 
completedJobsFromPush.listIterator(); iterator.hasNext(); ) {
+                            String completeId = iterator.next();
+                            for (monitorIDListIterator = 
monitorID.listIterator(); monitorIDListIterator.hasNext(); ) {
+                                MonitorID iMonitorID = 
monitorIDListIterator.next();
                                 if (completeId.equals(iMonitorID.getUserName() 
+ "," + iMonitorID.getJobName())) {
                                     logger.info("This job is finished because 
push notification came with <username,jobName> " + completeId);
                                     iMonitorID.setStatus(JobState.COMPLETE);
-                                    
CommonUtils.removeMonitorFromQueue(take,iMonitorID);//we have to make this 
empty everytime we iterate, otherwise this list will accumulate and will lead 
to a memory leak
+                                    CommonUtils.removeMonitorFromQueue(take, 
iMonitorID);//we have to make this empty everytime we iterate, otherwise this 
list will accumulate and will lead to a memory leak
                                     logger.debugId(completeId, "Push 
notification updated job {} status to {}. " +
                                                     "experiment {} , task 
{}.", iMonitorID.getJobID(), JobState.COMPLETE.toString(),
                                             iMonitorID.getExperimentID(), 
iMonitorID.getTaskID());
                                     logger.info("AMQP message recieved: 
marking the Job as ************COMPLETE************ experiment {}, task {}, job 
name {} .",
-                                            
iMonitorID.getExperimentID(),iMonitorID.getTaskID(),iMonitorID.getJobName());
+                                            iMonitorID.getExperimentID(), 
iMonitorID.getTaskID(), iMonitorID.getJobName());
 
                                     iterator.remove();
                                     sendNotification(iMonitorID);
@@ -224,36 +222,34 @@ public class HPCPullMonitor extends PullMonitor {
                                     break;
                                 }
                             }
-                            iterator = completedJobsFromPush.listIterator();
                         }
                     }
 
                     // we have to get this again because we removed the 
already completed jobs with amqp messages
                     monitorID = iHostMonitorData.getMonitorIDs();
                     Map<String, JobState> jobStatuses = 
connection.getJobStatuses(monitorID);
-                    Iterator<MonitorID> iterator = monitorID.listIterator();
-                    while (iterator.hasNext()) {
+                    for (Iterator<MonitorID> iterator = 
monitorID.listIterator(); iterator.hasNext(); ) {
                         MonitorID iMonitorID = iterator.next();
                         currentMonitorID = iMonitorID;
-                        if (!JobState.CANCELED.equals(iMonitorID.getStatus())&&
+                        if (!JobState.CANCELED.equals(iMonitorID.getStatus()) 
&&
                                 
!JobState.COMPLETE.equals(iMonitorID.getStatus())) {
                             
iMonitorID.setStatus(jobStatuses.get(iMonitorID.getJobID() + "," + 
iMonitorID.getJobName()));    //IMPORTANT this is NOT a simple setter we have a 
logic
-                        }else 
if(JobState.COMPLETE.equals(iMonitorID.getStatus())){
+                        } else if 
(JobState.COMPLETE.equals(iMonitorID.getStatus())) {
                             logger.debugId(iMonitorID.getJobID(), "Moved job 
{} to completed jobs map, experiment {}, " +
                                     "task {}", iMonitorID.getJobID(), 
iMonitorID.getExperimentID(), iMonitorID.getTaskID());
-                            
CommonUtils.removeMonitorFromQueue(take,iMonitorID);
+                            CommonUtils.removeMonitorFromQueue(take, 
iMonitorID);
                             logger.info("PULL Notification is complete: 
marking the Job as ************COMPLETE************ experiment {}, task {}, job 
name {} .",
-                                    
iMonitorID.getExperimentID(),iMonitorID.getTaskID(),iMonitorID.getJobName());
+                                    iMonitorID.getExperimentID(), 
iMonitorID.getTaskID(), iMonitorID.getJobName());
                             
GFacThreadPoolExecutor.getFixedThreadPool().submit(new OutHandlerWorker(gfac, 
iMonitorID, publisher));
                         }
-                        
iMonitorID.setStatus(jobStatuses.get(iMonitorID.getJobID()+","+iMonitorID.getJobName()));
    //IMPORTANT this is not a simple setter we have a logic
+                        
iMonitorID.setStatus(jobStatuses.get(iMonitorID.getJobID() + "," + 
iMonitorID.getJobName()));    //IMPORTANT this is not a simple setter we have a 
logic
                         iMonitorID.setLastMonitored(new Timestamp((new 
Date()).getTime()));
                         sendNotification(iMonitorID);
                         // if the job is completed we do not have to put the 
job to the queue again
                         iMonitorID.setLastMonitored(new Timestamp((new 
Date()).getTime()));
                     }
-                    iterator = monitorID.listIterator();
-                    while(iterator.hasNext()){
+
+                    for (Iterator<MonitorID> iterator = 
monitorID.listIterator(); iterator.hasNext(); ) {
                         MonitorID iMonitorID = iterator.next();
                         if (iMonitorID.getFailedCount() > FAILED_COUNT) {
                             iMonitorID.setLastMonitored(new Timestamp((new 
Date()).getTime()));
@@ -276,9 +272,9 @@ public class HPCPullMonitor extends PullMonitor {
                                                 " Experiment {} , task {}", 
iMonitorID.getFailedCount(),
                                         iMonitorID.getExperimentID(), 
iMonitorID.getTaskID());
                                 logger.info("Listing directory came as 
complete: marking the Job as ************COMPLETE************ experiment {}, 
task {}, job name {} .",
-                                        
iMonitorID.getExperimentID(),iMonitorID.getTaskID(),iMonitorID.getJobName());
+                                        iMonitorID.getExperimentID(), 
iMonitorID.getTaskID(), iMonitorID.getJobName());
                                 sendNotification(iMonitorID);
-                                
CommonUtils.removeMonitorFromQueue(take,iMonitorID);
+                                CommonUtils.removeMonitorFromQueue(take, 
iMonitorID);
                                 
GFacThreadPoolExecutor.getFixedThreadPool().submit(new OutHandlerWorker(gfac, 
iMonitorID, publisher));
                             } else {
                                 iMonitorID.setFailedCount(0);
@@ -302,6 +298,14 @@ public class HPCPullMonitor extends PullMonitor {
             // during individual monitorID removal we remove the 
HostMonitorData object if it become empty
             // so if all the jobs are finished for all the hostMOnitorId 
objects in userMonitorData object
             // we should remove it from the queue so here we do not put it 
back.
+            for (ListIterator<HostMonitorData> iterator1 = 
take.getHostMonitorData().listIterator(); iterator1.hasNext(); ) {
+                HostMonitorData iHostMonitorID = iterator1.next();
+                if (iHostMonitorID.getMonitorIDs().size() == 0) {
+                    iterator1.remove();
+                    logger.debug("Removed host {} from monitoring queue", 
iHostMonitorID.getHost()
+                            .getType().getHostAddress());
+                }
+            }
             if(take.getHostMonitorData().size()!=0) {
                 queue.put(take);
             }

http://git-wip-us.apache.org/repos/asf/airavata/blob/af73c444/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/util/CommonUtils.java
----------------------------------------------------------------------
diff --git 
a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/util/CommonUtils.java
 
b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/util/CommonUtils.java
index 6152505..531b8ff 100644
--- 
a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/util/CommonUtils.java
+++ 
b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/util/CommonUtils.java
@@ -170,11 +170,7 @@ public class CommonUtils {
                                     iterator2.remove();
                                     logger.infoId(monitorID.getJobID(), 
"Removed the jobId: {} JobName: {} from monitoring last " +
                                             "status:{}", 
monitorID.getJobID(),monitorID.getJobName(), monitorID.getStatus().toString());
-                                    if (iHostMonitorID.getMonitorIDs().size() 
== 0) {
-                                        iterator1.remove();
-                                        logger.debug("Removed host {} from 
monitoring queue", iHostMonitorID.getHost()
-                                                .getType().getHostAddress());
-                                    }
+
                                     return;
                                 }
                             }

Reply via email to