Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/airavata
Project: http://git-wip-us.apache.org/repos/asf/airavata/repo Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/f2b5df44 Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/f2b5df44 Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/f2b5df44 Branch: refs/heads/master Commit: f2b5df44482f3cf8912a0582bc0c5696c8839243 Parents: 5381d59 aed31d3 Author: lahiru <[email protected]> Authored: Tue Apr 22 16:41:57 2014 -0400 Committer: lahiru <[email protected]> Committed: Tue Apr 22 16:41:57 2014 -0400 ---------------------------------------------------------------------- .../server/handler/AiravataServerHandler.java | 17 ++- .../api/client/AiravataClientFactory.java | 6 +- .../error/AiravataClientConnectException.java | 32 ++++++ .../workspace/experiment/ExperimentState.java | 17 +-- .../model/workspace/experiment/JobState.java | 21 ++-- .../model/workspace/experiment/TaskState.java | 17 +-- .../workspace/experiment/TransferState.java | 21 ++-- .../workspace/experiment/WorkflowNodeState.java | 17 +-- .../experimentModel.thrift | 5 + .../main/resources/airavata-server.properties | 2 +- .../java/src/main/assembly/bin-assembly.xml | 2 +- modules/gfac/gfac-core/pom.xml | 7 +- .../org/apache/airavata/gfac/Scheduler.java | 2 - .../org/apache/airavata/gfac/cpi/GFacImpl.java | 66 +++++++++-- .../airavata/gfac/provider/GFacProvider.java | 12 +- .../gfac/provider/impl/AbstractProvider.java | 51 ++++++++- .../gfac/provider/impl/BESProvider.java | 2 +- .../gfac/provider/impl/GSISSHProvider.java | 17 +-- .../gfac/provider/impl/GramProvider.java | 3 +- .../gfac/provider/impl/HadoopProvider.java | 4 +- .../gfac/provider/impl/LocalProvider.java | 9 +- modules/gfac/gfac-ec2/pom.xml | 6 +- .../apache/airavata/gfac/ec2/EC2Provider.java | 54 ++++----- .../job/monitor/AbstractActivityListener.java | 27 +++++ .../monitor/AbstractActivityMonitorClient.java | 27 ----- .../AiravataExperimentStatusUpdator.java | 82 +++++++++++++ .../job/monitor/AiravataJobStatusUpdator.java | 50 +++++++- .../job/monitor/AiravataTaskStatusUpdator.java | 114 +++++++++++++++++++ .../AiravataWorkflowNodeStatusUpdator.java | 86 ++++++++++++++ .../airavata/job/monitor/MonitorManager.java | 56 +++++---- .../command/ExperimentCancelRequest.java | 38 +++++++ .../job/monitor/command/TaskCancelRequest.java | 52 +++++++++ .../QstatMonitorTestWithMyProxyAuth.java | 4 +- .../gfac/provider/impl/SSHProvider.java | 3 +- .../airavata/integration/SimpleEchoIT.java | 12 +- .../SingleAppIntegrationTestBase.java | 71 +++++------- .../WorkflowIntegrationTestBase.java | 29 +++-- .../orchestrator/server/OrchestratorServer.java | 3 - .../server/OrchestratorServerHandler.java | 8 +- .../core/context/OrchestratorContext.java | 18 ++- .../core/impl/EmbeddedGFACJobSubmitter.java | 9 +- .../airavata/orchestrator/cpi/Orchestrator.java | 10 +- .../cpi/impl/AbstractOrchestrator.java | 2 - .../cpi/impl/SimpleOrchestratorImpl.java | 37 ++++-- .../registry/jpa/impl/ExperimentRegistry.java | 10 +- .../jpa/resources/TaskDetailResource.java | 3 + .../airavata/api/samples/ExperimentSample.java | 5 +- .../apache/airavata/job/monitor/MonitorID.java | 28 +++-- .../job/monitor/core/MessageParser.java | 2 +- .../job/monitor/event/MonitorPublisher.java | 20 ++-- .../job/monitor/impl/LocalJobMonitor.java | 4 +- .../monitor/impl/pull/qstat/QstatMonitor.java | 4 +- .../job/monitor/impl/push/amqp/AMQPMonitor.java | 4 +- .../monitor/impl/push/amqp/BasicConsumer.java | 2 +- .../impl/push/amqp/JSONMessageParser.java | 2 +- .../impl/push/amqp/UnRegisterWorker.java | 4 +- .../state/AbstractStateChangeRequest.java | 37 ++++++ .../state/ExperimentStatusChangeRequest.java | 55 +++++++++ .../airavata/job/monitor/state/JobStatus.java | 67 ----------- .../monitor/state/JobStatusChangeRequest.java | 56 +++++++++ .../job/monitor/state/PublisherMessage.java | 26 +++++ .../monitor/state/TaskStatusChangeRequest.java | 53 +++++++++ .../apache/airavata/job/AMQPMonitorTest.java | 4 +- .../job/QstatMonitorTestWithMyProxyAuth.java | 6 +- 64 files changed, 1142 insertions(+), 378 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/airavata/blob/f2b5df44/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/provider/impl/SSHProvider.java ---------------------------------------------------------------------- diff --cc modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/provider/impl/SSHProvider.java index 5914dd3,0000000..12e2ad1 mode 100644,000000..100644 --- a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/provider/impl/SSHProvider.java +++ b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/provider/impl/SSHProvider.java @@@ -1,262 -1,0 +1,261 @@@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.airavata.gfac.provider.impl; + +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.util.Calendar; +import java.util.HashMap; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Random; +import java.util.Set; + +import org.apache.airavata.commons.gfac.type.ActualParameter; +import org.apache.airavata.commons.gfac.type.MappingFactory; +import org.apache.airavata.gfac.Constants; +import org.apache.airavata.gfac.GFacException; +import org.apache.airavata.gfac.context.JobExecutionContext; +import org.apache.airavata.gfac.context.MessageContext; +import org.apache.airavata.gfac.context.security.SSHSecurityContext; - import org.apache.airavata.gfac.provider.GFacProvider; +import org.apache.airavata.gfac.provider.GFacProviderException; +import org.apache.airavata.gfac.utils.GFacUtils; +import org.apache.airavata.gsi.ssh.api.Cluster; +import org.apache.airavata.gsi.ssh.api.CommandExecutor; +import org.apache.airavata.gsi.ssh.api.SSHApiException; +import org.apache.airavata.gsi.ssh.api.job.JobDescriptor; +import org.apache.airavata.gsi.ssh.impl.RawCommandInfo; +import org.apache.airavata.gsi.ssh.impl.StandardOutReader; +import org.apache.airavata.model.workspace.experiment.JobState; +import org.apache.airavata.schemas.gfac.ApplicationDeploymentDescriptionType; +import org.apache.airavata.schemas.gfac.NameValuePairType; +import org.apache.airavata.schemas.gfac.SSHHostType; +import org.apache.airavata.schemas.gfac.URIArrayType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import sun.reflect.generics.reflectiveObjects.NotImplementedException; + +/** + * Execute application using remote SSH + */ - public class SSHProvider extends AbstractProvider implements GFacProvider{ ++public class SSHProvider extends AbstractProvider{ + private static final Logger log = LoggerFactory.getLogger(SSHProvider.class); + private Cluster cluster; + private String jobID = null; + private String taskID = null; + // we keep gsisshprovider to support qsub submission incase of hpc scenario with ssh + private GSISSHProvider gsiSshProvider = null; + + public void initialize(JobExecutionContext jobExecutionContext) throws GFacProviderException, GFacException { + super.initialize(jobExecutionContext); + taskID = jobExecutionContext.getTaskData().getTaskID(); + if (!((SSHHostType) jobExecutionContext.getApplicationContext().getHostDescription().getType()).getHpcResource()) { + jobID = "SSH_" + jobExecutionContext.getApplicationContext().getHostDescription().getType().getHostAddress() + "_" + Calendar.getInstance().getTimeInMillis(); + cluster = ((SSHSecurityContext) jobExecutionContext.getSecurityContext(SSHSecurityContext.SSH_SECURITY_CONTEXT)).getPbsCluster(); + + ApplicationDeploymentDescriptionType app = jobExecutionContext.getApplicationContext().getApplicationDeploymentDescription().getType(); + String remoteFile = app.getStaticWorkingDirectory() + File.separatorChar + Constants.EXECUTABLE_NAME; + details.setJobID(taskID); + details.setJobDescription(remoteFile); + jobExecutionContext.setJobDetails(details); + JobDescriptor jobDescriptor = GFacUtils.createJobDescriptor(jobExecutionContext, app, null); + details.setJobDescription(jobDescriptor.toXML()); + + GFacUtils.saveJobStatus(details, JobState.SETUP, taskID); + log.info(remoteFile); + try { + File runscript = createShellScript(jobExecutionContext); + cluster.scpTo(remoteFile, runscript.getAbsolutePath()); + } catch (Exception e) { + throw new GFacProviderException(e.getLocalizedMessage(), e); + } + }else{ + gsiSshProvider = new GSISSHProvider(); + } + } + + + public void execute(JobExecutionContext jobExecutionContext) throws GFacProviderException { + if (gsiSshProvider == null) { + ApplicationDeploymentDescriptionType app = jobExecutionContext.getApplicationContext().getApplicationDeploymentDescription().getType(); + try { + /* + * Execute + */ + String execuable = app.getStaticWorkingDirectory() + File.separatorChar + Constants.EXECUTABLE_NAME; + details.setJobDescription(execuable); + +// GFacUtils.updateJobStatus(details, JobState.SUBMITTED); + RawCommandInfo rawCommandInfo = new RawCommandInfo("/bin/chmod 755 " + execuable + "; " + execuable); + + StandardOutReader jobIDReaderCommandOutput = new StandardOutReader(); + + CommandExecutor.executeCommand(rawCommandInfo, cluster.getSession(), jobIDReaderCommandOutput); + String stdOutputString = getOutputifAvailable(jobIDReaderCommandOutput, "Error submitting job to resource"); + + log.info("stdout=" + stdOutputString); + +// GFacUtils.updateJobStatus(details, JobState.COMPLETE); + } catch (Exception e) { + throw new GFacProviderException(e.getMessage(), e); + } finally { + if (cluster != null) { + try { + cluster.disconnect(); + } catch (SSHApiException e) { + throw new GFacProviderException(e.getMessage(), e); + } + } + } + } else { + try { + gsiSshProvider.execute(jobExecutionContext); + } catch (GFacException e) { + throw new GFacProviderException(e.getMessage(), e); + } + } + } + + public void dispose(JobExecutionContext jobExecutionContext) throws GFacProviderException { + if (gsiSshProvider != null){ + try { + gsiSshProvider.dispose(jobExecutionContext); + } catch (GFacException e) { + throw new GFacProviderException(e.getMessage(),e); + } + } + } + + + public void cancelJob(String jobId, JobExecutionContext jobExecutionContext) throws GFacException { + throw new NotImplementedException(); + } + + + private File createShellScript(JobExecutionContext context) throws IOException { + ApplicationDeploymentDescriptionType app = context.getApplicationContext() + .getApplicationDeploymentDescription().getType(); + String uniqueDir = app.getApplicationName().getStringValue() + System.currentTimeMillis() + + new Random().nextLong(); + + File shellScript = File.createTempFile(uniqueDir, "sh"); + OutputStream out = new FileOutputStream(shellScript); + + out.write("#!/bin/bash\n".getBytes()); + out.write(("cd " + app.getStaticWorkingDirectory() + "\n").getBytes()); + out.write(("export " + Constants.INPUT_DATA_DIR_VAR_NAME + "=" + app.getInputDataDirectory() + "\n").getBytes()); + out.write(("export " + Constants.OUTPUT_DATA_DIR_VAR_NAME + "=" + app.getOutputDataDirectory() + "\n") + .getBytes()); + // get the env of the host and the application + NameValuePairType[] env = app.getApplicationEnvironmentArray(); + + Map<String, String> nv = new HashMap<String, String>(); + if (env != null) { + for (int i = 0; i < env.length; i++) { + String key = env[i].getName(); + String value = env[i].getValue(); + nv.put(key, value); + } + } + for (Entry<String, String> entry : nv.entrySet()) { + log.debug("Env[" + entry.getKey() + "] = " + entry.getValue()); + out.write(("export " + entry.getKey() + "=" + entry.getValue() + "\n").getBytes()); + + } + + // prepare the command + final String SPACE = " "; + StringBuffer cmd = new StringBuffer(); + cmd.append(app.getExecutableLocation()); + cmd.append(SPACE); + + MessageContext input = context.getInMessageContext(); + ; + Map<String, Object> inputs = input.getParameters(); + Set<String> keys = inputs.keySet(); + for (String paramName : keys) { + ActualParameter actualParameter = (ActualParameter) inputs.get(paramName); + if ("URIArray".equals(actualParameter.getType().getType().toString())) { + String[] values = ((URIArrayType) actualParameter.getType()).getValueArray(); + for (String value : values) { + cmd.append(value); + cmd.append(SPACE); + } + } else { + String paramValue = MappingFactory.toString(actualParameter); + cmd.append(paramValue); + cmd.append(SPACE); + } + } + // We redirect the error and stdout to remote files, they will be read + // in later + cmd.append(SPACE); + cmd.append("1>"); + cmd.append(SPACE); + cmd.append(app.getStandardOutput()); + cmd.append(SPACE); + cmd.append("2>"); + cmd.append(SPACE); + cmd.append(app.getStandardError()); + + String cmdStr = cmd.toString(); + log.info("Command = " + cmdStr); + out.write((cmdStr + "\n").getBytes()); + String message = "\"execuationSuceeded\""; + out.write(("echo " + message + "\n").getBytes()); + out.close(); + + return shellScript; + } + + public void initProperties(Map<String, String> properties) throws GFacProviderException, GFacException { + if (gsiSshProvider != null){ + try { + initProperties(properties); + } catch (GFacException e) { + throw new GFacProviderException(e.getMessage(),e); + } + } + } + /** + * This method will read standard output and if there's any it will be parsed + * @param jobIDReaderCommandOutput + * @param errorMsg + * @return + * @throws SSHApiException + */ + private String getOutputifAvailable(StandardOutReader jobIDReaderCommandOutput, String errorMsg) throws SSHApiException { + String stdOutputString = jobIDReaderCommandOutput.getStdOutputString(); + String stdErrorString = jobIDReaderCommandOutput.getStdErrorString(); + + if(stdOutputString == null || stdOutputString.isEmpty() || (stdErrorString != null && !stdErrorString.isEmpty())){ + log.error("Standard Error output : " + stdErrorString); + throw new SSHApiException(errorMsg + stdErrorString); + } + return stdOutputString; + } + + +} http://git-wip-us.apache.org/repos/asf/airavata/blob/f2b5df44/tools/job-monitor/src/test/java/org/apache/airavata/job/AMQPMonitorTest.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/airavata/blob/f2b5df44/tools/job-monitor/src/test/java/org/apache/airavata/job/QstatMonitorTestWithMyProxyAuth.java ----------------------------------------------------------------------
