http://git-wip-us.apache.org/repos/asf/airavata/blob/7b809747/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/parser/UGEEmailParser.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/parser/UGEEmailParser.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/parser/UGEEmailParser.java
new file mode 100644
index 0000000..0710d9e
--- /dev/null
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/parser/UGEEmailParser.java
@@ -0,0 +1,103 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.monitor.email.parser;
+
+import org.apache.airavata.common.exception.AiravataException;
+import org.apache.airavata.gfac.core.monitor.EmailParser;
+import org.apache.airavata.gfac.core.monitor.JobStatusResult;
+import org.apache.airavata.model.workspace.experiment.JobState;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.mail.BodyPart;
+import javax.mail.Message;
+import javax.mail.MessagingException;
+import javax.mail.Multipart;
+import java.io.IOException;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * Parses UGE job notification emails into a {@link JobStatusResult}.
+ * The job id, job name and status word are taken from the subject line.
+ * For "Complete" notifications, the exit status is read from the body.
+ */
+public class UGEEmailParser implements EmailParser {
+
+    private static final Logger log = LoggerFactory.getLogger(UGEEmailParser.class);
+
+    /** Subject regex; matches subjects shaped like {@code Job 12345 (jobName) Complete}. */
+    private static final String REGEX = "[\\w]*[ ]*(?<" + JOBID + ">[\\d]*)[ ]*\\((?<" + JOBNAME
+            + ">[a-zA-Z0-9]*)\\)[ ]*(?<" + STATUS + ">[a-zA-Z]*)";
+    /** Body regex extracting the numeric value of an {@code Exit Status = N} line. */
+    private static final String REGEX_EXIT_STATUS = "Exit Status[ ]*=[ ]*(?<" + EXIT_STATUS + ">[\\d]+)";
+
+    // Patterns are immutable and thread-safe, so compile them once instead of on every email.
+    private static final Pattern SUBJECT_PATTERN = Pattern.compile(REGEX);
+    private static final Pattern EXIT_STATUS_PATTERN = Pattern.compile(REGEX_EXIT_STATUS);
+
+    public static final String STARTED = "Started";
+    public static final String COMPLETE = "Complete";
+    public static final String FAILED = "Failed";
+    public static final String ABORTED = "Aborted";
+
+    /**
+     * Parses a notification email.
+     *
+     * @param message the notification email
+     * @return the parsed result. Job id, name and state are left unset when the subject does not match.
+     * @throws MessagingException if the subject or body cannot be read from the message
+     * @throws AiravataException if reading the message body fails with an I/O error (the cause is kept)
+     */
+    @Override
+    public JobStatusResult parseEmail(Message message) throws MessagingException, AiravataException {
+        JobStatusResult jobStatusResult = new JobStatusResult();
+
+        String subject = message.getSubject();
+        // Matcher rejects null input; a missing subject is handled like a non-matching one.
+        Matcher matcher = SUBJECT_PATTERN.matcher(subject == null ? "" : subject);
+        if (!matcher.find()) {
+            log.error("[EJM]: No matched found for subject => \n" + subject);
+            return jobStatusResult;
+        }
+        try {
+            jobStatusResult.setJobId(matcher.group(JOBID));
+            jobStatusResult.setJobName(matcher.group(JOBNAME));
+            String content = readTextContent(message);
+            jobStatusResult.setState(getJobState(matcher.group(STATUS), content));
+        } catch (IOException e) {
+            throw new AiravataException("[EJM]: Error while reading content of the email message", e);
+        }
+        return jobStatusResult;
+    }
+
+    /**
+     * Returns the plain-text body of the message.
+     * A String body is returned as-is. For a multipart body, the first {@code text/plain} part is
+     * returned. An empty string is returned when no plain-text body exists, so callers never get null.
+     */
+    private String readTextContent(Message message) throws MessagingException, IOException {
+        Object content = message.getContent();
+        if (content instanceof String) {
+            return (String) content;
+        }
+        if (content instanceof Multipart) {
+            Multipart multipart = (Multipart) content;
+            for (int i = 0; i < multipart.getCount(); i++) {
+                BodyPart part = multipart.getBodyPart(i);
+                if (part.isMimeType("text/plain")) {
+                    Object partContent = part.getContent();
+                    if (partContent instanceof String) {
+                        return (String) partContent;
+                    }
+                }
+            }
+        }
+        log.warn("[EJM]: Email body has no plain-text content, exit status cannot be determined");
+        return "";
+    }
+
+    /**
+     * Maps the subject status word to a {@link JobState}.
+     * "Complete" becomes COMPLETE only when the body reports exit status 0. Any other exit status,
+     * including a missing one, yields FAILED. Unrecognised words map to UNKNOWN.
+     */
+    private JobState getJobState(String status, String content) {
+        switch (status) {
+            case STARTED:
+                return JobState.ACTIVE;
+            case COMPLETE:
+                int exitStatus = getExitStatus(content);
+                if (exitStatus == 0) {
+                    return JobState.COMPLETE;
+                } else {
+                    log.info("[EJM]: Job returns with Exit Status = " + exitStatus + " , Marked as Failed");
+                    return JobState.FAILED;
+                }
+            case FAILED:
+                return JobState.FAILED;
+            case ABORTED:
+                return JobState.FAILED;
+            default:
+                return JobState.UNKNOWN;
+        }
+    }
+
+    /**
+     * Extracts the integer after {@code Exit Status =} from the email body.
+     *
+     * @return the exit status, or -1 when it is absent or does not fit in an int
+     */
+    private int getExitStatus(String content) {
+        Matcher statusMatcher = EXIT_STATUS_PATTERN.matcher(content);
+        if (statusMatcher.find()) {
+            String group = statusMatcher.group(EXIT_STATUS);
+            if (group != null && !group.trim().isEmpty()) {
+                try {
+                    return Integer.parseInt(group.trim());
+                } catch (NumberFormatException e) {
+                    // [\d]+ is unbounded, so the value may overflow an int; treat it as unknown.
+                    log.warn("[EJM]: Exit Status value is out of range: " + group);
+                }
+            }
+        }
+        return -1;
+    }
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/7b809747/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/exception/AiravataMonitorException.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/exception/AiravataMonitorException.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/exception/AiravataMonitorException.java
new file mode 100644
index 0000000..3acef66
--- /dev/null
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/exception/AiravataMonitorException.java
@@ -0,0 +1,37 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.monitor.exception;
+
+/**
+ * Checked exception for failures in the GFac job-monitoring code.
+ */
+public class AiravataMonitorException extends Exception {
+    private static final long serialVersionUID = -2849422320139467602L;
+
+    /** Wraps {@code e} as the cause; the detail message is derived from the cause. */
+    public AiravataMonitorException(Throwable e) {
+        super(e);
+    }
+
+    /**
+     * Creates an exception with a message and no cause.
+     * Uses {@code super(message)} rather than {@code super(message, null)}. An explicit null cause
+     * makes a later {@link Throwable#initCause(Throwable)} call throw IllegalStateException.
+     */
+    public AiravataMonitorException(String message) {
+        super(message);
+    }
+
+    /** Creates an exception with a message and the underlying cause. */
+    public AiravataMonitorException(String message, Throwable e) {
+        super(message, e);
+    }
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/7b809747/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/handlers/GridPullMonitorHandler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/handlers/GridPullMonitorHandler.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/handlers/GridPullMonitorHandler.java
new file mode 100644
index 0000000..e31458d
--- /dev/null
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/handlers/GridPullMonitorHandler.java
@@ -0,0 +1,139 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.monitor.handlers;
+
+import org.apache.airavata.common.exception.ApplicationSettingsException;
+import org.apache.airavata.common.logger.AiravataLogger;
+import org.apache.airavata.common.logger.AiravataLoggerFactory;
+import org.apache.airavata.common.utils.ServerSettings;
+import org.apache.airavata.gfac.core.context.JobExecutionContext;
+import org.apache.airavata.gfac.core.handler.GFacHandlerException;
+import org.apache.airavata.gfac.core.handler.ThreadedHandler;
+import org.apache.airavata.gfac.core.monitor.MonitorID;
+import org.apache.airavata.gfac.monitor.HPCMonitorID;
+import org.apache.airavata.gfac.monitor.exception.AiravataMonitorException;
+import org.apache.airavata.gfac.monitor.impl.pull.qstat.HPCPullMonitor;
+import org.apache.airavata.gfac.monitor.util.CommonUtils;
+import org.apache.airavata.gfac.ssh.api.authentication.AuthenticationInfo;
+import org.apache.airavata.gfac.ssh.impl.authentication.MyProxyAuthenticationInfo;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+
+import java.util.Properties;
+
+/**
+ * This handler is responsible for monitoring jobs in pull mode.
+ * It currently supports pull monitoring of multiple grid resources using commands like qstat and
+ * squeue. It also supports Sun Grid Engine monitoring, which is a slight variation of qstat monitoring.
+ */
+public class GridPullMonitorHandler extends ThreadedHandler implements Watcher {
+    private final static AiravataLogger logger = AiravataLoggerFactory.getLogger(GridPullMonitorHandler.class);
+
+    /** Port passed to MyProxyAuthenticationInfo (7512 is the standard MyProxy port). */
+    private static final int MYPROXY_PORT = 7512;
+    /** Passed to MyProxyAuthenticationInfo after the port; presumably the credential lifetime. TODO confirm units. */
+    private static final int MYPROXY_LIFETIME = 17280000;
+
+    private HPCPullMonitor hpcPullMonitor;
+
+    private AuthenticationInfo authenticationInfo;
+
+    /**
+     * Builds the MyProxy credential from server settings and creates the pull monitor.
+     *
+     * @throws GFacHandlerException if a required server setting cannot be read
+     */
+    @Override
+    public void initProperties(Properties properties) throws GFacHandlerException {
+        try {
+            String myProxyUser = ServerSettings.getSetting("myproxy.username");
+            String myProxyPass = ServerSettings.getSetting("myproxy.password");
+            String certPath = ServerSettings.getSetting("trusted.cert.location");
+            String myProxyServer = ServerSettings.getSetting("myproxy.server");
+            setAuthenticationInfo(new MyProxyAuthenticationInfo(myProxyUser, myProxyPass, myProxyServer,
+                    MYPROXY_PORT, MYPROXY_LIFETIME, certPath));
+            // we use our own credentials for monitoring, not from the store
+            hpcPullMonitor = new HPCPullMonitor(null, getAuthenticationInfo());
+        } catch (ApplicationSettingsException e) {
+            logger.error("Error while reading server properties", e);
+            throw new GFacHandlerException("Error while reading server properties", e);
+        }
+    }
+
+    /** Runs the pull-monitor loop on the calling thread. */
+    @Override
+    public void run() {
+        hpcPullMonitor.run();
+    }
+
+    /**
+     * Registers the job described by {@code jobExecutionContext} with the pull monitor's queue.
+     * It then increases the ZooKeeper job count. A queueing failure is logged, not rethrown.
+     */
+    @Override
+    public void invoke(JobExecutionContext jobExecutionContext) throws GFacHandlerException {
+        super.invoke(jobExecutionContext);
+        hpcPullMonitor.setGfac(jobExecutionContext.getGfac());
+        hpcPullMonitor.setPublisher(jobExecutionContext.getMonitorPublisher());
+        MonitorID monitorID = new HPCMonitorID(getAuthenticationInfo(), jobExecutionContext);
+        try {
+            // NOTE(review): the watcher registration below is disabled, so process() only fires if this
+            // handler is registered as a ZooKeeper watcher somewhere else. Confirm before relying on it.
+            /* ZooKeeper zk = jobExecutionContext.getZk();
+            try {
+                String experimentEntry = GFacUtils.findExperimentEntry(jobExecutionContext.getExperimentID(), zk);
+                String path = experimentEntry + File.separator + "operation";
+                Stat exists = zk.exists(path, this);
+                if (exists != null) {
+                    zk.getData(path, this, exists); // watching the operations node
+                }
+            } catch (KeeperException e) {
+                logger.error(e.getMessage(), e);
+            } catch (InterruptedException e) {
+                logger.error(e.getMessage(), e);
+            }*/
+            CommonUtils.addMonitortoQueue(hpcPullMonitor.getQueue(), monitorID, jobExecutionContext);
+            CommonUtils.increaseZkJobCount(monitorID); // update change job count to zookeeper
+        } catch (AiravataMonitorException e) {
+            logger.errorId(monitorID.getJobID(), "Error adding job {} monitorID object to the queue with experiment {}",
+                    monitorID.getJobID(), monitorID.getExperimentID());
+            // errorId above does not receive the exception; log it so the root cause is not lost.
+            logger.error("Failed to add job to the pull monitoring queue", e);
+        }
+    }
+
+    @Override
+    public void recover(JobExecutionContext jobExecutionContext) throws GFacHandlerException {
+        // TODO: Auto generated method body.
+    }
+
+    public AuthenticationInfo getAuthenticationInfo() {
+        return authenticationInfo;
+    }
+
+    public HPCPullMonitor getHpcPullMonitor() {
+        return hpcPullMonitor;
+    }
+
+    public void setAuthenticationInfo(AuthenticationInfo authenticationInfo) {
+        this.authenticationInfo = authenticationInfo;
+    }
+
+    public void setHpcPullMonitor(HPCPullMonitor hpcPullMonitor) {
+        this.hpcPullMonitor = hpcPullMonitor;
+    }
+
+    /**
+     * ZooKeeper callback. A NodeDataChanged event is treated as a cancellation.
+     * Every path element containing '+' (an experimentID+TaskID key) is added to the pull
+     * monitor's cancel list.
+     */
+    @Override
+    public void process(WatchedEvent watchedEvent) {
+        logger.info(watchedEvent.getPath());
+        if (Event.EventType.NodeDataChanged.equals(watchedEvent.getType())) {
+            // node data is changed, this means node is cancelled.
+            logger.info("Experiment is cancelled with this path:" + watchedEvent.getPath());
+
+            String[] split = watchedEvent.getPath().split("/");
+            for (String element : split) {
+                if (element.contains("+")) {
+                    logger.info("Adding experimentID+TaskID to be removed from monitoring:" + element);
+                    hpcPullMonitor.getCancelJobList().add(element);
+                }
+            }
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/7b809747/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/handlers/GridPushMonitorHandler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/handlers/GridPushMonitorHandler.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/handlers/GridPushMonitorHandler.java
new file mode 100644
index 0000000..6db7da5
--- /dev/null
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/handlers/GridPushMonitorHandler.java
@@ -0,0 +1,107 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.monitor.handlers;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.airavata.common.exception.ApplicationSettingsException;
+import org.apache.airavata.common.utils.ServerSettings;
+import org.apache.airavata.gfac.core.context.JobExecutionContext;
+import org.apache.airavata.gfac.core.handler.GFacHandlerException;
+import org.apache.airavata.gfac.core.handler.ThreadedHandler;
+import org.apache.airavata.gfac.core.monitor.MonitorID;
+import org.apache.airavata.gfac.monitor.HPCMonitorID;
+import org.apache.airavata.gfac.monitor.impl.push.amqp.AMQPMonitor;
+import org.apache.airavata.gfac.ssh.api.authentication.AuthenticationInfo;
+import org.apache.airavata.gfac.ssh.impl.authentication.MyProxyAuthenticationInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This handler is responsible for monitoring jobs in push mode.
+ * It currently supports push monitoring of multiple grid resources via an AMQP monitor.
+ */
+public class GridPushMonitorHandler extends ThreadedHandler {
+    private final static Logger logger = LoggerFactory.getLogger(GridPushMonitorHandler.class);
+
+    /** Port passed to MyProxyAuthenticationInfo (7512 is the standard MyProxy port). */
+    private static final int MYPROXY_PORT = 7512;
+    /** Passed to MyProxyAuthenticationInfo after the port; presumably the credential lifetime. TODO confirm units. */
+    private static final int MYPROXY_LIFETIME = 17280000;
+
+    private AMQPMonitor amqpMonitor;
+
+    private AuthenticationInfo authenticationInfo;
+
+    /**
+     * Builds the MyProxy credential and the AMQP monitor.
+     * Uses server settings plus the comma-separated {@code hosts} handler property.
+     *
+     * @throws GFacHandlerException if a server setting cannot be read or {@code hosts} is missing
+     */
+    @Override
+    public void initProperties(Properties properties) throws GFacHandlerException {
+        try {
+            String myProxyUser = ServerSettings.getSetting("myproxy.username");
+            String myProxyPass = ServerSettings.getSetting("myproxy.password");
+            String certPath = ServerSettings.getSetting("trusted.cert.location");
+            String myProxyServer = ServerSettings.getSetting("myproxy.server");
+            setAuthenticationInfo(new MyProxyAuthenticationInfo(myProxyUser, myProxyPass, myProxyServer,
+                    MYPROXY_PORT, MYPROXY_LIFETIME, certPath));
+
+            String hostList = properties.getProperty("hosts");
+            if (hostList == null || hostList.trim().isEmpty()) {
+                // Previously this surfaced as a NullPointerException from split().
+                throw new GFacHandlerException("Handler property 'hosts' is not set; cannot create the AMQP monitor");
+            }
+            String proxyFilePath = ServerSettings.getSetting("proxy.file.path");
+            String connectionName = ServerSettings.getSetting("connection.name");
+            LinkedBlockingQueue<MonitorID> pushQueue = new LinkedBlockingQueue<MonitorID>();
+            LinkedBlockingQueue<MonitorID> finishQueue = new LinkedBlockingQueue<MonitorID>();
+            // Tolerate whitespace around the commas, e.g. "host1, host2".
+            List<String> hosts = Arrays.asList(hostList.trim().split("\\s*,\\s*"));
+            amqpMonitor = new AMQPMonitor(null, pushQueue, finishQueue, proxyFilePath, connectionName, hosts);
+        } catch (ApplicationSettingsException e) {
+            logger.error(e.getMessage(), e);
+            throw new GFacHandlerException(e.getMessage(), e);
+        }
+    }
+
+    /** Runs the AMQP monitor on the calling thread. */
+    @Override
+    public void run() {
+        amqpMonitor.run();
+    }
+
+    /** Adds the job described by {@code jobExecutionContext} to the AMQP monitor's running queue. */
+    @Override
+    public void invoke(JobExecutionContext jobExecutionContext) throws GFacHandlerException {
+        super.invoke(jobExecutionContext);
+        MonitorID monitorID = new HPCMonitorID(getAuthenticationInfo(), jobExecutionContext);
+        amqpMonitor.getRunningQueue().add(monitorID);
+    }
+
+    @Override
+    public void recover(JobExecutionContext jobExecutionContext) throws GFacHandlerException {
+        // TODO: Auto generated method body.
+    }
+
+    public AMQPMonitor getAmqpMonitor() {
+        return amqpMonitor;
+    }
+
+    public void setAmqpMonitor(AMQPMonitor amqpMonitor) {
+        this.amqpMonitor = amqpMonitor;
+    }
+
+    public AuthenticationInfo getAuthenticationInfo() {
+        return authenticationInfo;
+    }
+
+    public void setAuthenticationInfo(AuthenticationInfo authenticationInfo) {
+        this.authenticationInfo = authenticationInfo;
+    }
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/7b809747/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java
new file mode 100644
index 0000000..553ded9
--- /dev/null
+++
b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java @@ -0,0 +1,471 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * +*/ +package org.apache.airavata.gfac.monitor.impl.pull.qstat; + +import com.google.common.eventbus.EventBus; +import org.apache.airavata.common.logger.AiravataLogger; +import org.apache.airavata.common.logger.AiravataLoggerFactory; +import org.apache.airavata.common.utils.MonitorPublisher; +import org.apache.airavata.common.utils.ServerSettings; +import org.apache.airavata.gfac.monitor.util.CommonUtils; +import org.apache.airavata.gfac.core.GFac; +import org.apache.airavata.gfac.core.monitor.MonitorID; +import org.apache.airavata.gfac.core.GFacThreadPoolExecutor; +import org.apache.airavata.gfac.core.utils.OutHandlerWorker; +import org.apache.airavata.gfac.monitor.HostMonitorData; +import org.apache.airavata.gfac.monitor.UserMonitorData; +import org.apache.airavata.gfac.monitor.core.PullMonitor; +import org.apache.airavata.gfac.monitor.exception.AiravataMonitorException; +import org.apache.airavata.gfac.monitor.impl.push.amqp.SimpleJobFinishConsumer; +import org.apache.airavata.gfac.ssh.api.SSHApiException; +import 
org.apache.airavata.gfac.ssh.api.authentication.AuthenticationInfo; +import org.apache.airavata.model.appcatalog.computeresource.JobSubmissionProtocol; +import org.apache.airavata.model.messaging.event.JobIdentifier; +import org.apache.airavata.model.messaging.event.JobStatusChangeRequestEvent; +import org.apache.airavata.model.workspace.experiment.JobState; + +import java.sql.Timestamp; +import java.util.*; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.LinkedBlockingQueue; + +/** + * This monitor is based on qstat command which can be run + * in grid resources and retrieve the job status. + */ +public class HPCPullMonitor extends PullMonitor { + + private final static AiravataLogger logger = AiravataLoggerFactory.getLogger(HPCPullMonitor.class); + public static final int FAILED_COUNT = 5; + + // I think this should use DelayedBlocking Queue to do the monitoring*/ + private BlockingQueue<UserMonitorData> queue; + + private boolean startPulling = false; + + private Map<String, ResourceConnection> connections; + + private MonitorPublisher publisher; + + private LinkedBlockingQueue<String> cancelJobList; + + private List<String> completedJobsFromPush; + + private GFac gfac; + + private AuthenticationInfo authenticationInfo; + + private ArrayList<MonitorID> removeList; + + public HPCPullMonitor() { + connections = new HashMap<String, ResourceConnection>(); + queue = new LinkedBlockingDeque<UserMonitorData>(); + publisher = new MonitorPublisher(new EventBus()); + cancelJobList = new LinkedBlockingQueue<String>(); + completedJobsFromPush = new ArrayList<String>(); + (new SimpleJobFinishConsumer(this.completedJobsFromPush)).listen(); + removeList = new ArrayList<MonitorID>(); + } + + public HPCPullMonitor(MonitorPublisher monitorPublisher, AuthenticationInfo authInfo) { + connections = new HashMap<String, ResourceConnection>(); + queue = new LinkedBlockingDeque<UserMonitorData>(); + publisher = 
monitorPublisher; + authenticationInfo = authInfo; + cancelJobList = new LinkedBlockingQueue<String>(); + this.completedJobsFromPush = new ArrayList<String>(); + (new SimpleJobFinishConsumer(this.completedJobsFromPush)).listen(); + removeList = new ArrayList<MonitorID>(); + } + + public HPCPullMonitor(BlockingQueue<UserMonitorData> queue, MonitorPublisher publisher) { + this.queue = queue; + this.publisher = publisher; + connections = new HashMap<String, ResourceConnection>(); + cancelJobList = new LinkedBlockingQueue<String>(); + this.completedJobsFromPush = new ArrayList<String>(); + (new SimpleJobFinishConsumer(this.completedJobsFromPush)).listen(); + removeList = new ArrayList<MonitorID>(); + } + + + public void run() { + /* implement a logic to pick each monitorID object from the queue and do the + monitoring + */ + this.startPulling = true; + while (this.startPulling && !ServerSettings.isStopAllThreads()) { + try { + // After finishing one iteration of the full queue this thread sleeps 1 second + synchronized (this.queue) { + if (this.queue.size() > 0) { + startPulling(); + } + } + Thread.sleep(10000); + } catch (Exception e) { + // we catch all the exceptions here because no matter what happens we do not stop running this + // thread, but ideally we should report proper error messages, but this is handled in startPulling + // method, incase something happen in Thread.sleep we handle it with this catch block. 
+ logger.error(e.getMessage(),e); + } + } + // thread is going to return so we close all the connections + Iterator<String> iterator = connections.keySet().iterator(); + while (iterator.hasNext()) { + String next = iterator.next(); + ResourceConnection resourceConnection = connections.get(next); + try { + resourceConnection.getCluster().disconnect(); + } catch (SSHApiException e) { + logger.error("Erro while connecting to the cluster", e); + } + } + } + + /** + * This method will can invoke when PullMonitor needs to start + * and it has to invoke in the frequency specified below, + * + * @return if the start process is successful return true else false + */ + public boolean startPulling() throws AiravataMonitorException { + // take the top element in the queue and pull the data and put that element + // at the tail of the queue + //todo this polling will not work with multiple usernames but with single user + // and multiple hosts, currently monitoring will work + UserMonitorData take = null; + JobStatusChangeRequestEvent jobStatus = new JobStatusChangeRequestEvent(); + MonitorID currentMonitorID = null; + try { + take = this.queue.take(); + List<HostMonitorData> hostMonitorData = take.getHostMonitorData(); + for (ListIterator<HostMonitorData> hostIterator = hostMonitorData.listIterator(); hostIterator.hasNext();) { + HostMonitorData iHostMonitorData = hostIterator.next(); + if (iHostMonitorData.getJobSubmissionProtocol() == JobSubmissionProtocol.SSH) { + String hostName = iHostMonitorData.getComputeResourceDescription().getHostName(); + ResourceConnection connection = null; + if (connections.containsKey(hostName)) { + if (!connections.get(hostName).isConnected()) { + connection = new ResourceConnection(iHostMonitorData, getAuthenticationInfo()); + connections.put(hostName, connection); + } else { + logger.debug("We already have this connection so not going to create one"); + connection = connections.get(hostName); + } + } else { + connection = new 
ResourceConnection(iHostMonitorData, getAuthenticationInfo()); + connections.put(hostName, connection); + } + + // before we get the statuses, we check the cancel job list and remove them permanently + List<MonitorID> monitorID = iHostMonitorData.getMonitorIDs(); + Iterator<String> iterator1 = cancelJobList.iterator(); + ListIterator<MonitorID> monitorIDListIterator = monitorID.listIterator(); + while (monitorIDListIterator.hasNext()) { + MonitorID iMonitorID = monitorIDListIterator.next(); + while (iterator1.hasNext()) { + String cancelMId = iterator1.next(); + if (cancelMId.equals(iMonitorID.getExperimentID() + "+" + iMonitorID.getTaskID())) { + iMonitorID.setStatus(JobState.CANCELED); +// CommonUtils.removeMonitorFromQueue(take, iMonitorID); + removeList.add(iMonitorID); + logger.debugId(cancelMId, "Found a match in cancel monitor queue, hence moved to the " + + "completed job queue, experiment {}, task {} , job {}", + iMonitorID.getExperimentID(), iMonitorID.getTaskID(), iMonitorID.getJobID()); + logger.info("Job cancelled: marking the Job as ************CANCELLED************ experiment {}, task {}, job name {} .", + iMonitorID.getExperimentID(), iMonitorID.getTaskID(), iMonitorID.getJobName()); + sendNotification(iMonitorID); + logger.info("To avoid timing issues we sleep sometime and try to retrieve output files"); + Thread.sleep(10000); + GFacThreadPoolExecutor.getCachedThreadPool().execute(new OutHandlerWorker(gfac, iMonitorID, publisher)); + break; + } + } + iterator1 = cancelJobList.iterator(); + } + + cleanup(take); + + synchronized (completedJobsFromPush) { + for (ListIterator<String> iterator = completedJobsFromPush.listIterator(); iterator.hasNext(); ) { + String completeId = iterator.next(); + for (monitorIDListIterator = monitorID.listIterator(); monitorIDListIterator.hasNext(); ) { + MonitorID iMonitorID = monitorIDListIterator.next(); + if (completeId.equals(iMonitorID.getUserName() + "," + iMonitorID.getJobName())) { + logger.info("This job is 
finished because push notification came with <username,jobName> " + completeId); + iMonitorID.setStatus(JobState.COMPLETE); +// CommonUtils.removeMonitorFromQueue(take, iMonitorID);//we have to make this empty everytime we iterate, otherwise this list will accumulate and will lead to a memory leak + removeList.add(iMonitorID); + logger.debugId(completeId, "Push notification updated job {} status to {}. " + + "experiment {} , task {}.", iMonitorID.getJobID(), JobState.COMPLETE.toString(), + iMonitorID.getExperimentID(), iMonitorID.getTaskID()); + logger.info("AMQP message recieved: marking the Job as ************COMPLETE************ experiment {}, task {}, job name {} .", + iMonitorID.getExperimentID(), iMonitorID.getTaskID(), iMonitorID.getJobName()); + + sendNotification(iMonitorID); + logger.info("To avoid timing issues we sleep sometime and try to retrieve output files"); + Thread.sleep(10000); + GFacThreadPoolExecutor.getCachedThreadPool().execute(new OutHandlerWorker(gfac, iMonitorID, publisher)); + break; + } + } + } + } + + cleanup(take); + + // we have to get this again because we removed the already completed jobs with amqp messages + monitorID = iHostMonitorData.getMonitorIDs(); + Map<String, JobState> jobStatuses = connection.getJobStatuses(monitorID); + for (Iterator<MonitorID> iterator = monitorID.listIterator(); iterator.hasNext(); ) { + MonitorID iMonitorID = iterator.next(); + currentMonitorID = iMonitorID; + if (!JobState.CANCELED.equals(iMonitorID.getStatus()) && + !JobState.COMPLETE.equals(iMonitorID.getStatus())) { + iMonitorID.setStatus(jobStatuses.get(iMonitorID.getJobID() + "," + iMonitorID.getJobName())); //IMPORTANT this is NOT a simple setter we have a logic + } else if (JobState.COMPLETE.equals(iMonitorID.getStatus())) { + logger.debugId(iMonitorID.getJobID(), "Moved job {} to completed jobs map, experiment {}, " + + "task {}", iMonitorID.getJobID(), iMonitorID.getExperimentID(), iMonitorID.getTaskID()); +// 
CommonUtils.removeMonitorFromQueue(take, iMonitorID); + removeList.add(iMonitorID); + logger.info("PULL Notification is complete: marking the Job as ************COMPLETE************ experiment {}, task {}, job name {} .", + iMonitorID.getExperimentID(), iMonitorID.getTaskID(), iMonitorID.getJobName()); + GFacThreadPoolExecutor.getCachedThreadPool().execute(new OutHandlerWorker(gfac, iMonitorID, publisher)); + } + iMonitorID.setStatus(jobStatuses.get(iMonitorID.getJobID() + "," + iMonitorID.getJobName())); //IMPORTANT this is not a simple setter we have a logic + iMonitorID.setLastMonitored(new Timestamp((new Date()).getTime())); + sendNotification(iMonitorID); + // if the job is completed we do not have to put the job to the queue again + iMonitorID.setLastMonitored(new Timestamp((new Date()).getTime())); + } + + cleanup(take); + + + for (Iterator<MonitorID> iterator = monitorID.listIterator(); iterator.hasNext(); ) { + MonitorID iMonitorID = iterator.next(); + if (iMonitorID.getFailedCount() > FAILED_COUNT) { + iMonitorID.setLastMonitored(new Timestamp((new Date()).getTime())); + String outputDir = iMonitorID.getJobExecutionContext().getOutputDir(); + List<String> stdOut = null; + try { + stdOut = connection.getCluster().listDirectory(outputDir); // check the outputs directory + } catch (SSHApiException e) { + if (e.getMessage().contains("No such file or directory")) { + // this is because while we run output handler something failed and during exception + // we store all the jobs in the monitor queue again + logger.error("We know this job is already attempted to run out-handlers"); +// CommonUtils.removeMonitorFromQueue(queue, iMonitorID); + } + } + if (stdOut != null && stdOut.size() > 0 && !stdOut.get(0).isEmpty()) { // have to be careful with this + iMonitorID.setStatus(JobState.COMPLETE); + logger.errorId(iMonitorID.getJobID(), "Job monitoring failed {} times, " + + " Experiment {} , task {}", iMonitorID.getFailedCount(), + iMonitorID.getExperimentID(), 
iMonitorID.getTaskID()); + logger.info("Listing directory came as complete: marking the Job as ************COMPLETE************ experiment {}, task {}, job name {} .", + iMonitorID.getExperimentID(), iMonitorID.getTaskID(), iMonitorID.getJobName()); + sendNotification(iMonitorID); +// CommonUtils.removeMonitorFromQueue(take, iMonitorID); + removeList.add(iMonitorID); + GFacThreadPoolExecutor.getCachedThreadPool().execute(new OutHandlerWorker(gfac, iMonitorID, publisher)); + } else { + iMonitorID.setFailedCount(0); + } + } else { + // Evey + iMonitorID.setLastMonitored(new Timestamp((new Date()).getTime())); + // if the job is complete we remove it from the Map, if any of these maps + // get empty this userMonitorData will get delete from the queue + } + } + + cleanup(take); + + + } else { + logger.debug("Qstat Monitor doesn't handle non-gsissh hosts , host {}", iHostMonitorData. + getComputeResourceDescription().getHostName()); + } + } + // We have finished all the HostMonitorData object in userMonitorData, now we need to put it back + // now the userMonitorData goes back to the tail of the queue + // during individual monitorID removal we remove the HostMonitorData object if it become empty + // so if all the jobs are finished for all the hostMOnitorId objects in userMonitorData object + // we should remove it from the queue so here we do not put it back. 
+ for (ListIterator<HostMonitorData> iterator1 = take.getHostMonitorData().listIterator(); iterator1.hasNext(); ) { + HostMonitorData iHostMonitorID = iterator1.next(); + if (iHostMonitorID.getMonitorIDs().size() == 0) { + iterator1.remove(); + logger.debug("Removed host {} from monitoring queue", iHostMonitorID.getComputeResourceDescription().getHostName()); + } + } + if(take.getHostMonitorData().size()!=0) { + queue.put(take); + } + } catch (InterruptedException e) { + if (!this.queue.contains(take)) { + try { + this.queue.put(take); + } catch (InterruptedException e1) { + e1.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. + } + } + logger.error("Error handling the job with Job ID:" + currentMonitorID.getJobID()); + throw new AiravataMonitorException(e); + } catch (SSHApiException e) { + logger.error(e.getMessage()); + if (e.getMessage().contains("Unknown Job Id Error")) { + // in this case job is finished or may be the given job ID is wrong + jobStatus.setState(JobState.UNKNOWN); + JobIdentifier jobIdentifier = new JobIdentifier("UNKNOWN", "UNKNOWN", "UNKNOWN", "UNKNOWN", "UNKNOWN"); + if (currentMonitorID != null){ + jobIdentifier.setExperimentId(currentMonitorID.getExperimentID()); + jobIdentifier.setTaskId(currentMonitorID.getTaskID()); + jobIdentifier.setWorkflowNodeId(currentMonitorID.getWorkflowNodeID()); + jobIdentifier.setJobId(currentMonitorID.getJobID()); + jobIdentifier.setGatewayId(currentMonitorID.getJobExecutionContext().getGatewayID()); + } + jobStatus.setJobIdentity(jobIdentifier); + publisher.publish(jobStatus); + } else if (e.getMessage().contains("illegally formed job identifier")) { + logger.error("Wrong job ID is given so dropping the job from monitoring system"); + } else if (!this.queue.contains(take)) { + try { + queue.put(take); + } catch (InterruptedException e1) { + e1.printStackTrace(); + } + } + throw new AiravataMonitorException("Error retrieving the job status", e); + } catch (Exception 
e) {
+            try {
+                queue.put(take);
+            } catch (InterruptedException e1) {
+                e1.printStackTrace();
+            }
+            throw new AiravataMonitorException("Error retrieving the job status", e);
+        }
+        return true;
+    }
+
+    /**
+     * Publishes the current status of a monitored job as a {@code JobStatusChangeRequestEvent}.
+     * The event's identity is built from the job, task, workflow-node, experiment and gateway
+     * identifiers of {@code iMonitorID}; its state is {@code iMonitorID.getStatus()}.
+     *
+     * @param iMonitorID the monitored job whose status is being announced
+     */
+    private void sendNotification(MonitorID iMonitorID) {
+        JobStatusChangeRequestEvent jobStatus = new JobStatusChangeRequestEvent();
+        JobIdentifier jobIdentity = new JobIdentifier(iMonitorID.getJobID(),
+                iMonitorID.getTaskID(),
+                iMonitorID.getWorkflowNodeID(),
+                iMonitorID.getExperimentID(),
+                iMonitorID.getJobExecutionContext().getGatewayID());
+        jobStatus.setJobIdentity(jobIdentity);
+        jobStatus.setState(iMonitorID.getStatus());
+        // we have this JobStatus class to handle amqp monitoring
+        // NOTE(review): this "Published" debug line is emitted before publish() is actually called.
+        logger.debugId(jobStatus.getJobIdentity().getJobId(), "Published job status change request, " +
+                "experiment {} , task {}", jobStatus.getJobIdentity().getExperimentId(),
+                jobStatus.getJobIdentity().getTaskId());
+
+        publisher.publish(jobStatus);
+    }
+
+    /**
+     * This is the method to stop the polling process: it clears the {@code startPulling} flag.
+     *
+     * @return if the stopping process is successful return true else false (this implementation
+     *         always returns true)
+     */
+    public boolean stopPulling() {
+        this.startPulling = false;
+        return true;
+    }
+
+    // ---- plain accessors for the monitor's collaborators and configuration ----
+
+    public MonitorPublisher getPublisher() {
+        return publisher;
+    }
+
+    public void setPublisher(MonitorPublisher publisher) {
+        this.publisher = publisher;
+    }
+
+    public BlockingQueue<UserMonitorData> getQueue() {
+        return queue;
+    }
+
+    public void setQueue(BlockingQueue<UserMonitorData> queue) {
+        this.queue = queue;
+    }
+
+    /**
+     * Authentication hook.
+     *
+     * @return always {@code false}
+     */
+    public boolean authenticate() {
+        return false; // NOTE(review): placeholder implementation, always returns false
+    }
+
+    public Map<String, ResourceConnection> getConnections() {
+        return connections;
+    }
+
+    public boolean isStartPulling() {
+        return startPulling;
+    }
+
+    public void setConnections(Map<String, ResourceConnection> connections) {
+        this.connections = connections;
+    }
+
+    public void setStartPulling(boolean startPulling) {
+        this.startPulling = startPulling;
+    }
+
+    public GFac getGfac() {
+        return gfac;
+    }
+
+    public void setGfac(GFac gfac) {
+        this.gfac = gfac;
+    }
+
+    public AuthenticationInfo getAuthenticationInfo() {
+        return authenticationInfo;
+    }
+
+    public void setAuthenticationInfo(AuthenticationInfo authenticationInfo) {
+        this.authenticationInfo = authenticationInfo;
+    }
+
+    public LinkedBlockingQueue<String> getCancelJobList() {
+        return cancelJobList;
+    }
+
+    public void setCancelJobList(LinkedBlockingQueue<String> cancelJobList) {
+        this.cancelJobList = cancelJobList;
+    }
+
+
+    /**
+     * Removes every MonitorID collected in {@code removeList} from the given user's monitor data
+     * (via {@code CommonUtils.removeMonitorFromQueue}), then clears {@code removeList}.
+     * Removal failures are logged and do not stop the remaining removals.
+     *
+     * @param userMonitorData the user entry the collected monitor IDs belong to
+     */
+    private void cleanup(UserMonitorData userMonitorData){
+        for(MonitorID iMonitorId:removeList){
+            try {
+                CommonUtils.removeMonitorFromQueue(userMonitorData, iMonitorId);
+            } catch (AiravataMonitorException e) {
+                logger.error(e.getMessage(), e);
+                logger.error("Error deleting the monitor data: " + iMonitorId.getJobID());
+            }
+        }
+        removeList.clear();
+    }
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/7b809747/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/ResourceConnection.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/ResourceConnection.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/ResourceConnection.java
new file mode 100644
index 0000000..41e9bd2
--- /dev/null
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/ResourceConnection.java
@@ -0,0 +1,154 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation
(ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * +*/ +package org.apache.airavata.gfac.monitor.impl.pull.qstat; + +import org.apache.airavata.gfac.GFacException; +import org.apache.airavata.gfac.SecurityContext; +import org.apache.airavata.gfac.core.monitor.MonitorID; +import org.apache.airavata.gfac.gsissh.security.GSISecurityContext; +import org.apache.airavata.gfac.monitor.HostMonitorData; +import org.apache.airavata.gfac.ssh.security.SSHSecurityContext; +import org.apache.airavata.gfac.ssh.api.SSHApiException; +import org.apache.airavata.gfac.ssh.api.authentication.AuthenticationInfo; +import org.apache.airavata.gfac.ssh.impl.JobStatus; +import org.apache.airavata.gfac.ssh.impl.PBSCluster; +import org.apache.airavata.model.workspace.experiment.JobState; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; +import java.util.Map; +import java.util.TreeMap; + + +public class ResourceConnection { + private static final Logger log = LoggerFactory.getLogger(ResourceConnection.class); + + private PBSCluster cluster; + + private AuthenticationInfo authenticationInfo; + + + public ResourceConnection(HostMonitorData hostMonitorData,AuthenticationInfo authInfo) throws SSHApiException { + MonitorID monitorID = 
hostMonitorData.getMonitorIDs().get(0); + try { + SecurityContext securityContext = monitorID.getJobExecutionContext().getSecurityContext(monitorID.getComputeResourceDescription().getHostName()); + if(securityContext != null) { + if (securityContext instanceof GSISecurityContext) { + GSISecurityContext gsiSecurityContext = (GSISecurityContext) securityContext; + cluster = (PBSCluster) gsiSecurityContext.getPbsCluster(); + } else if (securityContext instanceof SSHSecurityContext) { + SSHSecurityContext sshSecurityContext = (SSHSecurityContext) + securityContext; + cluster = (PBSCluster) sshSecurityContext.getPbsCluster(); + } + } + // we just use cluster configuration from the incoming request and construct a new cluster because for monitoring + // we are using our own credentials and not using one users account to do everything. + authenticationInfo = authInfo; + } catch (GFacException e) { + log.error("Error reading data from job ExecutionContext"); + } + } + + public ResourceConnection(HostMonitorData hostMonitorData) throws SSHApiException { + MonitorID monitorID = hostMonitorData.getMonitorIDs().get(0); + try { + GSISecurityContext securityContext = (GSISecurityContext) + monitorID.getJobExecutionContext().getSecurityContext(monitorID.getComputeResourceDescription().getHostName()); + cluster = (PBSCluster) securityContext.getPbsCluster(); + + // we just use cluster configuration from the incoming request and construct a new cluster because for monitoring + // we are using our own credentials and not using one users account to do everything. 
+ cluster = new PBSCluster(cluster.getServerInfo(), authenticationInfo, cluster.getJobManagerConfiguration()); + } catch (GFacException e) { + log.error("Error reading data from job ExecutionContext"); + } + } + + public JobState getJobStatus(MonitorID monitorID) throws SSHApiException { + String jobID = monitorID.getJobID(); + //todo so currently we execute the qstat for each job but we can use user based monitoring + //todo or we should concatenate all the commands and execute them in one go and parseSingleJob the response + return getStatusFromString(cluster.getJobStatus(jobID).toString()); + } + + public Map<String, JobState> getJobStatuses(List<MonitorID> monitorIDs) throws SSHApiException { + Map<String, JobStatus> treeMap = new TreeMap<String, JobStatus>(); + Map<String, JobState> treeMap1 = new TreeMap<String, JobState>(); + // creating a sorted map with all the jobIds and with the predefined + // status as UNKNOWN + for (MonitorID monitorID : monitorIDs) { + treeMap.put(monitorID.getJobID()+","+monitorID.getJobName(), JobStatus.U); + } + String userName = cluster.getServerInfo().getUserName(); + //todo so currently we execute the qstat for each job but we can use user based monitoring + //todo or we should concatenate all the commands and execute them in one go and parseSingleJob the response + // + cluster.getJobStatuses(userName, treeMap); + for (String key : treeMap.keySet()) { + treeMap1.put(key, getStatusFromString(treeMap.get(key).toString())); + } + return treeMap1; + } + + private JobState getStatusFromString(String status) { + log.info("parsing the job status returned : " + status); + if (status != null) { + if ("C".equals(status) || "CD".equals(status) || "E".equals(status) || "CG".equals(status) || "DONE".equals(status)) { + return JobState.COMPLETE; + } else if ("H".equals(status) || "h".equals(status)) { + return JobState.HELD; + } else if ("Q".equals(status) || "qw".equals(status) || "PEND".equals(status)) { + return JobState.QUEUED; + } else 
if ("R".equals(status) || "CF".equals(status) || "r".equals(status) || "RUN".equals(status)) { + return JobState.ACTIVE; + } else if ("T".equals(status)) { + return JobState.HELD; + } else if ("W".equals(status) || "PD".equals(status)) { + return JobState.QUEUED; + } else if ("S".equals(status) || "PSUSP".equals(status) || "USUSP".equals(status) || "SSUSP".equals(status)) { + return JobState.SUSPENDED; + } else if ("CA".equals(status)) { + return JobState.CANCELED; + } else if ("F".equals(status) || "NF".equals(status) || "TO".equals(status) || "EXIT".equals(status)) { + return JobState.FAILED; + } else if ("PR".equals(status) || "Er".equals(status)) { + return JobState.FAILED; + } else if ("U".equals(status) || ("UNKWN".equals(status))) { + return JobState.UNKNOWN; + } + } + return JobState.UNKNOWN; + } + + public PBSCluster getCluster() { + return cluster; + } + + public void setCluster(PBSCluster cluster) { + this.cluster = cluster; + } + + public boolean isConnected(){ + return this.cluster.getSession().isConnected(); + } +} http://git-wip-us.apache.org/repos/asf/airavata/blob/7b809747/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/AMQPMonitor.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/AMQPMonitor.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/AMQPMonitor.java new file mode 100644 index 0000000..de8cd8c --- /dev/null +++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/AMQPMonitor.java @@ -0,0 +1,280 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * +*/ +package org.apache.airavata.gfac.monitor.impl.push.amqp; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.BlockingQueue; + +import org.apache.airavata.common.utils.MonitorPublisher; +import org.apache.airavata.common.utils.ServerSettings; +import org.apache.airavata.gfac.core.monitor.MonitorID; +import org.apache.airavata.gfac.monitor.core.PushMonitor; +import org.apache.airavata.gfac.monitor.exception.AiravataMonitorException; +import org.apache.airavata.gfac.monitor.util.AMQPConnectionUtil; +import org.apache.airavata.gfac.monitor.util.CommonUtils; +import org.apache.airavata.model.appcatalog.computeresource.ComputeResourceDescription; +import org.apache.airavata.model.messaging.event.JobIdentifier; +import org.apache.airavata.model.messaging.event.JobStatusChangeEvent; +import org.apache.airavata.model.workspace.experiment.JobState; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.eventbus.EventBus; +import com.google.common.eventbus.Subscribe; +import com.rabbitmq.client.Channel; +import com.rabbitmq.client.Connection; + +/** + * This is the implementation for AMQP based finishQueue, this uses + * rabbitmq client to recieve AMQP based monitoring data from + * mostly excede resources. 
+ */ +public class AMQPMonitor extends PushMonitor { + private final static Logger logger = LoggerFactory.getLogger(AMQPMonitor.class); + + + /* this will keep all the channels available in the system, we do not create + channels for all the jobs submitted, but we create channels for each user for each + host. + */ + private Map<String, Channel> availableChannels; + + private MonitorPublisher publisher; + + private MonitorPublisher localPublisher; + + private BlockingQueue<MonitorID> runningQueue; + + private BlockingQueue<MonitorID> finishQueue; + + private String connectionName; + + private String proxyPath; + + private List<String> amqpHosts; + + private boolean startRegister; + + public AMQPMonitor(){ + + } + public AMQPMonitor(MonitorPublisher publisher, BlockingQueue<MonitorID> runningQueue, + BlockingQueue<MonitorID> finishQueue, + String proxyPath,String connectionName,List<String> hosts) { + this.publisher = publisher; + this.runningQueue = runningQueue; // these will be initialized by the MonitorManager + this.finishQueue = finishQueue; // these will be initialized by the MonitorManager + this.availableChannels = new HashMap<String, Channel>(); + this.connectionName = connectionName; + this.proxyPath = proxyPath; + this.amqpHosts = hosts; + this.localPublisher = new MonitorPublisher(new EventBus()); + this.localPublisher.registerListener(this); + } + + public void initialize(String proxyPath, String connectionName, List<String> hosts) { + this.availableChannels = new HashMap<String, Channel>(); + this.connectionName = connectionName; + this.proxyPath = proxyPath; + this.amqpHosts = hosts; + this.localPublisher = new MonitorPublisher(new EventBus()); + this.localPublisher.registerListener(this); + } + + @Override + public boolean registerListener(MonitorID monitorID) throws AiravataMonitorException { + // we subscribe to read user-host based subscription + ComputeResourceDescription computeResourceDescription = monitorID.getComputeResourceDescription(); + 
if (computeResourceDescription.isSetIpAddresses() && computeResourceDescription.getIpAddresses().size() > 0) { + // we get first ip address for the moment + String hostAddress = computeResourceDescription.getIpAddresses().get(0); + // in amqp case there are no multiple jobs per each host, because once a job is put in to the queue it + // will be picked by the Monitor, so jobs will not stay in this queueu but jobs will stay in finishQueue + String channelID = CommonUtils.getChannelID(monitorID); + if (availableChannels.get(channelID) == null) { + try { + //todo need to fix this rather getting it from a file + Connection connection = AMQPConnectionUtil.connect(amqpHosts, connectionName, proxyPath); + Channel channel = null; + channel = connection.createChannel(); + availableChannels.put(channelID, channel); + String queueName = channel.queueDeclare().getQueue(); + + BasicConsumer consumer = new + BasicConsumer(new JSONMessageParser(), localPublisher); // here we use local publisher + channel.basicConsume(queueName, true, consumer); + String filterString = CommonUtils.getRoutingKey(monitorID.getUserName(), hostAddress); + // here we queuebind to a particular user in a particular machine + channel.queueBind(queueName, "glue2.computing_activity", filterString); + logger.info("Using filtering string to monitor: " + filterString); + } catch (IOException e) { + logger.error("Error creating the connection to finishQueue the job:" + monitorID.getUserName()); + } + } + } else { + throw new AiravataMonitorException("Couldn't register monitor for jobId :" + monitorID.getJobID() + + " , ComputeResourceDescription " + computeResourceDescription.getHostName() + " doesn't has an " + + "IpAddress with it"); + } + return true; + } + + public void run() { + // before going to the while true mode we start unregister thread + startRegister = true; // this will be unset by someone else + while (startRegister || !ServerSettings.isStopAllThreads()) { + try { + MonitorID take = 
runningQueue.take(); + this.registerListener(take); + } catch (AiravataMonitorException e) { // catch any exceptino inside the loop + logger.error(e.getMessage(), e); + } catch (InterruptedException e) { + logger.error(e.getMessage(), e); + } catch (Exception e){ + logger.error(e.getMessage(), e); + } + } + Set<String> strings = availableChannels.keySet(); + for(String key:strings) { + Channel channel = availableChannels.get(key); + try { + channel.close(); + } catch (IOException e) { + logger.error(e.getMessage(), e); + } + } + } + + @Subscribe + public boolean unRegisterListener(MonitorID monitorID) throws AiravataMonitorException { + Iterator<MonitorID> iterator = finishQueue.iterator(); + MonitorID next = null; + while(iterator.hasNext()){ + next = iterator.next(); + if(next.getJobID().endsWith(monitorID.getJobID())){ + break; + } + } + if(next == null) { + logger.error("Job has removed from the queue, old obsolete message recieved"); + return false; + } + String channelID = CommonUtils.getChannelID(next); + if (JobState.FAILED.equals(monitorID.getStatus()) || JobState.COMPLETE.equals(monitorID.getStatus())) { + finishQueue.remove(next); + + // if this is the last job in the queue at this point with the same username and same host we + // close the channel and close the connection and remove it from availableChannels + if (CommonUtils.isTheLastJobInQueue(finishQueue, next)) { + logger.info("There are no jobs to monitor for common ChannelID:" + channelID + " , so we unsubscribe it" + + ", incase new job created we do subscribe again"); + Channel channel = availableChannels.get(channelID); + if (channel == null) { + logger.error("Already Unregistered the listener"); + throw new AiravataMonitorException("Already Unregistered the listener"); + } else { + try { + channel.queueUnbind(channel.queueDeclare().getQueue(), "glue2.computing_activity", CommonUtils.getRoutingKey(next)); + channel.close(); + channel.getConnection().close(); + 
availableChannels.remove(channelID); + } catch (IOException e) { + logger.error("Error unregistering the listener"); + throw new AiravataMonitorException("Error unregistering the listener"); + } + } + } + } + next.setStatus(monitorID.getStatus()); + JobIdentifier jobIdentity = new JobIdentifier(next.getJobID(), + next.getTaskID(), + next.getWorkflowNodeID(), + next.getExperimentID(), + next.getJobExecutionContext().getGatewayID()); + publisher.publish(new JobStatusChangeEvent(next.getStatus(),jobIdentity)); + return true; + } + @Override + public boolean stopRegister() throws AiravataMonitorException { + return false; //To change body of implemented methods use File | Settings | File Templates. + } + + public Map<String, Channel> getAvailableChannels() { + return availableChannels; + } + + public void setAvailableChannels(Map<String, Channel> availableChannels) { + this.availableChannels = availableChannels; + } + + public MonitorPublisher getPublisher() { + return publisher; + } + + public void setPublisher(MonitorPublisher publisher) { + this.publisher = publisher; + } + + public BlockingQueue<MonitorID> getRunningQueue() { + return runningQueue; + } + + public void setRunningQueue(BlockingQueue<MonitorID> runningQueue) { + this.runningQueue = runningQueue; + } + + public BlockingQueue<MonitorID> getFinishQueue() { + return finishQueue; + } + + public void setFinishQueue(BlockingQueue<MonitorID> finishQueue) { + this.finishQueue = finishQueue; + } + + public String getProxyPath() { + return proxyPath; + } + + public void setProxyPath(String proxyPath) { + this.proxyPath = proxyPath; + } + + public List<String> getAmqpHosts() { + return amqpHosts; + } + + public void setAmqpHosts(List<String> amqpHosts) { + this.amqpHosts = amqpHosts; + } + + public boolean isStartRegister() { + return startRegister; + } + + public void setStartRegister(boolean startRegister) { + this.startRegister = startRegister; + } +} 
http://git-wip-us.apache.org/repos/asf/airavata/blob/7b809747/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/BasicConsumer.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/BasicConsumer.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/BasicConsumer.java new file mode 100644 index 0000000..bd5c625 --- /dev/null +++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/BasicConsumer.java @@ -0,0 +1,87 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ * +*/ +package org.apache.airavata.gfac.monitor.impl.push.amqp; + +import org.apache.airavata.common.utils.MonitorPublisher; +import org.apache.airavata.gfac.core.monitor.MonitorID; +import org.apache.airavata.gfac.monitor.core.MessageParser; +import org.apache.airavata.gfac.monitor.exception.AiravataMonitorException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.rabbitmq.client.AMQP; +import com.rabbitmq.client.Consumer; +import com.rabbitmq.client.Envelope; +import com.rabbitmq.client.ShutdownSignalException; + +public class BasicConsumer implements Consumer { + private final static Logger logger = LoggerFactory.getLogger(AMQPMonitor.class); + + private MessageParser parser; + + private MonitorPublisher publisher; + + public BasicConsumer(MessageParser parser, MonitorPublisher publisher) { + this.parser = parser; + this.publisher = publisher; + } + + public void handleCancel(String consumerTag) { + } + + public void handleCancelOk(String consumerTag) { + } + + public void handleConsumeOk(String consumerTag) { + } + + public void handleDelivery(String consumerTag, + Envelope envelope, + AMQP.BasicProperties properties, + byte[] body) { + + logger.debug("job update for: " + envelope.getRoutingKey()); + String message = new String(body); + message = message.replaceAll("(?m)^", " "); + // Here we parse the message and get the job status and push it + // to the Event bus, this will be picked by +// AiravataJobStatusUpdator and store in to registry + + logger.debug("************************************************************"); + logger.debug("AMQP Message recieved \n" + message); + logger.debug("************************************************************"); + try { + String jobID = envelope.getRoutingKey().split("\\.")[0]; + MonitorID monitorID = new MonitorID(null, jobID, null, null, null, null,null); + monitorID.setStatus(parser.parseMessage(message)); + publisher.publish(monitorID); + } catch (AiravataMonitorException e) { + 
logger.error(e.getMessage(), e); + } + } + + public void handleRecoverOk(String consumerTag) { + } + + public void handleShutdownSignal(String consumerTag, ShutdownSignalException sig) { + } + +} http://git-wip-us.apache.org/repos/asf/airavata/blob/7b809747/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/JSONMessageParser.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/JSONMessageParser.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/JSONMessageParser.java new file mode 100644 index 0000000..72c77d5 --- /dev/null +++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/JSONMessageParser.java @@ -0,0 +1,78 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ * +*/ +package org.apache.airavata.gfac.monitor.impl.push.amqp; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.airavata.ComputingActivity; +import org.apache.airavata.gfac.monitor.core.MessageParser; +import org.apache.airavata.gfac.monitor.exception.AiravataMonitorException; +import org.apache.airavata.model.workspace.experiment.JobState; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.List; + +public class JSONMessageParser implements MessageParser { + private final static Logger logger = LoggerFactory.getLogger(JSONMessageParser.class); + + public JobState parseMessage(String message)throws AiravataMonitorException { + /*todo write a json message parser here*/ + logger.debug(message); + ObjectMapper objectMapper = new ObjectMapper(); + try { + ComputingActivity computingActivity = objectMapper.readValue(message.getBytes(), ComputingActivity.class); + logger.info(computingActivity.getIDFromEndpoint()); + List<String> stateList = computingActivity.getState(); + JobState jobState = null; + for (String aState : stateList) { + jobState = getStatusFromString(aState); + } + // we get the last value of the state array + return jobState; + } catch (IOException e) { + throw new AiravataMonitorException(e); + } + } + +private JobState getStatusFromString(String status) { + logger.info("parsing the job status returned : " + status); + if(status != null){ + if("ipf:finished".equals(status)){ + return JobState.COMPLETE; + }else if("ipf:pending".equals(status)|| "ipf:starting".equals(status)){ + return JobState.QUEUED; + }else if("ipf:running".equals(status) || "ipf:finishing".equals(status)){ + return JobState.ACTIVE; + }else if ("ipf:held".equals(status) || "ipf:teminating".equals(status) || "ipf:teminated".equals(status)) { + return JobState.HELD; + } else if ("ipf:suspending".equals(status)) { + return JobState.SUSPENDED; + }else if ("ipf:failed".equals(status)) { + return 
JobState.FAILED; + }else if ("ipf:unknown".equals(status)){ + return JobState.UNKNOWN; + } + } + return JobState.UNKNOWN; + } + +} http://git-wip-us.apache.org/repos/asf/airavata/blob/7b809747/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/SimpleJobFinishConsumer.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/SimpleJobFinishConsumer.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/SimpleJobFinishConsumer.java new file mode 100644 index 0000000..c4275f1 --- /dev/null +++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/SimpleJobFinishConsumer.java @@ -0,0 +1,86 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ * +*/ +package org.apache.airavata.gfac.monitor.impl.push.amqp; + +import com.rabbitmq.client.Channel; +import com.rabbitmq.client.Connection; +import com.rabbitmq.client.ConnectionFactory; +import com.rabbitmq.client.QueueingConsumer; +import org.apache.airavata.common.utils.Constants; +import org.apache.airavata.common.utils.ServerSettings; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; + +public class SimpleJobFinishConsumer { + private final static Logger logger = LoggerFactory.getLogger(SimpleJobFinishConsumer.class); + + private List<String> completedJobsFromPush; + + public SimpleJobFinishConsumer(List<String> completedJobsFromPush) { + this.completedJobsFromPush = completedJobsFromPush; + } + + public void listen() { + try { + String queueName = ServerSettings.getSetting(Constants.GFAC_SERVER_PORT, "8950"); + String uri = "amqp://localhost"; + + ConnectionFactory connFactory = new ConnectionFactory(); + connFactory.setUri(uri); + Connection conn = connFactory.newConnection(); + logger.info("--------Created the connection to Rabbitmq server successfully-------"); + + final Channel ch = conn.createChannel(); + + logger.info("--------Created the channel with Rabbitmq server successfully-------"); + + ch.queueDeclare(queueName, false, false, false, null); + + logger.info("--------Declare the queue " + queueName + " in Rabbitmq server successfully-------"); + + final QueueingConsumer consumer = new QueueingConsumer(ch); + ch.basicConsume(queueName, consumer); + (new Thread() { + public void run() { + try { + while (true) { + QueueingConsumer.Delivery delivery = consumer.nextDelivery(); + String message = new String(delivery.getBody()); + logger.info("---------------- Job Finish message received:" + message + " --------------"); + synchronized (completedJobsFromPush) { + completedJobsFromPush.add(message); + } + ch.basicAck(delivery.getEnvelope().getDeliveryTag(), false); + } + } catch (Exception ex) { + 
logger.error("--------Cannot connect to a RabbitMQ Server--------" , ex); + } + } + + }).start(); + } catch (Exception ex) { + logger.error("Cannot connect to a RabbitMQ Server: " , ex); + logger.info("------------- Push monitoring for HPC jobs is disabled -------------"); + } + } +} http://git-wip-us.apache.org/repos/asf/airavata/blob/7b809747/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/UnRegisterWorker.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/UnRegisterWorker.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/UnRegisterWorker.java new file mode 100644 index 0000000..a701326 --- /dev/null +++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/UnRegisterWorker.java @@ -0,0 +1,67 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ * +*/ +package org.apache.airavata.gfac.monitor.impl.push.amqp; + +import com.google.common.eventbus.Subscribe; +import com.rabbitmq.client.Channel; +import org.apache.airavata.gfac.core.monitor.MonitorID; +import org.apache.airavata.gfac.monitor.exception.AiravataMonitorException; +import org.apache.airavata.gfac.monitor.util.CommonUtils; +import org.apache.airavata.model.messaging.event.JobStatusChangeEvent; +import org.apache.airavata.model.workspace.experiment.JobState; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Map; + +public class UnRegisterWorker{ + private final static Logger logger = LoggerFactory.getLogger(UnRegisterWorker.class); + private Map<String, Channel> availableChannels; + + public UnRegisterWorker(Map<String, Channel> channels) { + this.availableChannels = channels; + } + + @Subscribe + private boolean unRegisterListener(JobStatusChangeEvent jobStatus, MonitorID monitorID) throws AiravataMonitorException { + String channelID = CommonUtils.getChannelID(monitorID); + if (JobState.FAILED.equals(jobStatus.getState()) || JobState.COMPLETE.equals(jobStatus.getState())){ + Channel channel = availableChannels.get(channelID); + if (channel == null) { + logger.error("Already Unregistered the listener"); + throw new AiravataMonitorException("Already Unregistered the listener"); + } else { + try { + channel.queueUnbind(channel.queueDeclare().getQueue(), "glue2.computing_activity", CommonUtils.getRoutingKey(monitorID)); + channel.close(); + channel.getConnection().close(); + availableChannels.remove(channelID); + } catch (IOException e) { + logger.error("Error unregistering the listener"); + throw new AiravataMonitorException("Error unregistering the listener"); + } + } + } + return true; + } +} + http://git-wip-us.apache.org/repos/asf/airavata/blob/7b809747/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/util/AMQPConnectionUtil.java 
---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/util/AMQPConnectionUtil.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/util/AMQPConnectionUtil.java new file mode 100644 index 0000000..6a4ed3b --- /dev/null +++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/util/AMQPConnectionUtil.java @@ -0,0 +1,80 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ * +*/ +package org.apache.airavata.gfac.monitor.util; + +import com.rabbitmq.client.Connection; +import com.rabbitmq.client.ConnectionFactory; +import com.rabbitmq.client.DefaultSaslConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.net.ssl.KeyManagerFactory; +import javax.net.ssl.SSLContext; +import javax.net.ssl.TrustManagerFactory; +import java.security.KeyStore; +import java.util.Collections; +import java.util.List; + +public class AMQPConnectionUtil { + private final static Logger logger = LoggerFactory.getLogger(AMQPConnectionUtil.class); + public static Connection connect(List<String>hosts,String vhost, String proxyFile) { + Collections.shuffle(hosts); + for (String host : hosts) { + Connection connection = connect(host, vhost, proxyFile); + if (host != null) { + System.out.println("connected to " + host); + return connection; + } + } + return null; + } + + public static Connection connect(String host, String vhost, String proxyFile) { + Connection connection; + try { + String keyPassPhrase = "test123"; + KeyStore ks = X509Helper.keyStoreFromPEM(proxyFile, keyPassPhrase); + KeyManagerFactory kmf = KeyManagerFactory.getInstance("SunX509"); + kmf.init(ks, keyPassPhrase.toCharArray()); + + KeyStore tks = X509Helper.trustKeyStoreFromCertDir(); + TrustManagerFactory tmf = TrustManagerFactory.getInstance("SunX509"); + tmf.init(tks); + + SSLContext c = SSLContext.getInstance("SSLv3"); + c.init(kmf.getKeyManagers(), tmf.getTrustManagers(), null); + + ConnectionFactory factory = new ConnectionFactory(); + factory.setHost(host); + factory.setPort(5671); + factory.useSslProtocol(c); + factory.setVirtualHost(vhost); + factory.setSaslConfig(DefaultSaslConfig.EXTERNAL); + + connection = factory.newConnection(); + } catch (Exception e) { + logger.error(e.getMessage(), e); + return null; + } + return connection; + } + +}
