http://git-wip-us.apache.org/repos/asf/airavata/blob/7b809747/modules/gfac/gfac-monitor/gfac-hpc-monitor/src/main/java/org/apache/airavata/gfac/monitor/core/MessageParser.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-monitor/gfac-hpc-monitor/src/main/java/org/apache/airavata/gfac/monitor/core/MessageParser.java b/modules/gfac/gfac-monitor/gfac-hpc-monitor/src/main/java/org/apache/airavata/gfac/monitor/core/MessageParser.java deleted file mode 100644 index a003f55..0000000 --- a/modules/gfac/gfac-monitor/gfac-hpc-monitor/src/main/java/org/apache/airavata/gfac/monitor/core/MessageParser.java +++ /dev/null @@ -1,43 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- * -*/ -package org.apache.airavata.gfac.monitor.core; - -import org.apache.airavata.gfac.monitor.exception.AiravataMonitorException; -import org.apache.airavata.model.workspace.experiment.JobState; - -/** - * This is an interface to implement messageparser, it could be - * pull based or push based still monitor has to parse the content of - * the message it gets from remote monitoring system and finalize - * them to internal job state, Ex: JSON parser for AMQP and Qstat reader - * for pull based monitor. - */ -public interface MessageParser { - /** - * This method is to implement how to parse the incoming message - * and implement a logic to finalize the status of the job, - * we have to makesure the correct message is given to the messageparser - * parse method, it will not do any filtering - * @param message content of the message - * @return - */ - JobState parseMessage(String message)throws AiravataMonitorException; -}
http://git-wip-us.apache.org/repos/asf/airavata/blob/7b809747/modules/gfac/gfac-monitor/gfac-hpc-monitor/src/main/java/org/apache/airavata/gfac/monitor/core/Monitor.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-monitor/gfac-hpc-monitor/src/main/java/org/apache/airavata/gfac/monitor/core/Monitor.java b/modules/gfac/gfac-monitor/gfac-hpc-monitor/src/main/java/org/apache/airavata/gfac/monitor/core/Monitor.java deleted file mode 100644 index 614d606..0000000 --- a/modules/gfac/gfac-monitor/gfac-hpc-monitor/src/main/java/org/apache/airavata/gfac/monitor/core/Monitor.java +++ /dev/null @@ -1,30 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- * -*/ -package org.apache.airavata.gfac.monitor.core; - - -/** - * This is the primary interface for Monitors, - * This can be used to implement different methods of monitoring - */ -public interface Monitor { - -} http://git-wip-us.apache.org/repos/asf/airavata/blob/7b809747/modules/gfac/gfac-monitor/gfac-hpc-monitor/src/main/java/org/apache/airavata/gfac/monitor/core/PullMonitor.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-monitor/gfac-hpc-monitor/src/main/java/org/apache/airavata/gfac/monitor/core/PullMonitor.java b/modules/gfac/gfac-monitor/gfac-hpc-monitor/src/main/java/org/apache/airavata/gfac/monitor/core/PullMonitor.java deleted file mode 100644 index efdf89c..0000000 --- a/modules/gfac/gfac-monitor/gfac-hpc-monitor/src/main/java/org/apache/airavata/gfac/monitor/core/PullMonitor.java +++ /dev/null @@ -1,64 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- * -*/ -package org.apache.airavata.gfac.monitor.core; - -import org.apache.airavata.gfac.monitor.exception.AiravataMonitorException; - -/** - * PullMonitors can implement this interface - * Since the pull and push based monitoring required different - * operations, PullMonitor will be useful. - * This will allow users to program Pull monitors separately - */ -public abstract class PullMonitor extends AiravataAbstractMonitor { - - private int pollingFrequence; - /** - * This method will can invoke when PullMonitor needs to start - * and it has to invoke in the frequency specified below, - * @return if the start process is successful return true else false - */ - public abstract boolean startPulling() throws AiravataMonitorException; - - /** - * This is the method to stop the polling process - * @return if the stopping process is successful return true else false - */ - public abstract boolean stopPulling()throws AiravataMonitorException; - - /** - * this method can be used to set the polling frequencey or otherwise - * can implement a polling mechanism, and implement how to do - * @param frequence - */ - public void setPollingFrequence(int frequence){ - this.pollingFrequence = frequence; - } - - /** - * this method can be used to get the polling frequencey or otherwise - * can implement a polling mechanism, and implement how to do - * @return - */ - public int getPollingFrequence(){ - return this.pollingFrequence; - } -} http://git-wip-us.apache.org/repos/asf/airavata/blob/7b809747/modules/gfac/gfac-monitor/gfac-hpc-monitor/src/main/java/org/apache/airavata/gfac/monitor/core/PushMonitor.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-monitor/gfac-hpc-monitor/src/main/java/org/apache/airavata/gfac/monitor/core/PushMonitor.java b/modules/gfac/gfac-monitor/gfac-hpc-monitor/src/main/java/org/apache/airavata/gfac/monitor/core/PushMonitor.java deleted file mode 100644 index 1b6a228..0000000 --- 
a/modules/gfac/gfac-monitor/gfac-hpc-monitor/src/main/java/org/apache/airavata/gfac/monitor/core/PushMonitor.java +++ /dev/null @@ -1,60 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * -*/ -package org.apache.airavata.gfac.monitor.core; - -import org.apache.airavata.gfac.core.monitor.MonitorID; -import org.apache.airavata.gfac.monitor.exception.AiravataMonitorException; - -/** - * PushMonitors can implement this interface - * Since the pull and push based monitoring required different - * operations, PullMonitor will be useful. - * This interface will allow users to program Push monitors separately - */ -public abstract class PushMonitor extends AiravataAbstractMonitor { - /** - * This method can be invoked to register a listener with the - * remote monitoring system, ideally inside this method users will be - * writing some client listener code for the remote monitoring system, - * this will be a simple wrapper around any client for the remote Monitor. 
- * @param monitorID - * @return - */ - public abstract boolean registerListener(MonitorID monitorID)throws AiravataMonitorException; - - /** - * This method can be invoked to unregister a listener with the - * remote monitoring system, ideally inside this method users will be - * writing some client listener code for the remote monitoring system, - * this will be a simple wrapper around any client for the remote Monitor. - * @param monitorID - * @return - */ - public abstract boolean unRegisterListener(MonitorID monitorID)throws AiravataMonitorException; - - /** - * This can be used to stop the registration thread - * @return - * @throws org.apache.airavata.gfac.monitor.exception.AiravataMonitorException - */ - public abstract boolean stopRegister()throws AiravataMonitorException; - -} http://git-wip-us.apache.org/repos/asf/airavata/blob/7b809747/modules/gfac/gfac-monitor/gfac-hpc-monitor/src/main/java/org/apache/airavata/gfac/monitor/exception/AiravataMonitorException.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-monitor/gfac-hpc-monitor/src/main/java/org/apache/airavata/gfac/monitor/exception/AiravataMonitorException.java b/modules/gfac/gfac-monitor/gfac-hpc-monitor/src/main/java/org/apache/airavata/gfac/monitor/exception/AiravataMonitorException.java deleted file mode 100644 index 3acef66..0000000 --- a/modules/gfac/gfac-monitor/gfac-hpc-monitor/src/main/java/org/apache/airavata/gfac/monitor/exception/AiravataMonitorException.java +++ /dev/null @@ -1,37 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * -*/ -package org.apache.airavata.gfac.monitor.exception; - -public class AiravataMonitorException extends Exception { - private static final long serialVersionUID = -2849422320139467602L; - - public AiravataMonitorException(Throwable e) { - super(e); - } - - public AiravataMonitorException(String message) { - super(message, null); - } - - public AiravataMonitorException(String message, Throwable e) { - super(message, e); - } -} http://git-wip-us.apache.org/repos/asf/airavata/blob/7b809747/modules/gfac/gfac-monitor/gfac-hpc-monitor/src/main/java/org/apache/airavata/gfac/monitor/handlers/GridPullMonitorHandler.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-monitor/gfac-hpc-monitor/src/main/java/org/apache/airavata/gfac/monitor/handlers/GridPullMonitorHandler.java b/modules/gfac/gfac-monitor/gfac-hpc-monitor/src/main/java/org/apache/airavata/gfac/monitor/handlers/GridPullMonitorHandler.java deleted file mode 100644 index 10192b7..0000000 --- a/modules/gfac/gfac-monitor/gfac-hpc-monitor/src/main/java/org/apache/airavata/gfac/monitor/handlers/GridPullMonitorHandler.java +++ /dev/null @@ -1,145 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * -*/ -package org.apache.airavata.gfac.monitor.handlers; - -import org.apache.airavata.common.exception.ApplicationSettingsException; -import org.apache.airavata.common.logger.AiravataLogger; -import org.apache.airavata.common.logger.AiravataLoggerFactory; -import org.apache.airavata.common.utils.ServerSettings; -import org.apache.airavata.gfac.core.context.JobExecutionContext; -import org.apache.airavata.gfac.core.cpi.BetterGfacImpl; -import org.apache.airavata.gfac.core.handler.GFacHandlerException; -import org.apache.airavata.gfac.core.handler.ThreadedHandler; -import org.apache.airavata.gfac.core.monitor.MonitorID; -import org.apache.airavata.gfac.core.utils.GFacUtils; -import org.apache.airavata.gfac.monitor.HPCMonitorID; -import org.apache.airavata.gfac.monitor.exception.AiravataMonitorException; -import org.apache.airavata.gfac.monitor.impl.pull.qstat.HPCPullMonitor; -import org.apache.airavata.gfac.monitor.util.CommonUtils; -import org.apache.airavata.gsi.ssh.api.authentication.AuthenticationInfo; -import org.apache.airavata.gsi.ssh.impl.authentication.MyProxyAuthenticationInfo; -import org.apache.zookeeper.KeeperException; -import org.apache.zookeeper.WatchedEvent; -import org.apache.zookeeper.Watcher; -import org.apache.zookeeper.ZooKeeper; -import org.apache.zookeeper.data.Stat; - -import java.io.File; -import java.util.Properties; - -/** - * this handler is responsible for monitoring jobs in a pull mode - * and currently this support multiple pull monitoring in grid resource and uses - * commands like qstat,squeue and this supports 
sun grid enging monitoring too - * which is a slight variation of qstat monitoring. - */ -public class GridPullMonitorHandler extends ThreadedHandler implements Watcher{ - private final static AiravataLogger logger = AiravataLoggerFactory.getLogger(GridPullMonitorHandler.class); - - private HPCPullMonitor hpcPullMonitor; - - private AuthenticationInfo authenticationInfo; - - public void initProperties(Properties properties) throws GFacHandlerException { - String myProxyUser = null; - try { - myProxyUser = ServerSettings.getSetting("myproxy.username"); - String myProxyPass = ServerSettings.getSetting("myproxy.password"); - String certPath = ServerSettings.getSetting("trusted.cert.location"); - String myProxyServer = ServerSettings.getSetting("myproxy.server"); - setAuthenticationInfo(new MyProxyAuthenticationInfo(myProxyUser, myProxyPass, myProxyServer, - 7512, 17280000, certPath)); - hpcPullMonitor = new HPCPullMonitor(null,getAuthenticationInfo()); // we use our own credentials for monitoring, not from the store - } catch (ApplicationSettingsException e) { - logger.error("Error while reading server properties", e); - throw new GFacHandlerException("Error while reading server properties", e); - } - } - - public void run() { - hpcPullMonitor.run(); - } - - public void invoke(JobExecutionContext jobExecutionContext) throws GFacHandlerException { - super.invoke(jobExecutionContext); - hpcPullMonitor.setGfac(jobExecutionContext.getGfac()); - hpcPullMonitor.setPublisher(jobExecutionContext.getMonitorPublisher()); - MonitorID monitorID = new HPCMonitorID(getAuthenticationInfo(), jobExecutionContext); - try { - /* ZooKeeper zk = jobExecutionContext.getZk(); - try { - String experimentEntry = GFacUtils.findExperimentEntry(jobExecutionContext.getExperimentID(), zk); - String path = experimentEntry + File.separator + "operation"; - Stat exists = zk.exists(path, this); - if (exists != null) { - zk.getData(path, this, exists); // watching the operations node - } - } catch 
(KeeperException e) { - logger.error(e.getMessage(), e); - } catch (InterruptedException e) { - logger.error(e.getMessage(), e); - }*/ - CommonUtils.addMonitortoQueue(hpcPullMonitor.getQueue(), monitorID, jobExecutionContext); - CommonUtils.increaseZkJobCount(monitorID); // update change job count to zookeeper - } catch (AiravataMonitorException e) { - logger.errorId(monitorID.getJobID(), "Error adding job {} monitorID object to the queue with experiment {}", - monitorID.getJobID(), monitorID.getExperimentID()); - } - } - - @Override - public void recover(JobExecutionContext jobExecutionContext) throws GFacHandlerException { - // TODO: Auto generated method body. - } - - public AuthenticationInfo getAuthenticationInfo() { - return authenticationInfo; - } - - public HPCPullMonitor getHpcPullMonitor() { - return hpcPullMonitor; - } - - public void setAuthenticationInfo(AuthenticationInfo authenticationInfo) { - this.authenticationInfo = authenticationInfo; - } - - public void setHpcPullMonitor(HPCPullMonitor hpcPullMonitor) { - this.hpcPullMonitor = hpcPullMonitor; - } - - - public void process(WatchedEvent watchedEvent) { - logger.info(watchedEvent.getPath()); - if(Event.EventType.NodeDataChanged.equals(watchedEvent.getType())){ - // node data is changed, this means node is cancelled. 
- logger.info("Experiment is cancelled with this path:"+watchedEvent.getPath()); - - String[] split = watchedEvent.getPath().split("/"); - for(String element:split) { - if (element.contains("+")) { - logger.info("Adding experimentID+TaskID to be removed from monitoring:"+element); - hpcPullMonitor.getCancelJobList().add(element); - } - } - } - } -} http://git-wip-us.apache.org/repos/asf/airavata/blob/7b809747/modules/gfac/gfac-monitor/gfac-hpc-monitor/src/main/java/org/apache/airavata/gfac/monitor/handlers/GridPushMonitorHandler.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-monitor/gfac-hpc-monitor/src/main/java/org/apache/airavata/gfac/monitor/handlers/GridPushMonitorHandler.java b/modules/gfac/gfac-monitor/gfac-hpc-monitor/src/main/java/org/apache/airavata/gfac/monitor/handlers/GridPushMonitorHandler.java deleted file mode 100644 index 8b445df..0000000 --- a/modules/gfac/gfac-monitor/gfac-hpc-monitor/src/main/java/org/apache/airavata/gfac/monitor/handlers/GridPushMonitorHandler.java +++ /dev/null @@ -1,108 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- * -*/ -package org.apache.airavata.gfac.monitor.handlers; - -import java.util.Arrays; -import java.util.List; -import java.util.Properties; -import java.util.concurrent.LinkedBlockingQueue; - -import org.apache.airavata.common.exception.ApplicationSettingsException; -import org.apache.airavata.common.utils.ServerSettings; -import org.apache.airavata.gfac.core.context.JobExecutionContext; -import org.apache.airavata.gfac.core.cpi.BetterGfacImpl; -import org.apache.airavata.gfac.core.handler.GFacHandlerException; -import org.apache.airavata.gfac.core.handler.ThreadedHandler; -import org.apache.airavata.gfac.core.monitor.MonitorID; -import org.apache.airavata.gfac.monitor.HPCMonitorID; -import org.apache.airavata.gfac.monitor.impl.push.amqp.AMQPMonitor; -import org.apache.airavata.gsi.ssh.api.authentication.AuthenticationInfo; -import org.apache.airavata.gsi.ssh.impl.authentication.MyProxyAuthenticationInfo; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * this handler is responsible monitoring jobs in push mode - * and currently this support multiple push monitoring in grid resource - */ -public class GridPushMonitorHandler extends ThreadedHandler { - private final static Logger logger= LoggerFactory.getLogger(GridPushMonitorHandler.class); - - private AMQPMonitor amqpMonitor; - - private AuthenticationInfo authenticationInfo; - - @Override - public void initProperties(Properties properties) throws GFacHandlerException { - String myProxyUser=null; - try{ - myProxyUser = ServerSettings.getSetting("myproxy.username"); - String myProxyPass = ServerSettings.getSetting("myproxy.password"); - String certPath = ServerSettings.getSetting("trusted.cert.location"); - String myProxyServer = ServerSettings.getSetting("myproxy.server"); - setAuthenticationInfo(new MyProxyAuthenticationInfo(myProxyUser, myProxyPass, myProxyServer, - 7512, 17280000, certPath)); - - String hostList=(String)properties.get("hosts"); - String proxyFilePath = 
ServerSettings.getSetting("proxy.file.path"); - String connectionName=ServerSettings.getSetting("connection.name"); - LinkedBlockingQueue<MonitorID> pushQueue = new LinkedBlockingQueue<MonitorID>(); - LinkedBlockingQueue<MonitorID> finishQueue = new LinkedBlockingQueue<MonitorID>(); - List<String> hosts= Arrays.asList(hostList.split(",")); - amqpMonitor=new AMQPMonitor(null,pushQueue,finishQueue,proxyFilePath,connectionName,hosts); - }catch (ApplicationSettingsException e){ - logger.error(e.getMessage(), e); - throw new GFacHandlerException(e.getMessage(), e); - } - } - - @Override - public void run() { - amqpMonitor.run(); - } - - public void invoke(JobExecutionContext jobExecutionContext) throws GFacHandlerException{ - super.invoke(jobExecutionContext); - MonitorID monitorID=new HPCMonitorID(getAuthenticationInfo(),jobExecutionContext); - amqpMonitor.getRunningQueue().add(monitorID); - } - - @Override - public void recover(JobExecutionContext jobExecutionContext) throws GFacHandlerException { - // TODO: Auto generated method body. 
- } - - public AMQPMonitor getAmqpMonitor() { - return amqpMonitor; - } - - public void setAmqpMonitor(AMQPMonitor amqpMonitor) { - this.amqpMonitor = amqpMonitor; - } - - public AuthenticationInfo getAuthenticationInfo() { - return authenticationInfo; - } - - public void setAuthenticationInfo(AuthenticationInfo authenticationInfo) { - this.authenticationInfo = authenticationInfo; - } -} http://git-wip-us.apache.org/repos/asf/airavata/blob/7b809747/modules/gfac/gfac-monitor/gfac-hpc-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-monitor/gfac-hpc-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java b/modules/gfac/gfac-monitor/gfac-hpc-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java deleted file mode 100644 index 3442367..0000000 --- a/modules/gfac/gfac-monitor/gfac-hpc-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java +++ /dev/null @@ -1,471 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- * -*/ -package org.apache.airavata.gfac.monitor.impl.pull.qstat; - -import com.google.common.eventbus.EventBus; -import org.apache.airavata.common.logger.AiravataLogger; -import org.apache.airavata.common.logger.AiravataLoggerFactory; -import org.apache.airavata.common.utils.MonitorPublisher; -import org.apache.airavata.common.utils.ServerSettings; -import org.apache.airavata.gfac.monitor.util.CommonUtils; -import org.apache.airavata.gfac.core.cpi.GFac; -import org.apache.airavata.gfac.core.monitor.MonitorID; -import org.apache.airavata.gfac.core.utils.GFacThreadPoolExecutor; -import org.apache.airavata.gfac.core.utils.OutHandlerWorker; -import org.apache.airavata.gfac.monitor.HostMonitorData; -import org.apache.airavata.gfac.monitor.UserMonitorData; -import org.apache.airavata.gfac.monitor.core.PullMonitor; -import org.apache.airavata.gfac.monitor.exception.AiravataMonitorException; -import org.apache.airavata.gfac.monitor.impl.push.amqp.SimpleJobFinishConsumer; -import org.apache.airavata.gsi.ssh.api.SSHApiException; -import org.apache.airavata.gsi.ssh.api.authentication.AuthenticationInfo; -import org.apache.airavata.model.appcatalog.computeresource.JobSubmissionProtocol; -import org.apache.airavata.model.messaging.event.JobIdentifier; -import org.apache.airavata.model.messaging.event.JobStatusChangeRequestEvent; -import org.apache.airavata.model.workspace.experiment.JobState; - -import java.sql.Timestamp; -import java.util.*; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.LinkedBlockingDeque; -import java.util.concurrent.LinkedBlockingQueue; - -/** - * This monitor is based on qstat command which can be run - * in grid resources and retrieve the job status. 
- */ -public class HPCPullMonitor extends PullMonitor { - - private final static AiravataLogger logger = AiravataLoggerFactory.getLogger(HPCPullMonitor.class); - public static final int FAILED_COUNT = 5; - - // I think this should use DelayedBlocking Queue to do the monitoring*/ - private BlockingQueue<UserMonitorData> queue; - - private boolean startPulling = false; - - private Map<String, ResourceConnection> connections; - - private MonitorPublisher publisher; - - private LinkedBlockingQueue<String> cancelJobList; - - private List<String> completedJobsFromPush; - - private GFac gfac; - - private AuthenticationInfo authenticationInfo; - - private ArrayList<MonitorID> removeList; - - public HPCPullMonitor() { - connections = new HashMap<String, ResourceConnection>(); - queue = new LinkedBlockingDeque<UserMonitorData>(); - publisher = new MonitorPublisher(new EventBus()); - cancelJobList = new LinkedBlockingQueue<String>(); - completedJobsFromPush = new ArrayList<String>(); - (new SimpleJobFinishConsumer(this.completedJobsFromPush)).listen(); - removeList = new ArrayList<MonitorID>(); - } - - public HPCPullMonitor(MonitorPublisher monitorPublisher, AuthenticationInfo authInfo) { - connections = new HashMap<String, ResourceConnection>(); - queue = new LinkedBlockingDeque<UserMonitorData>(); - publisher = monitorPublisher; - authenticationInfo = authInfo; - cancelJobList = new LinkedBlockingQueue<String>(); - this.completedJobsFromPush = new ArrayList<String>(); - (new SimpleJobFinishConsumer(this.completedJobsFromPush)).listen(); - removeList = new ArrayList<MonitorID>(); - } - - public HPCPullMonitor(BlockingQueue<UserMonitorData> queue, MonitorPublisher publisher) { - this.queue = queue; - this.publisher = publisher; - connections = new HashMap<String, ResourceConnection>(); - cancelJobList = new LinkedBlockingQueue<String>(); - this.completedJobsFromPush = new ArrayList<String>(); - (new SimpleJobFinishConsumer(this.completedJobsFromPush)).listen(); - removeList = 
new ArrayList<MonitorID>(); - } - - - public void run() { - /* implement a logic to pick each monitorID object from the queue and do the - monitoring - */ - this.startPulling = true; - while (this.startPulling && !ServerSettings.isStopAllThreads()) { - try { - // After finishing one iteration of the full queue this thread sleeps 1 second - synchronized (this.queue) { - if (this.queue.size() > 0) { - startPulling(); - } - } - Thread.sleep(10000); - } catch (Exception e) { - // we catch all the exceptions here because no matter what happens we do not stop running this - // thread, but ideally we should report proper error messages, but this is handled in startPulling - // method, incase something happen in Thread.sleep we handle it with this catch block. - logger.error(e.getMessage(),e); - } - } - // thread is going to return so we close all the connections - Iterator<String> iterator = connections.keySet().iterator(); - while (iterator.hasNext()) { - String next = iterator.next(); - ResourceConnection resourceConnection = connections.get(next); - try { - resourceConnection.getCluster().disconnect(); - } catch (SSHApiException e) { - logger.error("Erro while connecting to the cluster", e); - } - } - } - - /** - * This method will can invoke when PullMonitor needs to start - * and it has to invoke in the frequency specified below, - * - * @return if the start process is successful return true else false - */ - public boolean startPulling() throws AiravataMonitorException { - // take the top element in the queue and pull the data and put that element - // at the tail of the queue - //todo this polling will not work with multiple usernames but with single user - // and multiple hosts, currently monitoring will work - UserMonitorData take = null; - JobStatusChangeRequestEvent jobStatus = new JobStatusChangeRequestEvent(); - MonitorID currentMonitorID = null; - try { - take = this.queue.take(); - List<HostMonitorData> hostMonitorData = take.getHostMonitorData(); - for 
(ListIterator<HostMonitorData> hostIterator = hostMonitorData.listIterator(); hostIterator.hasNext();) { - HostMonitorData iHostMonitorData = hostIterator.next(); - if (iHostMonitorData.getJobSubmissionProtocol() == JobSubmissionProtocol.SSH) { - String hostName = iHostMonitorData.getComputeResourceDescription().getHostName(); - ResourceConnection connection = null; - if (connections.containsKey(hostName)) { - if (!connections.get(hostName).isConnected()) { - connection = new ResourceConnection(iHostMonitorData, getAuthenticationInfo()); - connections.put(hostName, connection); - } else { - logger.debug("We already have this connection so not going to create one"); - connection = connections.get(hostName); - } - } else { - connection = new ResourceConnection(iHostMonitorData, getAuthenticationInfo()); - connections.put(hostName, connection); - } - - // before we get the statuses, we check the cancel job list and remove them permanently - List<MonitorID> monitorID = iHostMonitorData.getMonitorIDs(); - Iterator<String> iterator1 = cancelJobList.iterator(); - ListIterator<MonitorID> monitorIDListIterator = monitorID.listIterator(); - while (monitorIDListIterator.hasNext()) { - MonitorID iMonitorID = monitorIDListIterator.next(); - while (iterator1.hasNext()) { - String cancelMId = iterator1.next(); - if (cancelMId.equals(iMonitorID.getExperimentID() + "+" + iMonitorID.getTaskID())) { - iMonitorID.setStatus(JobState.CANCELED); -// CommonUtils.removeMonitorFromQueue(take, iMonitorID); - removeList.add(iMonitorID); - logger.debugId(cancelMId, "Found a match in cancel monitor queue, hence moved to the " + - "completed job queue, experiment {}, task {} , job {}", - iMonitorID.getExperimentID(), iMonitorID.getTaskID(), iMonitorID.getJobID()); - logger.info("Job cancelled: marking the Job as ************CANCELLED************ experiment {}, task {}, job name {} .", - iMonitorID.getExperimentID(), iMonitorID.getTaskID(), iMonitorID.getJobName()); - 
sendNotification(iMonitorID); - logger.info("To avoid timing issues we sleep sometime and try to retrieve output files"); - Thread.sleep(10000); - GFacThreadPoolExecutor.getCachedThreadPool().execute(new OutHandlerWorker(gfac, iMonitorID, publisher)); - break; - } - } - iterator1 = cancelJobList.iterator(); - } - - cleanup(take); - - synchronized (completedJobsFromPush) { - for (ListIterator<String> iterator = completedJobsFromPush.listIterator(); iterator.hasNext(); ) { - String completeId = iterator.next(); - for (monitorIDListIterator = monitorID.listIterator(); monitorIDListIterator.hasNext(); ) { - MonitorID iMonitorID = monitorIDListIterator.next(); - if (completeId.equals(iMonitorID.getUserName() + "," + iMonitorID.getJobName())) { - logger.info("This job is finished because push notification came with <username,jobName> " + completeId); - iMonitorID.setStatus(JobState.COMPLETE); -// CommonUtils.removeMonitorFromQueue(take, iMonitorID);//we have to make this empty everytime we iterate, otherwise this list will accumulate and will lead to a memory leak - removeList.add(iMonitorID); - logger.debugId(completeId, "Push notification updated job {} status to {}. 
" + - "experiment {} , task {}.", iMonitorID.getJobID(), JobState.COMPLETE.toString(), - iMonitorID.getExperimentID(), iMonitorID.getTaskID()); - logger.info("AMQP message recieved: marking the Job as ************COMPLETE************ experiment {}, task {}, job name {} .", - iMonitorID.getExperimentID(), iMonitorID.getTaskID(), iMonitorID.getJobName()); - - sendNotification(iMonitorID); - logger.info("To avoid timing issues we sleep sometime and try to retrieve output files"); - Thread.sleep(10000); - GFacThreadPoolExecutor.getCachedThreadPool().execute(new OutHandlerWorker(gfac, iMonitorID, publisher)); - break; - } - } - } - } - - cleanup(take); - - // we have to get this again because we removed the already completed jobs with amqp messages - monitorID = iHostMonitorData.getMonitorIDs(); - Map<String, JobState> jobStatuses = connection.getJobStatuses(monitorID); - for (Iterator<MonitorID> iterator = monitorID.listIterator(); iterator.hasNext(); ) { - MonitorID iMonitorID = iterator.next(); - currentMonitorID = iMonitorID; - if (!JobState.CANCELED.equals(iMonitorID.getStatus()) && - !JobState.COMPLETE.equals(iMonitorID.getStatus())) { - iMonitorID.setStatus(jobStatuses.get(iMonitorID.getJobID() + "," + iMonitorID.getJobName())); //IMPORTANT this is NOT a simple setter we have a logic - } else if (JobState.COMPLETE.equals(iMonitorID.getStatus())) { - logger.debugId(iMonitorID.getJobID(), "Moved job {} to completed jobs map, experiment {}, " + - "task {}", iMonitorID.getJobID(), iMonitorID.getExperimentID(), iMonitorID.getTaskID()); -// CommonUtils.removeMonitorFromQueue(take, iMonitorID); - removeList.add(iMonitorID); - logger.info("PULL Notification is complete: marking the Job as ************COMPLETE************ experiment {}, task {}, job name {} .", - iMonitorID.getExperimentID(), iMonitorID.getTaskID(), iMonitorID.getJobName()); - GFacThreadPoolExecutor.getCachedThreadPool().execute(new OutHandlerWorker(gfac, iMonitorID, publisher)); - } - 
iMonitorID.setStatus(jobStatuses.get(iMonitorID.getJobID() + "," + iMonitorID.getJobName())); //IMPORTANT this is not a simple setter we have a logic - iMonitorID.setLastMonitored(new Timestamp((new Date()).getTime())); - sendNotification(iMonitorID); - // if the job is completed we do not have to put the job to the queue again - iMonitorID.setLastMonitored(new Timestamp((new Date()).getTime())); - } - - cleanup(take); - - - for (Iterator<MonitorID> iterator = monitorID.listIterator(); iterator.hasNext(); ) { - MonitorID iMonitorID = iterator.next(); - if (iMonitorID.getFailedCount() > FAILED_COUNT) { - iMonitorID.setLastMonitored(new Timestamp((new Date()).getTime())); - String outputDir = iMonitorID.getJobExecutionContext().getOutputDir(); - List<String> stdOut = null; - try { - stdOut = connection.getCluster().listDirectory(outputDir); // check the outputs directory - } catch (SSHApiException e) { - if (e.getMessage().contains("No such file or directory")) { - // this is because while we run output handler something failed and during exception - // we store all the jobs in the monitor queue again - logger.error("We know this job is already attempted to run out-handlers"); -// CommonUtils.removeMonitorFromQueue(queue, iMonitorID); - } - } - if (stdOut != null && stdOut.size() > 0 && !stdOut.get(0).isEmpty()) { // have to be careful with this - iMonitorID.setStatus(JobState.COMPLETE); - logger.errorId(iMonitorID.getJobID(), "Job monitoring failed {} times, " + - " Experiment {} , task {}", iMonitorID.getFailedCount(), - iMonitorID.getExperimentID(), iMonitorID.getTaskID()); - logger.info("Listing directory came as complete: marking the Job as ************COMPLETE************ experiment {}, task {}, job name {} .", - iMonitorID.getExperimentID(), iMonitorID.getTaskID(), iMonitorID.getJobName()); - sendNotification(iMonitorID); -// CommonUtils.removeMonitorFromQueue(take, iMonitorID); - removeList.add(iMonitorID); - 
GFacThreadPoolExecutor.getCachedThreadPool().execute(new OutHandlerWorker(gfac, iMonitorID, publisher)); - } else { - iMonitorID.setFailedCount(0); - } - } else { - // Evey - iMonitorID.setLastMonitored(new Timestamp((new Date()).getTime())); - // if the job is complete we remove it from the Map, if any of these maps - // get empty this userMonitorData will get delete from the queue - } - } - - cleanup(take); - - - } else { - logger.debug("Qstat Monitor doesn't handle non-gsissh hosts , host {}", iHostMonitorData. - getComputeResourceDescription().getHostName()); - } - } - // We have finished all the HostMonitorData object in userMonitorData, now we need to put it back - // now the userMonitorData goes back to the tail of the queue - // during individual monitorID removal we remove the HostMonitorData object if it become empty - // so if all the jobs are finished for all the hostMOnitorId objects in userMonitorData object - // we should remove it from the queue so here we do not put it back. - for (ListIterator<HostMonitorData> iterator1 = take.getHostMonitorData().listIterator(); iterator1.hasNext(); ) { - HostMonitorData iHostMonitorID = iterator1.next(); - if (iHostMonitorID.getMonitorIDs().size() == 0) { - iterator1.remove(); - logger.debug("Removed host {} from monitoring queue", iHostMonitorID.getComputeResourceDescription().getHostName()); - } - } - if(take.getHostMonitorData().size()!=0) { - queue.put(take); - } - } catch (InterruptedException e) { - if (!this.queue.contains(take)) { - try { - this.queue.put(take); - } catch (InterruptedException e1) { - e1.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. 
- } - } - logger.error("Error handling the job with Job ID:" + currentMonitorID.getJobID()); - throw new AiravataMonitorException(e); - } catch (SSHApiException e) { - logger.error(e.getMessage()); - if (e.getMessage().contains("Unknown Job Id Error")) { - // in this case job is finished or may be the given job ID is wrong - jobStatus.setState(JobState.UNKNOWN); - JobIdentifier jobIdentifier = new JobIdentifier("UNKNOWN", "UNKNOWN", "UNKNOWN", "UNKNOWN", "UNKNOWN"); - if (currentMonitorID != null){ - jobIdentifier.setExperimentId(currentMonitorID.getExperimentID()); - jobIdentifier.setTaskId(currentMonitorID.getTaskID()); - jobIdentifier.setWorkflowNodeId(currentMonitorID.getWorkflowNodeID()); - jobIdentifier.setJobId(currentMonitorID.getJobID()); - jobIdentifier.setGatewayId(currentMonitorID.getJobExecutionContext().getGatewayID()); - } - jobStatus.setJobIdentity(jobIdentifier); - publisher.publish(jobStatus); - } else if (e.getMessage().contains("illegally formed job identifier")) { - logger.error("Wrong job ID is given so dropping the job from monitoring system"); - } else if (!this.queue.contains(take)) { - try { - queue.put(take); - } catch (InterruptedException e1) { - e1.printStackTrace(); - } - } - throw new AiravataMonitorException("Error retrieving the job status", e); - } catch (Exception e) { - try { - queue.put(take); - } catch (InterruptedException e1) { - e1.printStackTrace(); - } - throw new AiravataMonitorException("Error retrieving the job status", e); - } - return true; - } - - private void sendNotification(MonitorID iMonitorID) { - JobStatusChangeRequestEvent jobStatus = new JobStatusChangeRequestEvent(); - JobIdentifier jobIdentity = new JobIdentifier(iMonitorID.getJobID(), - iMonitorID.getTaskID(), - iMonitorID.getWorkflowNodeID(), - iMonitorID.getExperimentID(), - iMonitorID.getJobExecutionContext().getGatewayID()); - jobStatus.setJobIdentity(jobIdentity); - jobStatus.setState(iMonitorID.getStatus()); - // we have this JobStatus class to 
handle amqp monitoring - logger.debugId(jobStatus.getJobIdentity().getJobId(), "Published job status change request, " + - "experiment {} , task {}", jobStatus.getJobIdentity().getExperimentId(), - jobStatus.getJobIdentity().getTaskId()); - - publisher.publish(jobStatus); - } - - /** - * This is the method to stop the polling process - * - * @return if the stopping process is successful return true else false - */ - public boolean stopPulling() { - this.startPulling = false; - return true; - } - - public MonitorPublisher getPublisher() { - return publisher; - } - - public void setPublisher(MonitorPublisher publisher) { - this.publisher = publisher; - } - - public BlockingQueue<UserMonitorData> getQueue() { - return queue; - } - - public void setQueue(BlockingQueue<UserMonitorData> queue) { - this.queue = queue; - } - - public boolean authenticate() { - return false; //To change body of implemented methods use File | Settings | File Templates. - } - - public Map<String, ResourceConnection> getConnections() { - return connections; - } - - public boolean isStartPulling() { - return startPulling; - } - - public void setConnections(Map<String, ResourceConnection> connections) { - this.connections = connections; - } - - public void setStartPulling(boolean startPulling) { - this.startPulling = startPulling; - } - - public GFac getGfac() { - return gfac; - } - - public void setGfac(GFac gfac) { - this.gfac = gfac; - } - - public AuthenticationInfo getAuthenticationInfo() { - return authenticationInfo; - } - - public void setAuthenticationInfo(AuthenticationInfo authenticationInfo) { - this.authenticationInfo = authenticationInfo; - } - - public LinkedBlockingQueue<String> getCancelJobList() { - return cancelJobList; - } - - public void setCancelJobList(LinkedBlockingQueue<String> cancelJobList) { - this.cancelJobList = cancelJobList; - } - - - private void cleanup(UserMonitorData userMonitorData){ - for(MonitorID iMonitorId:removeList){ - try { - 
CommonUtils.removeMonitorFromQueue(userMonitorData, iMonitorId); - } catch (AiravataMonitorException e) { - logger.error(e.getMessage(), e); - logger.error("Error deleting the monitor data: " + iMonitorId.getJobID()); - } - } - removeList.clear(); - } -} http://git-wip-us.apache.org/repos/asf/airavata/blob/7b809747/modules/gfac/gfac-monitor/gfac-hpc-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/ResourceConnection.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-monitor/gfac-hpc-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/ResourceConnection.java b/modules/gfac/gfac-monitor/gfac-hpc-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/ResourceConnection.java deleted file mode 100644 index f718535..0000000 --- a/modules/gfac/gfac-monitor/gfac-hpc-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/ResourceConnection.java +++ /dev/null @@ -1,154 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- * -*/ -package org.apache.airavata.gfac.monitor.impl.pull.qstat; - -import org.apache.airavata.gfac.GFacException; -import org.apache.airavata.gfac.SecurityContext; -import org.apache.airavata.gfac.core.monitor.MonitorID; -import org.apache.airavata.gfac.gsissh.security.GSISecurityContext; -import org.apache.airavata.gfac.monitor.HostMonitorData; -import org.apache.airavata.gfac.ssh.security.SSHSecurityContext; -import org.apache.airavata.gsi.ssh.api.SSHApiException; -import org.apache.airavata.gsi.ssh.api.authentication.AuthenticationInfo; -import org.apache.airavata.gsi.ssh.impl.JobStatus; -import org.apache.airavata.gsi.ssh.impl.PBSCluster; -import org.apache.airavata.model.workspace.experiment.JobState; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.List; -import java.util.Map; -import java.util.TreeMap; - - -public class ResourceConnection { - private static final Logger log = LoggerFactory.getLogger(ResourceConnection.class); - - private PBSCluster cluster; - - private AuthenticationInfo authenticationInfo; - - - public ResourceConnection(HostMonitorData hostMonitorData,AuthenticationInfo authInfo) throws SSHApiException { - MonitorID monitorID = hostMonitorData.getMonitorIDs().get(0); - try { - SecurityContext securityContext = monitorID.getJobExecutionContext().getSecurityContext(monitorID.getComputeResourceDescription().getHostName()); - if(securityContext != null) { - if (securityContext instanceof GSISecurityContext) { - GSISecurityContext gsiSecurityContext = (GSISecurityContext) securityContext; - cluster = (PBSCluster) gsiSecurityContext.getPbsCluster(); - } else if (securityContext instanceof SSHSecurityContext) { - SSHSecurityContext sshSecurityContext = (SSHSecurityContext) - securityContext; - cluster = (PBSCluster) sshSecurityContext.getPbsCluster(); - } - } - // we just use cluster configuration from the incoming request and construct a new cluster because for monitoring - // we are using our own credentials and 
not using one users account to do everything. - authenticationInfo = authInfo; - } catch (GFacException e) { - log.error("Error reading data from job ExecutionContext"); - } - } - - public ResourceConnection(HostMonitorData hostMonitorData) throws SSHApiException { - MonitorID monitorID = hostMonitorData.getMonitorIDs().get(0); - try { - GSISecurityContext securityContext = (GSISecurityContext) - monitorID.getJobExecutionContext().getSecurityContext(monitorID.getComputeResourceDescription().getHostName()); - cluster = (PBSCluster) securityContext.getPbsCluster(); - - // we just use cluster configuration from the incoming request and construct a new cluster because for monitoring - // we are using our own credentials and not using one users account to do everything. - cluster = new PBSCluster(cluster.getServerInfo(), authenticationInfo, cluster.getJobManagerConfiguration()); - } catch (GFacException e) { - log.error("Error reading data from job ExecutionContext"); - } - } - - public JobState getJobStatus(MonitorID monitorID) throws SSHApiException { - String jobID = monitorID.getJobID(); - //todo so currently we execute the qstat for each job but we can use user based monitoring - //todo or we should concatenate all the commands and execute them in one go and parseSingleJob the response - return getStatusFromString(cluster.getJobStatus(jobID).toString()); - } - - public Map<String, JobState> getJobStatuses(List<MonitorID> monitorIDs) throws SSHApiException { - Map<String, JobStatus> treeMap = new TreeMap<String, JobStatus>(); - Map<String, JobState> treeMap1 = new TreeMap<String, JobState>(); - // creating a sorted map with all the jobIds and with the predefined - // status as UNKNOWN - for (MonitorID monitorID : monitorIDs) { - treeMap.put(monitorID.getJobID()+","+monitorID.getJobName(), JobStatus.U); - } - String userName = cluster.getServerInfo().getUserName(); - //todo so currently we execute the qstat for each job but we can use user based monitoring - //todo 
or we should concatenate all the commands and execute them in one go and parseSingleJob the response - // - cluster.getJobStatuses(userName, treeMap); - for (String key : treeMap.keySet()) { - treeMap1.put(key, getStatusFromString(treeMap.get(key).toString())); - } - return treeMap1; - } - - private JobState getStatusFromString(String status) { - log.info("parsing the job status returned : " + status); - if (status != null) { - if ("C".equals(status) || "CD".equals(status) || "E".equals(status) || "CG".equals(status) || "DONE".equals(status)) { - return JobState.COMPLETE; - } else if ("H".equals(status) || "h".equals(status)) { - return JobState.HELD; - } else if ("Q".equals(status) || "qw".equals(status) || "PEND".equals(status)) { - return JobState.QUEUED; - } else if ("R".equals(status) || "CF".equals(status) || "r".equals(status) || "RUN".equals(status)) { - return JobState.ACTIVE; - } else if ("T".equals(status)) { - return JobState.HELD; - } else if ("W".equals(status) || "PD".equals(status)) { - return JobState.QUEUED; - } else if ("S".equals(status) || "PSUSP".equals(status) || "USUSP".equals(status) || "SSUSP".equals(status)) { - return JobState.SUSPENDED; - } else if ("CA".equals(status)) { - return JobState.CANCELED; - } else if ("F".equals(status) || "NF".equals(status) || "TO".equals(status) || "EXIT".equals(status)) { - return JobState.FAILED; - } else if ("PR".equals(status) || "Er".equals(status)) { - return JobState.FAILED; - } else if ("U".equals(status) || ("UNKWN".equals(status))) { - return JobState.UNKNOWN; - } - } - return JobState.UNKNOWN; - } - - public PBSCluster getCluster() { - return cluster; - } - - public void setCluster(PBSCluster cluster) { - this.cluster = cluster; - } - - public boolean isConnected(){ - return this.cluster.getSession().isConnected(); - } -} 
http://git-wip-us.apache.org/repos/asf/airavata/blob/7b809747/modules/gfac/gfac-monitor/gfac-hpc-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/AMQPMonitor.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-monitor/gfac-hpc-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/AMQPMonitor.java b/modules/gfac/gfac-monitor/gfac-hpc-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/AMQPMonitor.java deleted file mode 100644 index de8cd8c..0000000 --- a/modules/gfac/gfac-monitor/gfac-hpc-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/AMQPMonitor.java +++ /dev/null @@ -1,280 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- * -*/ -package org.apache.airavata.gfac.monitor.impl.push.amqp; - -import java.io.IOException; -import java.util.HashMap; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.BlockingQueue; - -import org.apache.airavata.common.utils.MonitorPublisher; -import org.apache.airavata.common.utils.ServerSettings; -import org.apache.airavata.gfac.core.monitor.MonitorID; -import org.apache.airavata.gfac.monitor.core.PushMonitor; -import org.apache.airavata.gfac.monitor.exception.AiravataMonitorException; -import org.apache.airavata.gfac.monitor.util.AMQPConnectionUtil; -import org.apache.airavata.gfac.monitor.util.CommonUtils; -import org.apache.airavata.model.appcatalog.computeresource.ComputeResourceDescription; -import org.apache.airavata.model.messaging.event.JobIdentifier; -import org.apache.airavata.model.messaging.event.JobStatusChangeEvent; -import org.apache.airavata.model.workspace.experiment.JobState; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.common.eventbus.EventBus; -import com.google.common.eventbus.Subscribe; -import com.rabbitmq.client.Channel; -import com.rabbitmq.client.Connection; - -/** - * This is the implementation for AMQP based finishQueue, this uses - * rabbitmq client to recieve AMQP based monitoring data from - * mostly excede resources. - */ -public class AMQPMonitor extends PushMonitor { - private final static Logger logger = LoggerFactory.getLogger(AMQPMonitor.class); - - - /* this will keep all the channels available in the system, we do not create - channels for all the jobs submitted, but we create channels for each user for each - host. 
- */ - private Map<String, Channel> availableChannels; - - private MonitorPublisher publisher; - - private MonitorPublisher localPublisher; - - private BlockingQueue<MonitorID> runningQueue; - - private BlockingQueue<MonitorID> finishQueue; - - private String connectionName; - - private String proxyPath; - - private List<String> amqpHosts; - - private boolean startRegister; - - public AMQPMonitor(){ - - } - public AMQPMonitor(MonitorPublisher publisher, BlockingQueue<MonitorID> runningQueue, - BlockingQueue<MonitorID> finishQueue, - String proxyPath,String connectionName,List<String> hosts) { - this.publisher = publisher; - this.runningQueue = runningQueue; // these will be initialized by the MonitorManager - this.finishQueue = finishQueue; // these will be initialized by the MonitorManager - this.availableChannels = new HashMap<String, Channel>(); - this.connectionName = connectionName; - this.proxyPath = proxyPath; - this.amqpHosts = hosts; - this.localPublisher = new MonitorPublisher(new EventBus()); - this.localPublisher.registerListener(this); - } - - public void initialize(String proxyPath, String connectionName, List<String> hosts) { - this.availableChannels = new HashMap<String, Channel>(); - this.connectionName = connectionName; - this.proxyPath = proxyPath; - this.amqpHosts = hosts; - this.localPublisher = new MonitorPublisher(new EventBus()); - this.localPublisher.registerListener(this); - } - - @Override - public boolean registerListener(MonitorID monitorID) throws AiravataMonitorException { - // we subscribe to read user-host based subscription - ComputeResourceDescription computeResourceDescription = monitorID.getComputeResourceDescription(); - if (computeResourceDescription.isSetIpAddresses() && computeResourceDescription.getIpAddresses().size() > 0) { - // we get first ip address for the moment - String hostAddress = computeResourceDescription.getIpAddresses().get(0); - // in amqp case there are no multiple jobs per each host, because once a job is 
put in to the queue it - // will be picked by the Monitor, so jobs will not stay in this queueu but jobs will stay in finishQueue - String channelID = CommonUtils.getChannelID(monitorID); - if (availableChannels.get(channelID) == null) { - try { - //todo need to fix this rather getting it from a file - Connection connection = AMQPConnectionUtil.connect(amqpHosts, connectionName, proxyPath); - Channel channel = null; - channel = connection.createChannel(); - availableChannels.put(channelID, channel); - String queueName = channel.queueDeclare().getQueue(); - - BasicConsumer consumer = new - BasicConsumer(new JSONMessageParser(), localPublisher); // here we use local publisher - channel.basicConsume(queueName, true, consumer); - String filterString = CommonUtils.getRoutingKey(monitorID.getUserName(), hostAddress); - // here we queuebind to a particular user in a particular machine - channel.queueBind(queueName, "glue2.computing_activity", filterString); - logger.info("Using filtering string to monitor: " + filterString); - } catch (IOException e) { - logger.error("Error creating the connection to finishQueue the job:" + monitorID.getUserName()); - } - } - } else { - throw new AiravataMonitorException("Couldn't register monitor for jobId :" + monitorID.getJobID() + - " , ComputeResourceDescription " + computeResourceDescription.getHostName() + " doesn't has an " + - "IpAddress with it"); - } - return true; - } - - public void run() { - // before going to the while true mode we start unregister thread - startRegister = true; // this will be unset by someone else - while (startRegister || !ServerSettings.isStopAllThreads()) { - try { - MonitorID take = runningQueue.take(); - this.registerListener(take); - } catch (AiravataMonitorException e) { // catch any exceptino inside the loop - logger.error(e.getMessage(), e); - } catch (InterruptedException e) { - logger.error(e.getMessage(), e); - } catch (Exception e){ - logger.error(e.getMessage(), e); - } - } - Set<String> 
strings = availableChannels.keySet(); - for(String key:strings) { - Channel channel = availableChannels.get(key); - try { - channel.close(); - } catch (IOException e) { - logger.error(e.getMessage(), e); - } - } - } - - @Subscribe - public boolean unRegisterListener(MonitorID monitorID) throws AiravataMonitorException { - Iterator<MonitorID> iterator = finishQueue.iterator(); - MonitorID next = null; - while(iterator.hasNext()){ - next = iterator.next(); - if(next.getJobID().endsWith(monitorID.getJobID())){ - break; - } - } - if(next == null) { - logger.error("Job has removed from the queue, old obsolete message recieved"); - return false; - } - String channelID = CommonUtils.getChannelID(next); - if (JobState.FAILED.equals(monitorID.getStatus()) || JobState.COMPLETE.equals(monitorID.getStatus())) { - finishQueue.remove(next); - - // if this is the last job in the queue at this point with the same username and same host we - // close the channel and close the connection and remove it from availableChannels - if (CommonUtils.isTheLastJobInQueue(finishQueue, next)) { - logger.info("There are no jobs to monitor for common ChannelID:" + channelID + " , so we unsubscribe it" + - ", incase new job created we do subscribe again"); - Channel channel = availableChannels.get(channelID); - if (channel == null) { - logger.error("Already Unregistered the listener"); - throw new AiravataMonitorException("Already Unregistered the listener"); - } else { - try { - channel.queueUnbind(channel.queueDeclare().getQueue(), "glue2.computing_activity", CommonUtils.getRoutingKey(next)); - channel.close(); - channel.getConnection().close(); - availableChannels.remove(channelID); - } catch (IOException e) { - logger.error("Error unregistering the listener"); - throw new AiravataMonitorException("Error unregistering the listener"); - } - } - } - } - next.setStatus(monitorID.getStatus()); - JobIdentifier jobIdentity = new JobIdentifier(next.getJobID(), - next.getTaskID(), - 
next.getWorkflowNodeID(), - next.getExperimentID(), - next.getJobExecutionContext().getGatewayID()); - publisher.publish(new JobStatusChangeEvent(next.getStatus(),jobIdentity)); - return true; - } - @Override - public boolean stopRegister() throws AiravataMonitorException { - return false; //To change body of implemented methods use File | Settings | File Templates. - } - - public Map<String, Channel> getAvailableChannels() { - return availableChannels; - } - - public void setAvailableChannels(Map<String, Channel> availableChannels) { - this.availableChannels = availableChannels; - } - - public MonitorPublisher getPublisher() { - return publisher; - } - - public void setPublisher(MonitorPublisher publisher) { - this.publisher = publisher; - } - - public BlockingQueue<MonitorID> getRunningQueue() { - return runningQueue; - } - - public void setRunningQueue(BlockingQueue<MonitorID> runningQueue) { - this.runningQueue = runningQueue; - } - - public BlockingQueue<MonitorID> getFinishQueue() { - return finishQueue; - } - - public void setFinishQueue(BlockingQueue<MonitorID> finishQueue) { - this.finishQueue = finishQueue; - } - - public String getProxyPath() { - return proxyPath; - } - - public void setProxyPath(String proxyPath) { - this.proxyPath = proxyPath; - } - - public List<String> getAmqpHosts() { - return amqpHosts; - } - - public void setAmqpHosts(List<String> amqpHosts) { - this.amqpHosts = amqpHosts; - } - - public boolean isStartRegister() { - return startRegister; - } - - public void setStartRegister(boolean startRegister) { - this.startRegister = startRegister; - } -} http://git-wip-us.apache.org/repos/asf/airavata/blob/7b809747/modules/gfac/gfac-monitor/gfac-hpc-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/BasicConsumer.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-monitor/gfac-hpc-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/BasicConsumer.java 
b/modules/gfac/gfac-monitor/gfac-hpc-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/BasicConsumer.java deleted file mode 100644 index bd5c625..0000000 --- a/modules/gfac/gfac-monitor/gfac-hpc-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/BasicConsumer.java +++ /dev/null @@ -1,87 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- * -*/
package org.apache.airavata.gfac.monitor.impl.push.amqp;

import org.apache.airavata.common.utils.MonitorPublisher;
import org.apache.airavata.gfac.core.monitor.MonitorID;
import org.apache.airavata.gfac.monitor.core.MessageParser;
import org.apache.airavata.gfac.monitor.exception.AiravataMonitorException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.ShutdownSignalException;

import java.nio.charset.StandardCharsets;

/**
 * RabbitMQ {@link Consumer} that turns every delivered AMQP message into a job
 * status update: the message body is handed to a {@link MessageParser} and the
 * resulting state is published through a {@link MonitorPublisher}.
 *
 * <p>Only {@link #handleDelivery} does any work; the remaining callbacks are
 * intentionally empty.
 */
public class BasicConsumer implements Consumer {
    // Bound to this class (the original used AMQPMonitor.class, which
    // mis-attributed every log line written from here).
    private static final Logger logger = LoggerFactory.getLogger(BasicConsumer.class);

    private static final String BANNER = "************************************************************";

    private final MessageParser parser;

    private final MonitorPublisher publisher;

    /**
     * @param parser    converts the raw message text into a job state
     * @param publisher receives the {@link MonitorID} carrying the parsed state
     */
    public BasicConsumer(MessageParser parser, MonitorPublisher publisher) {
        this.parser = parser;
        this.publisher = publisher;
    }

    @Override
    public void handleCancel(String consumerTag) {
    }

    @Override
    public void handleCancelOk(String consumerTag) {
    }

    @Override
    public void handleConsumeOk(String consumerTag) {
    }

    /**
     * Parses one delivered message and publishes the job status it carries.
     *
     * <p>The job id is the part of the routing key before the first
     * {@code '.'}. Parser failures are logged and the message is dropped.
     *
     * @param consumerTag not used
     * @param envelope    supplies the routing key the job id is derived from
     * @param properties  not used
     * @param body        raw message payload, decoded as UTF-8
     */
    @Override
    public void handleDelivery(String consumerTag,
                               Envelope envelope,
                               AMQP.BasicProperties properties,
                               byte[] body) {
        String routingKey = envelope.getRoutingKey();
        logger.debug("job update for: {}", routingKey);

        // Decode explicitly as UTF-8 instead of relying on the platform default charset.
        String message = new String(body, StandardCharsets.UTF_8);
        // Prefix every line of the payload (multiline '^' matches at each line start).
        message = message.replaceAll("(?m)^", " ");

        // Per the original author's note: the parsed status is pushed to the event
        // bus, where AiravataJobStatusUpdator picks it up and stores it in the registry.
        logger.debug(BANNER);
        logger.debug("AMQP Message recieved \n{}", message);
        logger.debug(BANNER);
        try {
            // Same result as split("\\.")[0], but a key made only of dots no longer
            // throws ArrayIndexOutOfBoundsException (split() yields an empty array there).
            int firstDot = routingKey.indexOf('.');
            String jobID = firstDot < 0 ? routingKey : routingKey.substring(0, firstDot);

            MonitorID monitorID = new MonitorID(null, jobID, null, null, null, null, null);
            monitorID.setStatus(parser.parseMessage(message));
            publisher.publish(monitorID);
        } catch (AiravataMonitorException e) {
            logger.error(e.getMessage(), e);
        }
    }

    @Override
    public void handleRecoverOk(String consumerTag) {
    }

    @Override
    public void handleShutdownSignal(String consumerTag, ShutdownSignalException sig) {
    }

}
http://git-wip-us.apache.org/repos/asf/airavata/blob/7b809747/modules/gfac/gfac-monitor/gfac-hpc-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/JSONMessageParser.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-monitor/gfac-hpc-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/JSONMessageParser.java b/modules/gfac/gfac-monitor/gfac-hpc-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/JSONMessageParser.java deleted file mode 100644 index 72c77d5..0000000 --- a/modules/gfac/gfac-monitor/gfac-hpc-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/JSONMessageParser.java +++ /dev/null @@ -1,78 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- * -*/
package org.apache.airavata.gfac.monitor.impl.push.amqp;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.airavata.ComputingActivity;
import org.apache.airavata.gfac.monitor.core.MessageParser;
import org.apache.airavata.gfac.monitor.exception.AiravataMonitorException;
import org.apache.airavata.model.workspace.experiment.JobState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.List;

/**
 * {@link MessageParser} for JSON-encoded {@link ComputingActivity} messages.
 * The last entry of the activity's state list decides the resulting
 * {@link JobState}.
 */
public class JSONMessageParser implements MessageParser {
    private static final Logger logger = LoggerFactory.getLogger(JSONMessageParser.class);

    // One shared mapper instead of a new one per message; it is never reconfigured here.
    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

    /**
     * Deserializes {@code message} into a {@link ComputingActivity} and maps the
     * last reported state to a {@link JobState}.
     *
     * @param message JSON text of the computing activity
     * @return the mapped state; {@link JobState#UNKNOWN} when the activity
     *         carries no state at all (previously a missing list caused a
     *         {@code NullPointerException} and an empty list returned {@code null})
     * @throws AiravataMonitorException if the message cannot be read as JSON
     */
    @Override
    public JobState parseMessage(String message) throws AiravataMonitorException {
        logger.debug(message);
        try {
            // Parse the String directly; going through message.getBytes() would
            // round-trip the text through the platform default charset.
            ComputingActivity computingActivity = OBJECT_MAPPER.readValue(message, ComputingActivity.class);
            logger.info(computingActivity.getIDFromEndpoint());

            List<String> stateList = computingActivity.getState();
            if (stateList == null || stateList.isEmpty()) {
                return JobState.UNKNOWN;
            }
            // Only the last value of the state array is significant.
            return getStatusFromString(stateList.get(stateList.size() - 1));
        } catch (IOException e) {
            throw new AiravataMonitorException(e);
        }
    }

    /**
     * Maps a single {@code ipf:*} state string to a {@link JobState}.
     * Unrecognised or {@code null} input maps to {@link JobState#UNKNOWN}.
     */
    private JobState getStatusFromString(String status) {
        logger.info("parsing the job status returned : {}", status);
        if ("ipf:finished".equals(status)) {
            return JobState.COMPLETE;
        }
        if ("ipf:pending".equals(status) || "ipf:starting".equals(status)) {
            return JobState.QUEUED;
        }
        if ("ipf:running".equals(status) || "ipf:finishing".equals(status)) {
            return JobState.ACTIVE;
        }
        // Spelling fixed: the original compared against "ipf:teminating" and
        // "ipf:teminated", which a correctly spelled state can never equal.
        // NOTE(review): terminating/terminated are mapped to HELD exactly as the
        // original intended - confirm that HELD is the right target state.
        if ("ipf:held".equals(status) || "ipf:terminating".equals(status) || "ipf:terminated".equals(status)) {
            return JobState.HELD;
        }
        if ("ipf:suspending".equals(status)) {
            return JobState.SUSPENDED;
        }
        if ("ipf:failed".equals(status)) {
            return JobState.FAILED;
        }
        // "ipf:unknown", null and anything unrecognised.
        return JobState.UNKNOWN;
    }

}
http://git-wip-us.apache.org/repos/asf/airavata/blob/7b809747/modules/gfac/gfac-monitor/gfac-hpc-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/SimpleJobFinishConsumer.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-monitor/gfac-hpc-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/SimpleJobFinishConsumer.java b/modules/gfac/gfac-monitor/gfac-hpc-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/SimpleJobFinishConsumer.java deleted file mode 100644 index c4275f1..0000000 --- a/modules/gfac/gfac-monitor/gfac-hpc-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/SimpleJobFinishConsumer.java +++ /dev/null @@ -1,86 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- * -*/
package org.apache.airavata.gfac.monitor.impl.push.amqp;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
import org.apache.airavata.common.utils.Constants;
import org.apache.airavata.common.utils.ServerSettings;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.nio.charset.StandardCharsets;
import java.util.List;

/**
 * Listens on a RabbitMQ queue on {@code amqp://localhost} and appends the body
 * of every delivered message to a caller-supplied list of completed jobs.
 */
public class SimpleJobFinishConsumer {
    private static final Logger logger = LoggerFactory.getLogger(SimpleJobFinishConsumer.class);

    private final List<String> completedJobsFromPush;

    /**
     * @param completedJobsFromPush list that received messages are appended to;
     *                              every append synchronizes on this list
     */
    public SimpleJobFinishConsumer(List<String> completedJobsFromPush) {
        this.completedJobsFromPush = completedJobsFromPush;
    }

    /**
     * Connects to RabbitMQ, declares the queue and starts a thread that consumes
     * it until a failure occurs. Setup failures are logged and leave push
     * monitoring disabled; nothing is thrown to the caller.
     */
    public void listen() {
        Connection connection = null;
        try {
            // The queue is named after the configured GFAC server port setting.
            String queueName = ServerSettings.getSetting(Constants.GFAC_SERVER_PORT, "8950");
            String uri = "amqp://localhost";

            ConnectionFactory connFactory = new ConnectionFactory();
            connFactory.setUri(uri);
            connection = connFactory.newConnection();
            logger.info("--------Created the connection to Rabbitmq server successfully-------");

            final Channel ch = connection.createChannel();
            logger.info("--------Created the channel with Rabbitmq server successfully-------");

            ch.queueDeclare(queueName, false, false, false, null);
            logger.info("--------Declare the queue {} in Rabbitmq server successfully-------", queueName);

            final QueueingConsumer consumer = new QueueingConsumer(ch);
            ch.basicConsume(queueName, consumer);

            // From here on the consumer thread owns the connection and closes it on exit.
            final Connection liveConnection = connection;
            new Thread() {
                @Override
                public void run() {
                    try {
                        while (true) {
                            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
                            // Decode explicitly as UTF-8 rather than the platform default charset.
                            String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
                            logger.info("---------------- Job Finish message received:{} --------------", message);
                            synchronized (completedJobsFromPush) {
                                completedJobsFromPush.add(message);
                            }
                            ch.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
                        }
                    } catch (Exception ex) {
                        if (ex instanceof InterruptedException) {
                            // Keep the interrupt visible to whoever manages this thread.
                            Thread.currentThread().interrupt();
                        }
                        logger.error("--------Cannot connect to a RabbitMQ Server--------" , ex);
                    } finally {
                        // The loop only ends on failure; release the connection instead of leaking it.
                        closeQuietly(liveConnection);
                    }
                }
            }.start();
        } catch (Exception ex) {
            // Setup failed before the consumer thread was started; drop a half-open connection.
            closeQuietly(connection);
            logger.error("Cannot connect to a RabbitMQ Server: " , ex);
            logger.info("------------- Push monitoring for HPC jobs is disabled -------------");
        }
    }

    /** Closes the connection if one was opened, logging (not propagating) any failure. */
    private static void closeQuietly(Connection connection) {
        if (connection == null) {
            return;
        }
        try {
            connection.close();
        } catch (Exception ex) {
            logger.debug("Ignoring failure while closing the RabbitMQ connection", ex);
        }
    }
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/7b809747/modules/gfac/gfac-monitor/gfac-hpc-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/UnRegisterWorker.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-monitor/gfac-hpc-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/UnRegisterWorker.java b/modules/gfac/gfac-monitor/gfac-hpc-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/UnRegisterWorker.java deleted file mode 100644 index a701326..0000000 --- a/modules/gfac/gfac-monitor/gfac-hpc-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/UnRegisterWorker.java +++ /dev/null @@ -1,67 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- * -*/
package org.apache.airavata.gfac.monitor.impl.push.amqp;

import com.google.common.eventbus.Subscribe;
import com.rabbitmq.client.Channel;
import org.apache.airavata.gfac.core.monitor.MonitorID;
import org.apache.airavata.gfac.monitor.exception.AiravataMonitorException;
import org.apache.airavata.gfac.monitor.util.CommonUtils;
import org.apache.airavata.model.messaging.event.JobStatusChangeEvent;
import org.apache.airavata.model.workspace.experiment.JobState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.Map;

/**
 * Tears down the AMQP channel of a job once that job reaches the FAILED or
 * COMPLETE state, and removes the channel from the shared channel map.
 */
public class UnRegisterWorker {
    private static final Logger logger = LoggerFactory.getLogger(UnRegisterWorker.class);

    private final Map<String, Channel> availableChannels;

    /**
     * @param channels map of channel id to open channel; entries are removed
     *                 from it as listeners are unregistered
     */
    public UnRegisterWorker(Map<String, Channel> channels) {
        this.availableChannels = channels;
    }

    /**
     * Unbinds and closes the channel belonging to {@code monitorID} when
     * {@code jobStatus} is FAILED or COMPLETE; does nothing for other states.
     *
     * @return always {@code true}
     * @throws AiravataMonitorException if no channel is registered for the job,
     *                                  or if unbinding/closing fails
     */
    // NOTE(review): Guava's EventBus documents subscriber methods as taking exactly
    // one argument (and older releases only scan public methods), so this private
    // two-argument method is presumably never invoked through the bus - confirm
    // how it is meant to be wired. Signature left unchanged here.
    @Subscribe
    private boolean unRegisterListener(JobStatusChangeEvent jobStatus, MonitorID monitorID) throws AiravataMonitorException {
        String channelID = CommonUtils.getChannelID(monitorID);
        JobState state = jobStatus.getState();
        boolean terminal = JobState.FAILED.equals(state) || JobState.COMPLETE.equals(state);
        if (!terminal) {
            return true;
        }

        Channel channel = availableChannels.get(channelID);
        if (channel == null) {
            logger.error("Already Unregistered the listener");
            throw new AiravataMonitorException("Already Unregistered the listener");
        }

        try {
            // NOTE(review): the no-arg queueDeclare() asks the broker for a new
            // server-named queue, so this looks like it unbinds a fresh queue rather
            // than the one bound when the listener was registered - verify.
            channel.queueUnbind(channel.queueDeclare().getQueue(), "glue2.computing_activity", CommonUtils.getRoutingKey(monitorID));
            channel.close();
            channel.getConnection().close();
            availableChannels.remove(channelID);
        } catch (IOException e) {
            // Log the cause as well; the original dropped it entirely.
            logger.error("Error unregistering the listener", e);
            throw new AiravataMonitorException("Error unregistering the listener");
        }
        return true;
    }
}
