Author: lahiru
Date: Wed Sep 4 23:06:48 2013
New Revision: 1520166
URL: http://svn.apache.org/r1520166
Log:
adding listener to gsissh api.
Added:
airavata/sandbox/gsissh/src/main/java/org/apache/airavata/gsi/ssh/impl/DefaultJobSubmissionListener.java
airavata/sandbox/gsissh/src/main/java/org/apache/airavata/gsi/ssh/impl/JobStatus.java
airavata/sandbox/gsissh/src/main/java/org/apache/airavata/gsi/ssh/listener/
airavata/sandbox/gsissh/src/main/java/org/apache/airavata/gsi/ssh/listener/JobSubmissionListener.java
Removed:
airavata/sandbox/gsissh/src/main/java/org/apache/airavata/gsi/ssh/impl/GSISSHAPI.java
Modified:
airavata/sandbox/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/SSHApi.java
airavata/sandbox/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/job/JobType.java
airavata/sandbox/gsissh/src/main/java/org/apache/airavata/gsi/ssh/impl/DefaultSSHApi.java
airavata/sandbox/gsissh/src/test/java/org/apache/airavata/gsi/ssh/impl/DefaultSSHApiTest.java
Modified:
airavata/sandbox/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/SSHApi.java
URL:
http://svn.apache.org/viewvc/airavata/sandbox/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/SSHApi.java?rev=1520166&r1=1520165&r2=1520166&view=diff
==============================================================================
---
airavata/sandbox/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/SSHApi.java
(original)
+++
airavata/sandbox/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/SSHApi.java
Wed Sep 4 23:06:48 2013
@@ -28,6 +28,7 @@ package org.apache.airavata.gsi.ssh.api;
*/
import org.apache.airavata.gsi.ssh.api.job.JobDescriptor;
+import org.apache.airavata.gsi.ssh.listener.JobSubmissionListener;
/**
* An API to executed commands/jobs using GSI-SSH or SSH.
@@ -72,8 +73,34 @@ public interface SSHApi {
AuthenticationInfo authenticationInfo,
JobDescriptor jobDescriptor) throws
SSHApiException;
+ /**
+ *
+ * @param serverInfo
+ * @param authenticationInfo
+ * @return
+ * @throws SSHApiException
+ */
Cluster getCluster(ServerInfo serverInfo, AuthenticationInfo
authenticationInfo) throws SSHApiException;
+ /**
+ * @param serverInfo
+ * @param authenticationInfo
+ * @param jobID
+ * @return
+ * @throws SSHApiException
+ */
+ JobDescriptor getJobById(ServerInfo serverInfo, AuthenticationInfo
authenticationInfo, String jobID) throws SSHApiException;
- JobDescriptor getJobById(ServerInfo serverInfo,AuthenticationInfo
authenticationInfo, String jobID)throws SSHApiException;
+ /**
+ *
+ * @param serverInfo
+ * @param authenticationInfo
+ * @param jobDescriptor
+ * @param listener
+ * @return
+ * @throws SSHApiException
+ */
+ String submitAsyncJob(ServerInfo serverInfo,
+ AuthenticationInfo authenticationInfo,
+ JobDescriptor
jobDescriptor,JobSubmissionListener listener) throws SSHApiException;
}
Modified:
airavata/sandbox/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/job/JobType.java
URL:
http://svn.apache.org/viewvc/airavata/sandbox/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/job/JobType.java?rev=1520166&r1=1520165&r2=1520166&view=diff
==============================================================================
---
airavata/sandbox/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/job/JobType.java
(original)
+++
airavata/sandbox/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/job/JobType.java
Wed Sep 4 23:06:48 2013
@@ -1,3 +1,23 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
package org.apache.airavata.gsi.ssh.api.job;
/**
Added:
airavata/sandbox/gsissh/src/main/java/org/apache/airavata/gsi/ssh/impl/DefaultJobSubmissionListener.java
URL:
http://svn.apache.org/viewvc/airavata/sandbox/gsissh/src/main/java/org/apache/airavata/gsi/ssh/impl/DefaultJobSubmissionListener.java?rev=1520166&view=auto
==============================================================================
---
airavata/sandbox/gsissh/src/main/java/org/apache/airavata/gsi/ssh/impl/DefaultJobSubmissionListener.java
(added)
+++
airavata/sandbox/gsissh/src/main/java/org/apache/airavata/gsi/ssh/impl/DefaultJobSubmissionListener.java
Wed Sep 4 23:06:48 2013
@@ -0,0 +1,32 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gsi.ssh.impl;
+
+import org.apache.airavata.gsi.ssh.api.SSHApiException;
+import org.apache.airavata.gsi.ssh.api.job.JobDescriptor;
+import org.apache.airavata.gsi.ssh.listener.JobSubmissionListener;
+
+public class DefaultJobSubmissionListener extends JobSubmissionListener {
+
+ public void statusChanged(JobDescriptor jobDescriptor) throws
SSHApiException {
+ System.out.println("Job status has changed : " +
jobDescriptor.getStatus());
+ }
+}
Modified:
airavata/sandbox/gsissh/src/main/java/org/apache/airavata/gsi/ssh/impl/DefaultSSHApi.java
URL:
http://svn.apache.org/viewvc/airavata/sandbox/gsissh/src/main/java/org/apache/airavata/gsi/ssh/impl/DefaultSSHApi.java?rev=1520166&r1=1520165&r2=1520166&view=diff
==============================================================================
---
airavata/sandbox/gsissh/src/main/java/org/apache/airavata/gsi/ssh/impl/DefaultSSHApi.java
(original)
+++
airavata/sandbox/gsissh/src/main/java/org/apache/airavata/gsi/ssh/impl/DefaultSSHApi.java
Wed Sep 4 23:06:48 2013
@@ -26,6 +26,7 @@ import org.apache.airavata.gsi.ssh.api.*
import org.apache.airavata.gsi.ssh.api.job.JobDescriptor;
import org.apache.airavata.gsi.ssh.config.ConfigReader;
import org.apache.airavata.gsi.ssh.jsch.ExtendedJSch;
+import org.apache.airavata.gsi.ssh.listener.JobSubmissionListener;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.FilenameUtils;
import org.slf4j.Logger;
@@ -241,6 +242,13 @@ public class DefaultSSHApi implements SS
}
}
+ /**
+ *
+ * @param serverInfo
+ * @param authenticationInfo
+ * @return
+ * @throws SSHApiException
+ */
public Cluster getCluster(ServerInfo serverInfo, AuthenticationInfo
authenticationInfo) throws SSHApiException {
RawCommandInfo rawCommandInfo = new
RawCommandInfo("/opt/torque/bin/qnodes");
@@ -308,6 +316,14 @@ public class DefaultSSHApi implements SS
return c;
}
+ /**
+ *
+ * @param serverInfo
+ * @param authenticationInfo
+ * @param jobID
+ * @return
+ * @throws SSHApiException
+ */
public JobDescriptor getJobById(ServerInfo serverInfo, AuthenticationInfo
authenticationInfo, String jobID) throws SSHApiException {
RawCommandInfo rawCommandInfo = new
RawCommandInfo("/opt/torque/bin/qstat -f " + jobID);
@@ -330,9 +346,9 @@ public class DefaultSSHApi implements SS
}
if (line.length >= 2) {
header = line[0].trim();
- log.info("Header = " + header);
+ log.debug("Header = " + header);
value = line[1].trim();
- log.info("value = " + value);
+ log.debug("value = " + value);
if (header.equals("Variable_List")) {
while (info[i + 1].startsWith("\t")) {
@@ -403,6 +419,31 @@ public class DefaultSSHApi implements SS
}
}
return jobDescriptor;
+ }
+ /**
+ * @param serverInfo
+ * @param authenticationInfo
+ * @param jobDescriptor
+ * @param listener
+ * @return
+ * @throws SSHApiException
+ */
+ public String submitAsyncJob(ServerInfo serverInfo, AuthenticationInfo
authenticationInfo, JobDescriptor jobDescriptor, JobSubmissionListener
listener) throws SSHApiException {
+ String jobID = this.submitAsyncJob(serverInfo, authenticationInfo,
jobDescriptor);
+ while(listener.getJobStatus() != JobStatus.C) {
+ JobDescriptor jobById = this.getJobById(serverInfo,
authenticationInfo, jobID);
+ if
(!jobById.getStatus().equals(listener.getJobStatus().toString())) {
+
listener.setJobStatus(JobStatus.fromString(jobById.getStatus()));
+ listener.statusChanged(jobById);
+ }
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+ log.error("Error during job status monitoring");
+ throw new SSHApiException("Error during job status
monitoring", e);
+ }
+ }
+ return null; //To change body of implemented methods use File |
Settings | File Templates.
}
}
Added:
airavata/sandbox/gsissh/src/main/java/org/apache/airavata/gsi/ssh/impl/JobStatus.java
URL:
http://svn.apache.org/viewvc/airavata/sandbox/gsissh/src/main/java/org/apache/airavata/gsi/ssh/impl/JobStatus.java?rev=1520166&view=auto
==============================================================================
---
airavata/sandbox/gsissh/src/main/java/org/apache/airavata/gsi/ssh/impl/JobStatus.java
(added)
+++
airavata/sandbox/gsissh/src/main/java/org/apache/airavata/gsi/ssh/impl/JobStatus.java
Wed Sep 4 23:06:48 2013
@@ -0,0 +1,60 @@
+ /*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+ package org.apache.airavata.gsi.ssh.impl;
+
+ /**
+ * This will contains all the PBS specific job statuses.
+ * C - Job is completed after having run/
+ * E - Job is exiting after having run.
+ * H - Job is held.
+ * Q - job is queued, eligible to run or routed.
+ * R - job is running.
+ * T - job is being moved to new location.
+ * W - job is waiting for its execution time
+ * (-a option) to be reached.
+ * S - (Unicos only) job is suspend.
+ */
+ public enum JobStatus {
+ C, E, H, Q, R, T, W, S,U;
+
+ public static JobStatus fromString(String status){
+ if(status != null){
+ if("C".equals(status)){
+ return JobStatus.C;
+ }else if("E".equals(status)){
+ return JobStatus.E;
+ }else if("H".equals(status)){
+ return JobStatus.H;
+ }else if("Q".equals(status)){
+ return JobStatus.Q;
+ }else if("R".equals(status)){
+ return JobStatus.R;
+ }else if("T".equals(status)){
+ return JobStatus.T;
+ }else if("W".equals(status)){
+ return JobStatus.W;
+ }else if("S".equals(status)){
+ return JobStatus.S;
+ }
+ }
+ return JobStatus.U;
+ }
+ }
Added:
airavata/sandbox/gsissh/src/main/java/org/apache/airavata/gsi/ssh/listener/JobSubmissionListener.java
URL:
http://svn.apache.org/viewvc/airavata/sandbox/gsissh/src/main/java/org/apache/airavata/gsi/ssh/listener/JobSubmissionListener.java?rev=1520166&view=auto
==============================================================================
---
airavata/sandbox/gsissh/src/main/java/org/apache/airavata/gsi/ssh/listener/JobSubmissionListener.java
(added)
+++
airavata/sandbox/gsissh/src/main/java/org/apache/airavata/gsi/ssh/listener/JobSubmissionListener.java
Wed Sep 4 23:06:48 2013
@@ -0,0 +1,52 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gsi.ssh.listener;
+
+import org.apache.airavata.gsi.ssh.api.SSHApiException;
+import org.apache.airavata.gsi.ssh.api.job.JobDescriptor;
+import org.apache.airavata.gsi.ssh.impl.JobStatus;
+
+/**
+ * This interface can be implemented by the end user of the API
+ * to do desired operations based on the job status change. API has a
+ * default joblistener which can be used by the end users, but its
+ * configurable and can be parse to jobsubmission methods.
+ */
+public abstract class JobSubmissionListener {
+
+ private JobStatus jobStatus = JobStatus.U;
+
+ /**
+ * this will get called during job status change
+ *
+ * @param jobDescriptor
+ * @throws SSHApiException
+ */
+ public abstract void statusChanged(JobDescriptor jobDescriptor) throws
SSHApiException;
+
+ public JobStatus getJobStatus() {
+ return jobStatus;
+ }
+
+ public void setJobStatus(JobStatus jobStatus) {
+ this.jobStatus = jobStatus;
+ }
+}
Modified:
airavata/sandbox/gsissh/src/test/java/org/apache/airavata/gsi/ssh/impl/DefaultSSHApiTest.java
URL:
http://svn.apache.org/viewvc/airavata/sandbox/gsissh/src/test/java/org/apache/airavata/gsi/ssh/impl/DefaultSSHApiTest.java?rev=1520166&r1=1520165&r2=1520166&view=diff
==============================================================================
---
airavata/sandbox/gsissh/src/test/java/org/apache/airavata/gsi/ssh/impl/DefaultSSHApiTest.java
(original)
+++
airavata/sandbox/gsissh/src/test/java/org/apache/airavata/gsi/ssh/impl/DefaultSSHApiTest.java
Wed Sep 4 23:06:48 2013
@@ -51,7 +51,7 @@ public class DefaultSSHApiTest {
public void setUp() throws Exception {
// System.setProperty("myproxy.user", "ogce");
// System.setProperty("myproxy.password", "");
-// System.setProperty("basedir",
"/Users/lahirugunathilake/work/airavata/sandbox/gsissh");
+ System.setProperty("basedir",
"/Users/lahirugunathilake/work/airavata/sandbox/gsissh");
myProxyUserName = System.getProperty("myproxy.user");
myProxyPassword = System.getProperty("myproxy.password");
@@ -188,6 +188,7 @@ public class DefaultSSHApiTest {
System.out.println("JobID returned : " + jobID);
// Cluster cluster = sshApi.getCluster(serverInfo, authenticationInfo);
+ Thread.sleep(1000);
JobDescriptor jobById = sshApi.getJobById(serverInfo,
authenticationInfo, jobID);
AssertJUnit.assertEquals(jobById.getJobId(), jobID);
System.out.println(jobById.getAcountString());
@@ -206,8 +207,45 @@ public class DefaultSSHApiTest {
System.out.println(jobById.getQTime());
System.out.println(jobById.getUsedCPUTime());
System.out.println(jobById.getUsedMemory());
+ System.out.println(jobById.getVariableList());
}
+
+ @Test
+ public void testGetJob() throws Exception {
+ // Create authentication
+ AuthenticationInfo authenticationInfo
+ = new MyProxyAuthenticationInfo(myProxyUserName,
myProxyPassword, "myproxy.teragrid.org",
+ 7512, 17280000);
+
+ // Server info
+ ServerInfo serverInfo = new ServerInfo("ogce", "trestles.sdsc.edu");
+
+
+ // Get the API
+ SSHApi sshApi = SSHApiFactory.createSSHApi(this.certificateLocation);
+
+ // Execute command
+ String jobID = "1584791.trestles-fe1.sdsc.edu";
+ JobDescriptor jobById = sshApi.getJobById(serverInfo,
authenticationInfo, jobID);
+ System.out.println(jobById.getAcountString());
+ System.out.println(jobById.getAllEnvExport());
+ System.out.println(jobById.getCompTime());
+ System.out.println(jobById.getExecutablePath());
+ System.out.println(jobById.getEllapsedTime());
+ System.out.println(jobById.getQueueName());
+ System.out.println(jobById.getExecuteNode());
+ System.out.println(jobById.getJobName());
+ System.out.println(jobById.getCTime());
+ System.out.println(jobById.getSTime());
+ System.out.println(jobById.getMTime());
+ System.out.println(jobById.getCompTime());
+ System.out.println(jobById.getOwner());
+ System.out.println(jobById.getQTime());
+ System.out.println(jobById.getUsedCPUTime());
+ System.out.println(jobById.getUsedMemory());
+
+ }
@Test
public void testGetCluster()throws Exception{
// AuthenticationInfo authenticationInfo
@@ -269,4 +307,43 @@ public class DefaultSSHApiTest {
System.out.println(e.getMessage());
}
}
+
+@Test
+ public void testSubmitAsyncJobWithListener() throws Exception {
+ // Create authentication
+ AuthenticationInfo authenticationInfo
+ = new MyProxyAuthenticationInfo(myProxyUserName,
myProxyPassword, "myproxy.teragrid.org",
+ 7512, 17280000);
+
+ // Server info
+ ServerInfo serverInfo = new ServerInfo("ogce", "trestles.sdsc.edu");
+
+
+ // Get the API
+ SSHApi sshApi = SSHApiFactory.createSSHApi(this.certificateLocation);
+
+ // Execute command
+ System.out.println("Target PBS file path: " + workingDirectory);
+ System.out.println("Local PBS File path: " + pbsFilePath);
+ String workingDirectory = File.separator + "home" + File.separator +
"ogce" + File.separator + "gsissh";
+ JobDescriptor jobDescriptor = new JobDescriptor();
+ jobDescriptor.setWorkingDirectory(workingDirectory);
+ jobDescriptor.setShellName("/bin/bash");
+ jobDescriptor.setJobName("GSI_SSH_SLEEP_JOB");
+ jobDescriptor.setExecutablePath("/bin/sleep");
+ jobDescriptor.setAllEnvExport(true);
+ jobDescriptor.setMailOptions("n");
+ jobDescriptor.setStandardOutFile(workingDirectory + File.separator +
"application.out");
+ jobDescriptor.setStandardErrorFile(workingDirectory + File.separator +
"application.err");
+ jobDescriptor.setNodes(1);
+ jobDescriptor.setProcessesPerNode(1);
+ jobDescriptor.setMaxWallTime("1:00:00");
+ jobDescriptor.setAcountString("sds128");
+ List<String> inputs = new ArrayList<String>();
+ inputs.add("10000");
+ jobDescriptor.setInputValues(inputs);
+ System.out.println(jobDescriptor.toXML());
+ sshApi.submitAsyncJob(serverInfo, authenticationInfo,
jobDescriptor,new DefaultJobSubmissionListener());
+ }
+
}