Repository: airavata Updated Branches: refs/heads/master 0a470ce90 -> 22cd1a091
committing more changes with orchestrator-registry integration - AIRAVATA-1028 Project: http://git-wip-us.apache.org/repos/asf/airavata/repo Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/3f8457a7 Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/3f8457a7 Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/3f8457a7 Branch: refs/heads/master Commit: 3f8457a73cac42f923bdfc2bedf7925b4b09cdfd Parents: a4c0adc Author: lahiru <[email protected]> Authored: Mon Mar 3 12:05:35 2014 -0500 Committer: lahiru <[email protected]> Committed: Mon Mar 3 12:05:35 2014 -0500 ---------------------------------------------------------------------- .../job/monitor/AiravataJobStatusUpdator.java | 4 +- .../apache/airavata/job/monitor/MonitorID.java | 10 +++ .../monitor/impl/pull/qstat/QstatMonitor.java | 75 ++++++++++++------- .../main/resources/schemas/HostDescription.xsd | 1 + .../main/resources/airavata-server.properties | 2 +- .../src/main/resources/monitor.properties | 2 +- .../client/OrchestratorClientFactoryTest.java | 79 ++++++++++++-------- 7 files changed, 110 insertions(+), 63 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/airavata/blob/3f8457a7/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/AiravataJobStatusUpdator.java ---------------------------------------------------------------------- diff --git a/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/AiravataJobStatusUpdator.java b/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/AiravataJobStatusUpdator.java index f7afc4d..0a0fde5 100644 --- a/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/AiravataJobStatusUpdator.java +++ b/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/AiravataJobStatusUpdator.java @@ -66,11 +66,11 @@ public class AiravataJobStatusUpdator{ JobState state = jobStatus.getState(); switch (state) { case COMPLETE: - logger.info("Job ID:" + jobStatus.getMonitorID().getJobID() + "is DONE"); + logger.info("Job ID:" + jobStatus.getMonitorID().getJobID() + " is DONE"); jobsToMonitor.remove(jobStatus.getMonitorID()); break; case UNKNOWN: - logger.info("Job ID:" + jobStatus.getMonitorID().getJobID() + "is UNKNOWN"); + logger.info("Job ID:" + jobStatus.getMonitorID().getJobID() + " is UNKNOWN"); logger.info("Unknown job status came, if the old job status is RUNNING or something active, we have to make it complete"); //todo implement this logic break; http://git-wip-us.apache.org/repos/asf/airavata/blob/3f8457a7/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/MonitorID.java ---------------------------------------------------------------------- diff --git a/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/MonitorID.java b/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/MonitorID.java index 06d04ac..f33d348 100644 --- a/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/MonitorID.java +++ b/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/MonitorID.java @@ -58,6 +58,8 @@ public class MonitorID { private String taskID; + private int failedCount = 0; + public MonitorID(HostDescription host, String jobID,String taskID,String experimentID, String userName) { this.host = host; @@ -170,4 +172,12 @@ public class MonitorID { public void setTaskID(String taskID) { this.taskID = taskID; } + + public int getFailedCount() { + return failedCount; + } + + public void setFailedCount(int failedCount) { + this.failedCount = failedCount; + } } http://git-wip-us.apache.org/repos/asf/airavata/blob/3f8457a7/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/pull/qstat/QstatMonitor.java ---------------------------------------------------------------------- diff --git a/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/pull/qstat/QstatMonitor.java b/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/pull/qstat/QstatMonitor.java index eba99f9..8f0b79d 100644 --- a/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/pull/qstat/QstatMonitor.java +++ b/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/pull/qstat/QstatMonitor.java @@ -20,6 +20,7 @@ */ package org.apache.airavata.job.monitor.impl.pull.qstat; +import org.apache.airavata.commons.gfac.type.HostDescription; import org.apache.airavata.gsi.ssh.api.SSHApiException; import org.apache.airavata.job.monitor.MonitorID; import org.apache.airavata.job.monitor.core.PullMonitor; @@ -27,6 +28,8 @@ import org.apache.airavata.job.monitor.event.MonitorPublisher; import org.apache.airavata.job.monitor.exception.AiravataMonitorException; import org.apache.airavata.job.monitor.state.JobStatus; import org.apache.airavata.model.workspace.experiment.JobState; +import org.apache.airavata.schemas.gfac.GsisshHostType; +import org.apache.airavata.schemas.gfac.HostDescriptionType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -71,10 +74,11 @@ public class QstatMonitor extends PullMonitor implements Runnable { startPulling(); // After finishing one iteration of the full queue this thread sleeps 1 second Thread.sleep(1000); - } catch (AiravataMonitorException e) { - e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. - } catch (InterruptedException e) { - e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. + } catch (Exception e){ + // we catch all the exceptions here because no matter what happens we do not stop running this + // thread, but ideally we should report proper error messages, but this is handled in startPulling + // method, incase something happen in Thread.sleep we handle it with this catch block. + logger.error(e.getMessage()); } } } @@ -106,24 +110,26 @@ public class QstatMonitor extends PullMonitor implements Runnable { this.queue.put(take); } } - if(take.getLastMonitored() == null || ((monitorDiff/1000) >= 5)){ - String hostName = take.getHost().getType().getHostAddress(); - ResourceConnection connection = null; - if (connections.containsKey(hostName)) { - logger.debug("We already have this connection so not going to create one"); - connection = connections.get(hostName); - } else { - connection = new ResourceConnection(take, "/opt/torque/bin"); - } - jobStatus.setMonitorID(take); - jobStatus.setState(connection.getJobStatus(take)); - publisher.publish(jobStatus); - // if the job is completed we do not have to put the job to the queue again - if (!jobStatus.getState().equals(JobState.COMPLETE)) { - take.setLastMonitored(new Timestamp((new Date()).getTime())); - this.queue.put(take); - } + if (take.getLastMonitored() == null || ((monitorDiff / 1000) >= 5)) { + GsisshHostType gsisshHostType = (GsisshHostType) take.getHost().getType(); + String hostName = gsisshHostType.getHostAddress(); + ResourceConnection connection = null; + if (connections.containsKey(hostName)) { + logger.debug("We already have this connection so not going to create one"); + connection = connections.get(hostName); + } else { + connection = new ResourceConnection(take, gsisshHostType.getInstalledPath()); + connections.put(hostName, connection); } + jobStatus.setMonitorID(take); + jobStatus.setState(connection.getJobStatus(take)); + publisher.publish(jobStatus); + // if the job is completed we do not have to put the job to the queue again + if (!jobStatus.getState().equals(JobState.COMPLETE)) { + take.setLastMonitored(new Timestamp((new Date()).getTime())); + this.queue.put(take); + } + } } catch (InterruptedException e) { if(!this.queue.contains(take)){ try { @@ -141,16 +147,31 @@ public class QstatMonitor extends PullMonitor implements Runnable { publisher.publish(jobStatus); }else if(e.getMessage().contains("illegally formed job identifier")){ logger.error("Wrong job ID is given so dropping the job from monitoring system"); - } - else if(!this.queue.contains(take)){ // we put the job back to the queue only if its state is not unknown - try { - this.queue.put(take); - } catch (InterruptedException e1) { - e1.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. + } else if (!this.queue.contains(take)) { // we put the job back to the queue only if its state is not unknown + if (take.getFailedCount() < 3) { + try { + take.setFailedCount(take.getFailedCount() + 1); + this.queue.put(take); + } catch (InterruptedException e1) { + e1.printStackTrace(); + } + } else { + logger.error("Tried to monitor the job 3 times, so dropping of the the Job with ID: " + take.getJobID()); } } logger.error("Error retrieving the job status"); throw new AiravataMonitorException("Error retrieving the job status", e); + } catch (Exception e){ + if (take.getFailedCount() < 3) { + try { + take.setFailedCount(take.getFailedCount() + 1); + this.queue.put(take); + } catch (InterruptedException e1) { + e1.printStackTrace(); + } + } else { + logger.error("Tryied to monitor the job 3 times, so dropping of the the Job with ID: " + take.getJobID()); + } } } http://git-wip-us.apache.org/repos/asf/airavata/blob/3f8457a7/modules/commons/gfac-schema/src/main/resources/schemas/HostDescription.xsd ---------------------------------------------------------------------- diff --git a/modules/commons/gfac-schema/src/main/resources/schemas/HostDescription.xsd b/modules/commons/gfac-schema/src/main/resources/schemas/HostDescription.xsd index 45ac43b..27dbed1 100644 --- a/modules/commons/gfac-schema/src/main/resources/schemas/HostDescription.xsd +++ b/modules/commons/gfac-schema/src/main/resources/schemas/HostDescription.xsd @@ -109,6 +109,7 @@ <element name="exports" type="gfac:exportProperties" minOccurs="0" maxOccurs="1"/> <element name="preJobCommands" type="xsd:string" minOccurs="0" maxOccurs="unbounded"/> <element name="postJobCommands" type="xsd:string" minOccurs="0" maxOccurs="unbounded"/> + <element name="installedPath" type="xsd:string" minOccurs="0" maxOccurs="1" default="/opt/torque/bin"/> </sequence> </extension> </complexContent> http://git-wip-us.apache.org/repos/asf/airavata/blob/3f8457a7/modules/orchestrator/airavata-orchestrator-service/src/main/resources/airavata-server.properties ---------------------------------------------------------------------- diff --git a/modules/orchestrator/airavata-orchestrator-service/src/main/resources/airavata-server.properties b/modules/orchestrator/airavata-orchestrator-service/src/main/resources/airavata-server.properties index 1a3967f..f5ea35f 100644 --- a/modules/orchestrator/airavata-orchestrator-service/src/main/resources/airavata-server.properties +++ b/modules/orchestrator/airavata-orchestrator-service/src/main/resources/airavata-server.properties @@ -100,7 +100,7 @@ gfac.embedded=true myproxy.server=myproxy.teragrid.org myproxy.user=ogce -myproxy.pass= +myproxy.pass=0Gce3098 myproxy.life=3600 # XSEDE Trusted certificates can be downloaded from https://software.xsede.org/security/xsede-certs.tar.gz trusted.cert.location=/Users/lahirugunathilake/Downloads/certificates http://git-wip-us.apache.org/repos/asf/airavata/blob/3f8457a7/modules/orchestrator/airavata-orchestrator-service/src/main/resources/monitor.properties ---------------------------------------------------------------------- diff --git a/modules/orchestrator/airavata-orchestrator-service/src/main/resources/monitor.properties b/modules/orchestrator/airavata-orchestrator-service/src/main/resources/monitor.properties index dc4ebdc..32b55a3 100644 --- a/modules/orchestrator/airavata-orchestrator-service/src/main/resources/monitor.properties +++ b/modules/orchestrator/airavata-orchestrator-service/src/main/resources/monitor.properties @@ -6,5 +6,5 @@ trusted.certificate.location=/Users/lahirugunathilake/Downloads/certificates certificate.path=/Users/lahirugunathilake/Downloads/certificates myproxy.server=myproxy.teragrid.org myproxy.user=ogce -myproxy.password= +myproxy.password=0Gce3098 myproxy.life=3600 \ No newline at end of file http://git-wip-us.apache.org/repos/asf/airavata/blob/3f8457a7/modules/orchestrator/airavata-orchestrator-service/src/test/java/org/apache/airavata/orchestrator/client/OrchestratorClientFactoryTest.java ---------------------------------------------------------------------- diff --git a/modules/orchestrator/airavata-orchestrator-service/src/test/java/org/apache/airavata/orchestrator/client/OrchestratorClientFactoryTest.java b/modules/orchestrator/airavata-orchestrator-service/src/test/java/org/apache/airavata/orchestrator/client/OrchestratorClientFactoryTest.java index a20c80e..4a1bc11 100644 --- a/modules/orchestrator/airavata-orchestrator-service/src/test/java/org/apache/airavata/orchestrator/client/OrchestratorClientFactoryTest.java +++ b/modules/orchestrator/airavata-orchestrator-service/src/test/java/org/apache/airavata/orchestrator/client/OrchestratorClientFactoryTest.java @@ -39,6 +39,7 @@ import org.apache.airavata.registry.cpi.CompositeIdentifier; import org.apache.airavata.registry.cpi.ParentDataType; import org.apache.airavata.registry.cpi.Registry; import org.apache.airavata.schemas.gfac.DataType; +import org.apache.thrift.TException; import org.junit.Before; import org.junit.Test; @@ -49,9 +50,9 @@ public class OrchestratorClientFactoryTest { private DocumentCreator documentCreator; private OrchestratorService.Client orchestratorClient; private Registry registry; - + private int NUM_CONCURRENT_REQUESTS = 1; @Before - public void setUp(){ + public void setUp() { orchestratorClient = OrchestratorClientFactory.createOrchestratorClient("localhost", 8940); registry = RegistryFactory.getDefaultRegistry(); AiravataUtils.setExecutionAsServer(); @@ -63,7 +64,6 @@ public class OrchestratorClientFactoryTest { private AiravataAPI getAiravataAPI() { AiravataAPI airavataAPI = null; - if (airavataAPI == null) { try { String systemUserName = ServerSettings.getSystemUser(); String gateway = ServerSettings.getSystemUserGateway(); @@ -73,46 +73,61 @@ public class OrchestratorClientFactoryTest { } catch (AiravataAPIInvocationException e) { e.printStackTrace(); } - } return airavataAPI; } - private void storeDescriptors(){ + private void storeDescriptors() { } @Test - public void storeExperimentDetail(){ - try{ - List<DataObjectType> exInputs = new ArrayList<DataObjectType>(); - DataObjectType input = new DataObjectType(); - input.setKey("echo_input"); - input.setType(DataType.STRING.toString()); - input.setValue("echo_output=Hello World"); - exInputs.add(input); - + public void storeExperimentDetail() { + for (int i = 0; i < NUM_CONCURRENT_REQUESTS; i++) { + Thread thread = new Thread() { + public void run() { + List<DataObjectType> exInputs = new ArrayList<DataObjectType>(); + DataObjectType input = new DataObjectType(); + input.setKey("echo_input"); + input.setType(DataType.STRING.toString()); + input.setValue("echo_output=Hello World"); + exInputs.add(input); - List<DataObjectType> exOut = new ArrayList<DataObjectType>(); - DataObjectType output = new DataObjectType(); - output.setKey("echo_output"); - output.setType(DataType.STRING.toString()); - output.setValue(""); - exOut.add(output); - Experiment simpleExperiment = ExperimentModelUtil.createSimpleExperiment("project1", "admin", "echoExperiment", "SimpleEcho2", "SimpleEcho2", exInputs); - simpleExperiment.setExperimentOutputs(exOut); + List<DataObjectType> exOut = new ArrayList<DataObjectType>(); + DataObjectType output = new DataObjectType(); + output.setKey("echo_output"); + output.setType(DataType.STRING.toString()); + output.setValue(""); + exOut.add(output); - ComputationalResourceScheduling scheduling = ExperimentModelUtil.createComputationResourceScheduling("trestles.sdsc.edu", 1, 1, 1, "normal", 0, 0, 1, "sds128"); - scheduling.setResourceHostId("gsissh-trestles"); - UserConfigurationData userConfigurationData = new UserConfigurationData(); - userConfigurationData.setComputationalResourceScheduling(scheduling); - simpleExperiment.setUserConfigurationData(userConfigurationData); - String expId = (String)registry.add(ParentDataType.EXPERIMENT, simpleExperiment); + Experiment simpleExperiment = ExperimentModelUtil.createSimpleExperiment("project1", "admin", "echoExperiment", "SimpleEcho2", "SimpleEcho2", exInputs); + simpleExperiment.setExperimentOutputs(exOut); - orchestratorClient.launchExperiment(expId); - } catch (Exception e) { - e.printStackTrace(); - } + ComputationalResourceScheduling scheduling = ExperimentModelUtil.createComputationResourceScheduling("trestles.sdsc.edu", 1, 1, 1, "normal", 0, 0, 1, "sds128"); + scheduling.setResourceHostId("gsissh-trestles"); + UserConfigurationData userConfigurationData = new UserConfigurationData(); + userConfigurationData.setComputationalResourceScheduling(scheduling); + simpleExperiment.setUserConfigurationData(userConfigurationData); + String expId = null; + try { + expId = (String) registry.add(ParentDataType.EXPERIMENT, simpleExperiment); + } catch (Exception e) { + e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. + } + try { + orchestratorClient.launchExperiment(expId); + } catch (TException e) { + e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. + } + } + }; + thread.start(); + try { + thread.join(); + } catch (InterruptedException e) { + e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. + } + } } }
