Author: samindaw
Date: Mon Jun 10 23:21:50 2013
New Revision: 1491638
URL: http://svn.apache.org/r1491638
Log:
application job data persistance for SSHProvider
Modified:
airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/SSHProvider.java
Modified:
airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/SSHProvider.java
URL:
http://svn.apache.org/viewvc/airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/SSHProvider.java?rev=1491638&r1=1491637&r2=1491638&view=diff
==============================================================================
---
airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/SSHProvider.java
(original)
+++
airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/SSHProvider.java
Mon Jun 10 23:21:50 2013
@@ -25,6 +25,7 @@ import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
+import java.util.Calendar;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
@@ -48,6 +49,8 @@ import org.apache.airavata.gfac.context.
import org.apache.airavata.gfac.provider.GFacProvider;
import org.apache.airavata.gfac.provider.GFacProviderException;
import org.apache.airavata.gfac.utils.GFacUtils;
+import org.apache.airavata.registry.api.workflow.ApplicationJob;
+import
org.apache.airavata.registry.api.workflow.ApplicationJob.ApplicationJobStatus;
import org.apache.airavata.schemas.gfac.ApplicationDeploymentDescriptionType;
import org.apache.airavata.schemas.gfac.NameValuePairType;
import org.apache.airavata.schemas.gfac.URIArrayType;
@@ -60,21 +63,36 @@ import org.slf4j.LoggerFactory;
public class SSHProvider implements GFacProvider {
private static final Logger log =
LoggerFactory.getLogger(SSHProvider.class);
private SSHSecurityContext securityContext;
+ private String jobID=null;
public void initialize(JobExecutionContext jobExecutionContext) throws
GFacProviderException,GFacException {
+
jobID="SSH_"+jobExecutionContext.getApplicationContext().getHostDescription().getType().getHostAddress()+"_"+Calendar.getInstance().getTimeInMillis();
+
securityContext = (SSHSecurityContext)
jobExecutionContext.getSecurityContext(SSHSecurityContext.SSH_SECURITY_CONTEXT);
ApplicationDeploymentDescriptionType app =
jobExecutionContext.getApplicationContext().getApplicationDeploymentDescription().getType();
String remoteFile = app.getStaticWorkingDirectory() +
File.separatorChar + Constants.EXECUTABLE_NAME;
+ saveApplicationJob(jobExecutionContext, remoteFile);
log.info(remoteFile);
try {
File runscript = createShellScript(jobExecutionContext);
SCPFileTransfer fileTransfer =
securityContext.getSSHClient().newSCPFileTransfer();
+
GFacUtils.updateApplicationJobStatus(jobExecutionContext, jobID,
ApplicationJobStatus.STAGING);
fileTransfer.upload(runscript.getAbsolutePath(),
remoteFile);
} catch (IOException e) {
throw new
GFacProviderException(e.getLocalizedMessage(), e);
}
}
+ private void saveApplicationJob(JobExecutionContext
jobExecutionContext, String executableName) {
+ ApplicationJob job =
GFacUtils.createApplicationJob(jobExecutionContext);
+ job.setJobId(jobID);
+ job.setJobStatus(ApplicationJobStatus.INITIALIZE);
+ job.setSubmittedTime(Calendar.getInstance().getTime());
+ job.setStatusUpdateTime(job.getSubmittedTime());
+ job.setJobData(executableName);
+ GFacUtils.recordApplicationJob(jobExecutionContext, job);
+ }
+
public void execute(JobExecutionContext jobExecutionContext) throws
GFacProviderException {
ApplicationDeploymentDescriptionType app =
jobExecutionContext.getApplicationContext().getApplicationDeploymentDescription().getType();
Session session = null;
@@ -84,7 +102,9 @@ public class SSHProvider implements GFac
* Execute
*/
String execuable = app.getStaticWorkingDirectory() +
File.separatorChar + Constants.EXECUTABLE_NAME;
+
GFacUtils.updateApplicationJobStatus(jobExecutionContext, jobID,
ApplicationJobStatus.SUBMITTED);
Command cmd = session.exec("/bin/chmod 755 " +
execuable + "; " + execuable);
+
GFacUtils.updateApplicationJobStatus(jobExecutionContext, jobID,
ApplicationJobStatus.RESULTS_RETRIEVE);
log.info("stdout=" +
GFacUtils.readFromStream(session.getInputStream()));
cmd.join(Constants.COMMAND_EXECUTION_TIMEOUT,
TimeUnit.SECONDS);
@@ -98,7 +118,8 @@ public class SSHProvider implements GFac
} else {
log.info("Process finished with return value of
zero.");
}
-
+
+
GFacUtils.updateApplicationJobStatus(jobExecutionContext, jobID,
ApplicationJobStatus.FINISHED);
} catch (ConnectionException e) {
throw new GFacProviderException(e.getMessage(), e);
} catch (TransportException e) {