This is an automated email from the ASF dual-hosted git repository. dimuthuupe pushed a commit to branch helix-integration in repository https://gitbox.apache.org/repos/asf/airavata.git
commit b199bc2090ff9bc8b9c5827adeedcbfbfe181cc0 Author: dimuthu <[email protected]> AuthorDate: Mon Feb 26 01:11:29 2018 -0500 Stabalizing DefaultJobSubmission Task --- .../airavata-helix/agent-impl/ssh-agent/pom.xml | 21 ++-- .../airavata/helix/agent/ssh/SshAgentAdaptor.java | 132 ++++++++++++--------- .../helix/agent/ssh/StandardOutReader.java | 80 ++++--------- .../helix/task/api/support/AdaptorSupport.java | 34 +----- modules/airavata-helix/task-core/pom.xml | 5 + .../helix/core/support/AdaptorSupportImpl.java | 19 +-- .../airavata/helix/impl/task/EnvSetupTask.java | 4 +- .../airavata/helix/impl/task/TaskContext.java | 9 +- .../impl/task/submission/GroovyMapBuilder.java | 74 +++++++++++- .../helix/impl/task/submission/GroovyMapData.java | 10 +- .../submission/task/DefaultJobSubmissionTask.java | 8 +- .../submission/task/ForkJobSubmissionTask.java | 4 +- .../task/submission/task/JobSubmissionTask.java | 13 +- .../submission/task/LocalJobSubmissionTask.java | 6 +- .../src/main/resources/airavata-server.properties | 4 +- .../src/main/resources/log4j.properties | 2 + 16 files changed, 242 insertions(+), 183 deletions(-) diff --git a/modules/airavata-helix/agent-impl/ssh-agent/pom.xml b/modules/airavata-helix/agent-impl/ssh-agent/pom.xml index 44cf919..bc78971 100644 --- a/modules/airavata-helix/agent-impl/ssh-agent/pom.xml +++ b/modules/airavata-helix/agent-impl/ssh-agent/pom.xml @@ -3,9 +3,10 @@ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <parent> - <artifactId>agent-impl</artifactId> - <groupId>org.apache</groupId> - <version>1.0-SNAPSHOT</version> + <artifactId>airavata-helix</artifactId> + <groupId>org.apache.airavata</groupId> + <version>0.17-SNAPSHOT</version> + <relativePath>../../pom.xml</relativePath> </parent> <modelVersion>4.0.0</modelVersion> @@ -33,19 +34,15 @@ <artifactId>airavata-credential-store</artifactId> <version>0.17-SNAPSHOT</version> </dependency> + <dependency> + <groupId>org.apache.airavata</groupId> + <artifactId>agent-api</artifactId> + <version>0.17-SNAPSHOT</version> + </dependency> </dependencies> <build> <plugins> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-compiler-plugin</artifactId> - <version>3.5.1</version> - <configuration> - <source>${java.version}</source> - <target>${java.version}</target> - </configuration> - </plugin> <plugin> <groupId>org.apache.maven.plugins</groupId> diff --git a/modules/airavata-helix/agent-impl/ssh-agent/src/main/java/org/apache/airavata/helix/agent/ssh/SshAgentAdaptor.java b/modules/airavata-helix/agent-impl/ssh-agent/src/main/java/org/apache/airavata/helix/agent/ssh/SshAgentAdaptor.java index 19b429c..ef8d580 100644 --- a/modules/airavata-helix/agent-impl/ssh-agent/src/main/java/org/apache/airavata/helix/agent/ssh/SshAgentAdaptor.java +++ b/modules/airavata-helix/agent-impl/ssh-agent/src/main/java/org/apache/airavata/helix/agent/ssh/SshAgentAdaptor.java @@ -13,7 +13,6 @@ import org.apache.airavata.model.appcatalog.computeresource.*; import org.apache.airavata.registry.core.experiment.catalog.impl.RegistryFactory; import org.apache.airavata.registry.cpi.AppCatalog; import org.apache.airavata.registry.cpi.AppCatalogException; -import org.apache.airavata.registry.cpi.ComputeResource; import java.io.*; import java.util.Arrays; @@ -130,47 +129,64 @@ public class SshAgentAdaptor implements AgentAdaptor { public CommandOutput executeCommand(String command, String workingDirectory) throws AgentException { StandardOutReader commandOutput = new StandardOutReader(); + ChannelExec channelExec = null; try { - ChannelExec channelExec = ((ChannelExec) session.openChannel("exec")); + channelExec = ((ChannelExec) session.openChannel("exec")); channelExec.setCommand(command); channelExec.setInputStream(null); - channelExec.setErrStream(commandOutput.getStandardError()); + InputStream out = channelExec.getInputStream(); + InputStream err = channelExec.getErrStream(); channelExec.connect(); - commandOutput.onOutput(channelExec); + + commandOutput.setExitCode(channelExec.getExitStatus()); + commandOutput.readStdOutFromStream(out); + commandOutput.readStdErrFromStream(err); return commandOutput; } catch (JSchException e) { + e.printStackTrace(); + throw new AgentException(e); + } catch (IOException e) { + e.printStackTrace(); throw new AgentException(e); + } finally { + if (channelExec != null) { + channelExec.disconnect(); + } } } public void createDirectory(String path) throws AgentException { + String command = "mkdir -p " + path; + ChannelExec channelExec = null; try { - String command = "mkdir -p " + path; - Channel channel = session.openChannel("exec"); + channelExec = (ChannelExec)session.openChannel("exec"); StandardOutReader stdOutReader = new StandardOutReader(); - ((ChannelExec) channel).setCommand(command); + channelExec.setCommand(command); + InputStream out = channelExec.getInputStream(); + InputStream err = channelExec.getErrStream(); + channelExec.connect(); + + stdOutReader.readStdOutFromStream(out); + stdOutReader.readStdErrFromStream(err); - ((ChannelExec) channel).setErrStream(stdOutReader.getStandardError()); - try { - channel.connect(); - } catch (JSchException e) { - channel.disconnect(); - System.out.println("Unable to retrieve command output. Command - " + command + - " on server - " + session.getHost() + ":" + session.getPort() + - " connecting user name - " - + session.getUserName()); - throw new AgentException(e); - } - stdOutReader.onOutput(channel); - if (stdOutReader.getStdErrorString().contains("mkdir:")) { - throw new AgentException(stdOutReader.getStdErrorString()); + if (stdOutReader.getStdError() != null && stdOutReader.getStdError().contains("mkdir:")) { + throw new AgentException(stdOutReader.getStdError()); } - - channel.disconnect(); } catch (JSchException e) { + System.out.println("Unable to retrieve command output. Command - " + command + + " on server - " + session.getHost() + ":" + session.getPort() + + " connecting user name - " + + session.getUserName()); + throw new AgentException(e); + } catch (IOException e) { + e.printStackTrace(); throw new AgentException(e); + } finally { + if (channelExec != null) { + channelExec.disconnect(); + } } } @@ -182,20 +198,22 @@ public class SshAgentAdaptor implements AgentAdaptor { } boolean ptimestamp = true; + ChannelExec channelExec = null; try { // exec 'scp -t rfile' remotely String command = "scp " + (ptimestamp ? "-p" : "") + " -t " + remoteFile; - Channel channel = session.openChannel("exec"); + channelExec = (ChannelExec)session.openChannel("exec"); StandardOutReader stdOutReader = new StandardOutReader(); - ((ChannelExec) channel).setErrStream(stdOutReader.getStandardError()); - ((ChannelExec) channel).setCommand(command); + //channelExec.setErrStream(stdOutReader.getStandardError()); + channelExec.setCommand(command); // get I/O streams for remote scp - OutputStream out = channel.getOutputStream(); - InputStream in = channel.getInputStream(); + OutputStream out = channelExec.getOutputStream(); + InputStream in = channelExec.getInputStream(); + InputStream err = channelExec.getErrStream(); - channel.connect(); + channelExec.connect(); if (checkAck(in) != 0) { String error = "Error Reading input Stream"; @@ -255,12 +273,10 @@ public class SshAgentAdaptor implements AgentAdaptor { throw new AgentException(error); } out.close(); - stdOutReader.onOutput(channel); - + stdOutReader.readStdErrFromStream(err); - channel.disconnect(); - if (stdOutReader.getStdErrorString().contains("scp:")) { - throw new AgentException(stdOutReader.getStdErrorString()); + if (stdOutReader.getStdError().contains("scp:")) { + throw new AgentException(stdOutReader.getStdError()); } //since remote file is always a file we just return the file //return remoteFile; @@ -273,43 +289,47 @@ public class SshAgentAdaptor implements AgentAdaptor { } catch (IOException e) { e.printStackTrace(); throw new AgentException(e); + } finally { + if (channelExec != null) { + channelExec.disconnect(); + } } } @Override public List<String> listDirectory(String path) throws AgentException { - + String command = "ls " + path; + ChannelExec channelExec = null; try { - String command = "ls " + path; - Channel channel = session.openChannel("exec"); + channelExec = (ChannelExec)session.openChannel("exec"); StandardOutReader stdOutReader = new StandardOutReader(); - ((ChannelExec) channel).setCommand(command); + channelExec.setCommand(command); + InputStream out = channelExec.getInputStream(); + InputStream err = channelExec.getErrStream(); - ((ChannelExec) channel).setErrStream(stdOutReader.getStandardError()); - try { - channel.connect(); - } catch (JSchException e) { - - channel.disconnect(); -// session.disconnect(); + channelExec.connect(); - throw new AgentException("Unable to retrieve command output. Command - " + command + - " on server - " + session.getHost() + ":" + session.getPort() + - " connecting user name - " - + session.getUserName(), e); - } - stdOutReader.onOutput(channel); - stdOutReader.getStdOutputString(); - if (stdOutReader.getStdErrorString().contains("ls:")) { - throw new AgentException(stdOutReader.getStdErrorString()); + stdOutReader.readStdOutFromStream(out); + stdOutReader.readStdErrFromStream(err); + if (stdOutReader.getStdError().contains("ls:")) { + throw new AgentException(stdOutReader.getStdError()); } - channel.disconnect(); - return Arrays.asList(stdOutReader.getStdOutputString().split("\n")); + return Arrays.asList(stdOutReader.getStdOut().split("\n")); } catch (JSchException e) { + throw new AgentException("Unable to retrieve command output. Command - " + command + + " on server - " + session.getHost() + ":" + session.getPort() + + " connecting user name - " + + session.getUserName(), e); + } catch (IOException e) { + e.printStackTrace(); throw new AgentException(e); + } finally { + if (channelExec != null) { + channelExec.disconnect(); + } } } diff --git a/modules/airavata-helix/agent-impl/ssh-agent/src/main/java/org/apache/airavata/helix/agent/ssh/StandardOutReader.java b/modules/airavata-helix/agent-impl/ssh-agent/src/main/java/org/apache/airavata/helix/agent/ssh/StandardOutReader.java index 49c036e..94ba566 100644 --- a/modules/airavata-helix/agent-impl/ssh-agent/src/main/java/org/apache/airavata/helix/agent/ssh/StandardOutReader.java +++ b/modules/airavata-helix/agent-impl/ssh-agent/src/main/java/org/apache/airavata/helix/agent/ssh/StandardOutReader.java @@ -2,11 +2,9 @@ package org.apache.airavata.helix.agent.ssh; import com.jcraft.jsch.Channel; import org.apache.airavata.agents.api.CommandOutput; +import org.apache.commons.io.IOUtils; -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; +import java.io.*; /** * TODO: Class level comments please @@ -16,68 +14,38 @@ import java.io.OutputStream; */ public class StandardOutReader implements CommandOutput { - // Todo improve this. We need to direct access of std out and exit code + private String stdOut; + private String stdError; + private Integer exitCode; - String stdOutputString = null; - ByteArrayOutputStream errorStream = new ByteArrayOutputStream(); - private int exitCode; - - public void onOutput(Channel channel) { - try { - StringBuffer pbsOutput = new StringBuffer(""); - InputStream inputStream = channel.getInputStream(); - byte[] tmp = new byte[1024]; - do { - while (inputStream.available() > 0) { - int i = inputStream.read(tmp, 0, 1024); - if (i < 0) break; - pbsOutput.append(new String(tmp, 0, i)); - } - } while (!channel.isClosed()) ; - String output = pbsOutput.toString(); - this.setStdOutputString(output); - } catch (IOException e) { - e.printStackTrace(); - } - } - - public void exitCode(int code) { - System.out.println("Program exit code - " + code); - this.exitCode = code; - } - - public int getExitCode() { - return exitCode; - } - - public String getStdOutputString() { - return stdOutputString; - } - - public void setStdOutputString(String stdOutputString) { - this.stdOutputString = stdOutputString; + @Override + public String getStdOut() { + return this.stdOut; } - public String getStdErrorString() { - return errorStream.toString(); + @Override + public String getStdError() { + return this.stdError; } - public OutputStream getStandardError() { - return errorStream; + @Override + public Integer getExitCode() { + return this.exitCode; } - @Override - public String getStdOut() { - return null; + public void readStdOutFromStream(InputStream is) throws IOException { + StringWriter writer = new StringWriter(); + IOUtils.copy(is, writer, "UTF-8"); + this.stdOut = writer.toString(); } - @Override - public String getStdError() { - return null; + public void readStdErrFromStream(InputStream is) throws IOException { + StringWriter writer = new StringWriter(); + IOUtils.copy(is, writer, "UTF-8"); + this.stdError = writer.toString(); } - @Override - public String getExitCommand() { - return null; + public void setExitCode(Integer exitCode) { + this.exitCode = exitCode; } } diff --git a/modules/airavata-helix/task-api/src/main/java/org/apache/airavata/helix/task/api/support/AdaptorSupport.java b/modules/airavata-helix/task-api/src/main/java/org/apache/airavata/helix/task/api/support/AdaptorSupport.java index 3e24aaa..4b6e11e 100644 --- a/modules/airavata-helix/task-api/src/main/java/org/apache/airavata/helix/task/api/support/AdaptorSupport.java +++ b/modules/airavata-helix/task-api/src/main/java/org/apache/airavata/helix/task/api/support/AdaptorSupport.java @@ -15,38 +15,6 @@ import java.io.File; public interface AdaptorSupport { public void initializeAdaptor(); - public AgentAdaptor fetchAdaptor(String computeResource, String protocol, String authToken) throws Exception; + public AgentAdaptor fetchAdaptor(String gatewayId, String computeResource, String protocol, String authToken, String userId) throws Exception; - - /** - * - * @param command - * @param workingDirectory - * @param computeResourceId - * @param protocol - * @param authToken - * @throws Exception - */ - public CommandOutput executeCommand(String command, String workingDirectory, String computeResourceId, String protocol, String authToken) throws Exception; - - /** - * - * @param path - * @param computeResourceId - * @param protocol - * @param authToken - * @throws Exception - */ - public void createDirectory(String path, String computeResourceId, String protocol, String authToken) throws Exception; - - /** - * - * @param sourceFile - * @param destinationFile - * @param computeResourceId - * @param protocol - * @param authToken - * @throws Exception - */ - public void copyFile(String sourceFile, String destinationFile, String computeResourceId, String protocol, String authToken) throws Exception; } diff --git a/modules/airavata-helix/task-core/pom.xml b/modules/airavata-helix/task-core/pom.xml index df72dac..bf860f8 100644 --- a/modules/airavata-helix/task-core/pom.xml +++ b/modules/airavata-helix/task-core/pom.xml @@ -28,6 +28,11 @@ <artifactId>agent-api</artifactId> <version>0.17-SNAPSHOT</version> </dependency> + <dependency> + <groupId>org.apache.airavata</groupId> + <artifactId>ssh-agent</artifactId> + <version>0.17-SNAPSHOT</version> + </dependency> </dependencies> <!--<build> diff --git a/modules/airavata-helix/task-core/src/main/java/org/apache/airavata/helix/core/support/AdaptorSupportImpl.java b/modules/airavata-helix/task-core/src/main/java/org/apache/airavata/helix/core/support/AdaptorSupportImpl.java index 87a1e17..a98b8f0 100644 --- a/modules/airavata-helix/task-core/src/main/java/org/apache/airavata/helix/core/support/AdaptorSupportImpl.java +++ b/modules/airavata-helix/task-core/src/main/java/org/apache/airavata/helix/core/support/AdaptorSupportImpl.java @@ -1,6 +1,7 @@ package org.apache.airavata.helix.core.support; import org.apache.airavata.agents.api.*; +import org.apache.airavata.helix.agent.ssh.SshAgentAdaptor; import org.apache.airavata.helix.task.api.support.AdaptorSupport; import java.io.File; @@ -29,19 +30,9 @@ public class AdaptorSupportImpl implements AdaptorSupport { public void initializeAdaptor() { } - public CommandOutput executeCommand(String command, String workingDirectory, String computeResourceId, String protocol, String authToken) throws AgentException { - return fetchAdaptor(computeResourceId, protocol, authToken).executeCommand(command, workingDirectory); - } - - public void createDirectory(String path, String computeResourceId, String protocol, String authToken) throws AgentException { - fetchAdaptor(computeResourceId, protocol, authToken).createDirectory(path); - } - - public void copyFile(String sourceFile, String destinationFile, String computeResourceId, String protocol, String authToken) throws AgentException { - fetchAdaptor(computeResourceId, protocol, authToken).copyFile(sourceFile, destinationFile); - } - - public AgentAdaptor fetchAdaptor(String computeResource, String protocol, String authToken) throws AgentException { - return agentStore.fetchAdaptor(computeResource, protocol, authToken); + public AgentAdaptor fetchAdaptor(String gatewayId, String computeResource, String protocol, String authToken, String userId) throws AgentException { + SshAgentAdaptor agentAdaptor = new SshAgentAdaptor(); + agentAdaptor.init(computeResource, gatewayId, userId, authToken); + return agentAdaptor; } } diff --git a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/EnvSetupTask.java b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/EnvSetupTask.java index f079b9f..cabc014 100644 --- a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/EnvSetupTask.java +++ b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/EnvSetupTask.java @@ -24,9 +24,11 @@ public class EnvSetupTask extends AiravataTask { try { publishTaskState(TaskState.EXECUTING); AgentAdaptor adaptor = taskHelper.getAdaptorSupport().fetchAdaptor( + getTaskContext().getGatewayId(), getTaskContext().getComputeResourceId(), getTaskContext().getJobSubmissionProtocol().name(), - getTaskContext().getComputeResourceCredentialToken()); + getTaskContext().getComputeResourceCredentialToken(), + getTaskContext().getComputeResourceLoginUserName()); adaptor.createDirectory(workingDirectory); publishTaskState(TaskState.COMPLETED); diff --git a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/TaskContext.java b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/TaskContext.java index 7de738e..f33d8a1 100644 --- a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/TaskContext.java +++ b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/TaskContext.java @@ -85,7 +85,7 @@ public class TaskContext { * Note: process context property use lazy loading approach. In runtime you will see some properties as null * unless you have access it previously. Once that property access using the api,it will be set to correct value. */ - private TaskContext(String taskId, String processId, String gatewayId) { + private TaskContext(String processId, String gatewayId, String taskId) { this.processId = processId; this.gatewayId = gatewayId; this.taskId = taskId; @@ -784,7 +784,12 @@ public class TaskContext { ctx.setGatewayResourceProfile(gatewayResourceProfile); ctx.setGatewayComputeResourcePreference(gatewayComputeResourcePreference); ctx.setGatewayStorageResourcePreference(gatewayStorageResourcePreference); - + ctx.setApplicationDeploymentDescription(appCatalog.getApplicationDeployment() + .getApplicationDeployement(processModel.getApplicationDeploymentId())); + ctx.setApplicationInterfaceDescription(appCatalog.getApplicationInterface() + .getApplicationInterface(processModel.getApplicationInterfaceId())); + ctx.setComputeResourceDescription(appCatalog.getComputeResource().getComputeResource + (ctx.getComputeResourceId())); return ctx; } diff --git a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/GroovyMapBuilder.java b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/GroovyMapBuilder.java index 0b92922..16e8114 100644 --- a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/GroovyMapBuilder.java +++ b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/GroovyMapBuilder.java @@ -2,17 +2,21 @@ package org.apache.airavata.helix.impl.task.submission; import groovy.text.GStringTemplateEngine; import groovy.text.TemplateEngine; +import org.apache.airavata.common.exception.ApplicationSettingsException; +import org.apache.airavata.common.utils.ServerSettings; import org.apache.airavata.helix.impl.task.TaskContext; import org.apache.airavata.model.appcatalog.appdeployment.ApplicationDeploymentDescription; import org.apache.airavata.model.appcatalog.appdeployment.CommandObject; import org.apache.airavata.model.appcatalog.appdeployment.SetEnvPaths; -import org.apache.airavata.model.appcatalog.computeresource.ResourceJobManagerType; +import org.apache.airavata.model.appcatalog.computeresource.*; import org.apache.airavata.model.application.io.DataType; import org.apache.airavata.model.application.io.InputDataObjectType; import org.apache.airavata.model.application.io.OutputDataObjectType; import org.apache.airavata.model.parallelism.ApplicationParallelismType; +import org.apache.airavata.model.process.ProcessModel; import org.apache.airavata.model.scheduling.ComputationalResourceSchedulingModel; import org.apache.airavata.model.task.JobSubmissionTaskModel; +import org.apache.airavata.registry.cpi.AppCatalogException; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; import org.apache.thrift.TException; @@ -38,6 +42,8 @@ public class GroovyMapBuilder { public GroovyMapData build() throws Exception { GroovyMapData mapData = new GroovyMapData(); + + setMailAddresses(taskContext, mapData); mapData.setInputDir(taskContext.getInputDir()); mapData.setOutputDir(taskContext.getOutputDir()); mapData.setExecutablePath(taskContext.getApplicationDeploymentDescription().getExecutablePath()); @@ -51,6 +57,7 @@ public class GroovyMapBuilder { mapData.setAccountString(taskContext.getAllocationProjectNumber()); mapData.setReservation(taskContext.getReservation()); mapData.setJobName("A" + String.valueOf(generateJobName())); + mapData.setWorkingDirectory(taskContext.getWorkingDir()); List<String> inputValues = getProcessInputValues(taskContext.getProcessModel().getProcessInputs(), true); inputValues.addAll(getProcessOutputValues(taskContext.getProcessModel().getProcessOutputs(), true)); @@ -332,4 +339,69 @@ public class GroovyMapBuilder { } } + private static void setMailAddresses(TaskContext taskContext, GroovyMapData groovyMap) throws AppCatalogException, + ApplicationSettingsException { + + ProcessModel processModel = taskContext.getProcessModel(); + String emailIds = null; + if (isEmailBasedJobMonitor(taskContext)) { + emailIds = ServerSettings.getEmailBasedMonitorAddress(); + } + if (ServerSettings.getSetting(ServerSettings.JOB_NOTIFICATION_ENABLE).equalsIgnoreCase("true")) { + String userJobNotifEmailIds = ServerSettings.getSetting(ServerSettings.JOB_NOTIFICATION_EMAILIDS); + if (userJobNotifEmailIds != null && !userJobNotifEmailIds.isEmpty()) { + if (emailIds != null && !emailIds.isEmpty()) { + emailIds += ("," + userJobNotifEmailIds); + } else { + emailIds = userJobNotifEmailIds; + } + } + if (processModel.isEnableEmailNotification()) { + List<String> emailList = processModel.getEmailAddresses(); + String elist = listToCsv(emailList, ','); + if (elist != null && !elist.isEmpty()) { + if (emailIds != null && !emailIds.isEmpty()) { + emailIds = emailIds + "," + elist; + } else { + emailIds = elist; + } + } + } + } + if (emailIds != null && !emailIds.isEmpty()) { + logger.info("Email list: " + emailIds); + groovyMap.setMailAddress(emailIds); + } + } + + public static boolean isEmailBasedJobMonitor(TaskContext taskContext) throws AppCatalogException { + JobSubmissionProtocol jobSubmissionProtocol = taskContext.getPreferredJobSubmissionProtocol(); + JobSubmissionInterface jobSubmissionInterface = taskContext.getPreferredJobSubmissionInterface(); + if (jobSubmissionProtocol == JobSubmissionProtocol.SSH) { + String jobSubmissionInterfaceId = jobSubmissionInterface.getJobSubmissionInterfaceId(); + SSHJobSubmission sshJobSubmission = taskContext.getAppCatalog().getComputeResource().getSSHJobSubmission(jobSubmissionInterfaceId); + MonitorMode monitorMode = sshJobSubmission.getMonitorMode(); + return monitorMode != null && monitorMode == MonitorMode.JOB_EMAIL_NOTIFICATION_MONITOR; + } else { + return false; + } + } + + public static String listToCsv(List<String> listOfStrings, char separator) { + StringBuilder sb = new StringBuilder(); + + // all but last + for (int i = 0; i < listOfStrings.size() - 1; i++) { + sb.append(listOfStrings.get(i)); + sb.append(separator); + } + + // last string, no separator + if (listOfStrings.size() > 0) { + sb.append(listOfStrings.get(listOfStrings.size() - 1)); + } + + return sb.toString(); + } + } diff --git a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/GroovyMapData.java b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/GroovyMapData.java index 995f772..6ebde21 100644 --- a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/GroovyMapData.java +++ b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/GroovyMapData.java @@ -1,10 +1,11 @@ package org.apache.airavata.helix.impl.task.submission; -import com.google.common.collect.ImmutableMap; import groovy.lang.Writable; import groovy.text.GStringTemplateEngine; import groovy.text.TemplateEngine; import org.apache.airavata.common.utils.ApplicationSettings; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; import java.io.File; import java.lang.reflect.Field; @@ -15,6 +16,8 @@ import java.util.Map; public class GroovyMapData { + private static final Logger logger = LogManager.getLogger(GroovyMapData.class); + @ScriptTag(name = "inputDir") private String inputDir; @@ -453,6 +456,11 @@ public class GroovyMapData { } catch (Exception e) { throw new Exception("Error while generating script using groovy map"); } + + if (logger.isTraceEnabled()) { + logger.trace("Groovy map as string for template " + templateName); + logger.trace(make.toString()); + } return make.toString(); } } diff --git a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/task/DefaultJobSubmissionTask.java b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/task/DefaultJobSubmissionTask.java index fab4747..c85e18b 100644 --- a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/task/DefaultJobSubmissionTask.java +++ b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/task/DefaultJobSubmissionTask.java @@ -50,9 +50,11 @@ public class DefaultJobSubmissionTask extends JobSubmissionTask { if (mapData != null) { //jobModel.setJobDescription(FileUtils.readFileToString(jobFile)); AgentAdaptor adaptor = taskHelper.getAdaptorSupport().fetchAdaptor( + getTaskContext().getGatewayId(), getTaskContext().getComputeResourceId(), getTaskContext().getJobSubmissionProtocol().name(), - getTaskContext().getComputeResourceCredentialToken()); + getTaskContext().getComputeResourceCredentialToken(), + getTaskContext().getComputeResourceLoginUserName()); JobSubmissionOutput submissionOutput = submitBatchJob(adaptor, mapData, mapData.getWorkingDirectory()); @@ -69,6 +71,7 @@ public class DefaultJobSubmissionTask extends JobSubmissionTask { statusList.add(new JobStatus(JobState.FAILED)); statusList.get(0).setReason(submissionOutput.getFailureReason()); jobModel.setJobStatuses(statusList); + jobModel.setJobDescription("Sample description"); saveJobModel(jobModel); logger.error("expId: " + getExperimentId() + ", processid: " + getProcessId()+ ", taskId: " + getTaskId() + " :- Job submission failed for job name " + jobModel.getJobName()); @@ -83,6 +86,8 @@ public class DefaultJobSubmissionTask extends JobSubmissionTask { //taskStatus.setReason("Job submission command didn't return a jobId"); //taskStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime()); //taskContext.setTaskStatus(taskStatus); + logger.error("Standard error message : " + submissionOutput.getStdErr()); + logger.error("Standard out message : " + submissionOutput.getStdOut()); return onFail("Job submission command didn't return a jobId", false, null); } else { @@ -116,6 +121,7 @@ public class DefaultJobSubmissionTask extends JobSubmissionTask { //TODO save task status?? } else if (jobId != null && !jobId.isEmpty()) { + logger.info("Received job id " + jobId + " from compute resource"); jobModel.setJobId(jobId); saveJobModel(jobModel); JobStatus jobStatus = new JobStatus(); diff --git a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/task/ForkJobSubmissionTask.java b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/task/ForkJobSubmissionTask.java index 58b70ef..2e4a052 100644 --- a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/task/ForkJobSubmissionTask.java +++ b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/task/ForkJobSubmissionTask.java @@ -36,9 +36,11 @@ public class ForkJobSubmissionTask extends JobSubmissionTask { if (mapData != null) { //jobModel.setJobDescription(FileUtils.readFileToString(jobFile)); AgentAdaptor adaptor = taskHelper.getAdaptorSupport().fetchAdaptor( + getTaskContext().getGatewayId(), getTaskContext().getComputeResourceId(), getTaskContext().getJobSubmissionProtocol().name(), - getTaskContext().getComputeResourceCredentialToken()); + getTaskContext().getComputeResourceCredentialToken(), + getTaskContext().getComputeResourceLoginUserName()); JobSubmissionOutput submissionOutput = submitBatchJob(adaptor, mapData, mapData.getWorkingDirectory()); diff --git a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/task/JobSubmissionTask.java b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/task/JobSubmissionTask.java index 11e59eb..1a024a7 100644 --- a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/task/JobSubmissionTask.java +++ b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/task/JobSubmissionTask.java @@ -28,6 +28,8 @@ import org.apache.airavata.model.status.JobStatus; import org.apache.airavata.registry.cpi.*; import org.apache.commons.io.FileUtils; import org.apache.helix.HelixManager; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; import java.io.File; import java.security.SecureRandom; @@ -35,7 +37,7 @@ import java.util.*; public abstract class JobSubmissionTask extends AiravataTask { - + private static final Logger logger = LogManager.getLogger(JobSubmissionTask.class); @Override public void init(HelixManager manager, String workflowName, String jobName, String taskName) { @@ -52,10 +54,19 @@ public abstract class JobSubmissionTask extends AiravataTask { int number = new SecureRandom().nextInt(); number = (number < 0 ? -number : number); File tempJobFile = new File(getLocalDataDir(), "job_" + Integer.toString(number) + jobManagerConfiguration.getScriptExtension()); + FileUtils.writeStringToFile(tempJobFile, scriptAsString); + logger.info("Job submission file for process " + getProcessId() + " was created at : " + tempJobFile.getAbsolutePath()); + logger.info("Copying file form " + tempJobFile.getAbsolutePath() + " to remote path " + workingDirectory + + " of compute resource " + getTaskContext().getComputeResourceId()); + agentAdaptor.copyFile(tempJobFile.getAbsolutePath(), workingDirectory); // TODO transfer file RawCommandInfo submitCommand = jobManagerConfiguration.getSubmitCommand(workingDirectory, tempJobFile.getPath()); + + logger.debug("Submit command for process id " + getProcessId() + " : " + submitCommand.getRawCommand()); + logger.debug("Working directory for process id " + getProcessId() + " : " + workingDirectory); + CommandOutput commandOutput = agentAdaptor.executeCommand(submitCommand.getRawCommand(), workingDirectory); JobSubmissionOutput jsoutput = new JobSubmissionOutput(); diff --git a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/task/LocalJobSubmissionTask.java b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/task/LocalJobSubmissionTask.java index 67ad0db..e3ae4fa 100644 --- a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/task/LocalJobSubmissionTask.java +++ b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/task/LocalJobSubmissionTask.java @@ -43,10 +43,12 @@ public class LocalJobSubmissionTask extends JobSubmissionTask { saveJobModel(jobModel); AgentAdaptor adaptor = taskHelper.getAdaptorSupport().fetchAdaptor( + getTaskContext().getGatewayId(), getTaskContext().getComputeResourceId(), getTaskContext().getJobSubmissionProtocol().name(), - getTaskContext().getComputeResourceCredentialToken()); - + getTaskContext().getComputeResourceCredentialToken(), + getTaskContext().getComputeResourceLoginUserName()); + GroovyMapData mapData = new GroovyMapBuilder(getTaskContext()).build(); JobSubmissionOutput submissionOutput = submitBatchJob(adaptor, mapData, groovyMapData.getWorkingDirectory()); diff --git a/modules/helix-spectator/src/main/resources/airavata-server.properties b/modules/helix-spectator/src/main/resources/airavata-server.properties index 5f47d79..b54b28c 100644 --- a/modules/helix-spectator/src/main/resources/airavata-server.properties +++ b/modules/helix-spectator/src/main/resources/airavata-server.properties @@ -202,9 +202,9 @@ job.notification.flags=abe ########################################################################### # Credential Store module Configuration ########################################################################### -credential.store.keystore.url=/home/pga/master-deployment/keystores/cred_store.jks +credential.store.keystore.url=/Users/dimuthu/code/fork/airavata/modules/helix-spectator/src/main/resources/cred_store.jks credential.store.keystore.alias=seckey -credential.store.keystore.password=123456 +credential.store.keystore.password=credstore123 credential.store.jdbc.url=jdbc:mariadb://149.165.168.248:3306/credential_store credential.store.jdbc.user=eroma credential.store.jdbc.password=eroma123456 diff --git a/modules/helix-spectator/src/main/resources/log4j.properties b/modules/helix-spectator/src/main/resources/log4j.properties index e910f32..69a4301 100644 --- a/modules/helix-spectator/src/main/resources/log4j.properties +++ b/modules/helix-spectator/src/main/resources/log4j.properties @@ -3,6 +3,8 @@ log4j.rootLogger=INFO, A1 log4j.category.org.apache.helix=WARN log4j.category.org.apache.zookeeper=WARN +log4j.category.org.apache.airavata.helix.impl.task.submission.GroovyMapData=TRACE +log4j.category.org.apache.airavata.helix.impl.task.submission.task.JobSubmissionTask=DEBUG # A1 is set to be a ConsoleAppender. log4j.appender.A1=org.apache.log4j.ConsoleAppender -- To stop receiving notification emails like this one, please contact [email protected].
