This is an automated email from the ASF dual-hosted git repository. dimuthuupe pushed a commit to branch helix-integration in repository https://gitbox.apache.org/repos/asf/airavata.git
commit 7350b253564a91e4a6ebcd8df3673c69f3d1ba92 Author: dimuthu <dimuthu.upeks...@gmail.com> AuthorDate: Thu Mar 1 12:27:31 2018 -0500 Implementing DataStaging tasks --- .../apache/airavata/agents/api/AgentAdaptor.java | 6 +- .../agents/api/StorageResourceAdaptor.java | 7 + .../helix/agent/local/LocalAgentAdaptor.java | 25 ++- .../airavata/helix/agent/ssh/SshAgentAdaptor.java | 128 ++++++++++++++- .../agent/storage/StorageResourceAdaptorImpl.java | 85 ++++++++++ .../apache/airavata/helix/task/api/TaskHelper.java | 1 + .../helix/task/api/support/AdaptorSupport.java | 5 +- .../helix/core/support/AdaptorSupportImpl.java | 8 + .../helix/core/support/TaskHelperImpl.java | 1 + .../helix/impl/participant/GlobalParticipant.java | 3 +- .../airavata/helix/impl/task/DataStagingTask.java | 90 +++++++++-- .../helix/impl/task/InputDataStagingTask.java | 117 ++++++++++++++ .../helix/impl/task/OutputDataStagingTask.java | 171 +++++++++++++++++++++ .../airavata/helix/impl/task/TaskContext.java | 6 +- .../helix/impl/task/TaskOnFailException.java | 28 ++++ .../impl/task/submission/GroovyMapBuilder.java | 3 +- .../task/submission/task/JobSubmissionTask.java | 2 +- .../helix/impl/workflow/SimpleWorkflow.java | 17 +- 18 files changed, 673 insertions(+), 30 deletions(-) diff --git a/modules/airavata-helix/agent-api/src/main/java/org/apache/airavata/agents/api/AgentAdaptor.java b/modules/airavata-helix/agent-api/src/main/java/org/apache/airavata/agents/api/AgentAdaptor.java index 2d295de..2948dc1 100644 --- a/modules/airavata-helix/agent-api/src/main/java/org/apache/airavata/agents/api/AgentAdaptor.java +++ b/modules/airavata-helix/agent-api/src/main/java/org/apache/airavata/agents/api/AgentAdaptor.java @@ -17,7 +17,11 @@ public interface AgentAdaptor { public void createDirectory(String path) throws AgentException; - public void copyFile(String sourceFile, String destinationFile) throws AgentException; + public void copyFileTo(String localFile, String remoteFile) throws AgentException; + 
+ public void copyFileFrom(String remoteFile, String localFile) throws AgentException; public List<String> listDirectory(String path) throws AgentException; + + public List<String> getFileNameFromExtension(String fileName, String parentPath) throws AgentException; } diff --git a/modules/airavata-helix/agent-api/src/main/java/org/apache/airavata/agents/api/StorageResourceAdaptor.java b/modules/airavata-helix/agent-api/src/main/java/org/apache/airavata/agents/api/StorageResourceAdaptor.java new file mode 100644 index 0000000..9c5d471 --- /dev/null +++ b/modules/airavata-helix/agent-api/src/main/java/org/apache/airavata/agents/api/StorageResourceAdaptor.java @@ -0,0 +1,7 @@ +package org.apache.airavata.agents.api; + +public interface StorageResourceAdaptor { + public void init(String storageResourceId, String gatewayId, String loginUser, String token) throws AgentException; + public void uploadFile(String sourceFile, String destFile) throws AgentException; + public void downloadFile(String sourceFile, String destFile) throws AgentException; +} diff --git a/modules/airavata-helix/agent-impl/ssh-agent/src/main/java/org/apache/airavata/helix/agent/local/LocalAgentAdaptor.java b/modules/airavata-helix/agent-impl/ssh-agent/src/main/java/org/apache/airavata/helix/agent/local/LocalAgentAdaptor.java index af507bf..7a56526 100644 --- a/modules/airavata-helix/agent-impl/ssh-agent/src/main/java/org/apache/airavata/helix/agent/local/LocalAgentAdaptor.java +++ b/modules/airavata-helix/agent-impl/ssh-agent/src/main/java/org/apache/airavata/helix/agent/local/LocalAgentAdaptor.java @@ -3,9 +3,7 @@ package org.apache.airavata.helix.agent.local; import org.apache.airavata.agents.api.AgentAdaptor; import org.apache.airavata.agents.api.AgentException; import org.apache.airavata.agents.api.CommandOutput; -import org.apache.airavata.agents.api.JobSubmissionOutput; -import java.io.File; import java.util.List; public class LocalAgentAdaptor implements AgentAdaptor { @@ -13,31 +11,42 @@ 
public class LocalAgentAdaptor implements AgentAdaptor { public void init(Object agentPams) throws AgentException { - + throw new AgentException("Operation not implemented"); } @Override public void init(String computeResource, String gatewayId, String userId, String token) throws AgentException { - + throw new AgentException("Operation not implemented"); } @Override public CommandOutput executeCommand(String command, String workingDirectory) throws AgentException { - return null; + throw new AgentException("Operation not implemented"); } @Override public void createDirectory(String path) throws AgentException { - + throw new AgentException("Operation not implemented"); } @Override - public void copyFile(String sourceFile, String destinationFile) throws AgentException { + public void copyFileTo(String localFile, String remoteFile) throws AgentException { + throw new AgentException("Operation not implemented"); + } + @Override + public void copyFileFrom(String remoteFile, String localFile) throws AgentException { + throw new AgentException("Operation not implemented"); } + @Override public List<String> listDirectory(String path) throws AgentException { - return null; + throw new AgentException("Operation not implemented"); + } + + @Override + public List<String> getFileNameFromExtension(String fileName, String parentPath) throws AgentException { + throw new AgentException("Operation not implemented"); } } diff --git a/modules/airavata-helix/agent-impl/ssh-agent/src/main/java/org/apache/airavata/helix/agent/ssh/SshAgentAdaptor.java b/modules/airavata-helix/agent-impl/ssh-agent/src/main/java/org/apache/airavata/helix/agent/ssh/SshAgentAdaptor.java index ef8d580..2ad2415 100644 --- a/modules/airavata-helix/agent-impl/ssh-agent/src/main/java/org/apache/airavata/helix/agent/ssh/SshAgentAdaptor.java +++ b/modules/airavata-helix/agent-impl/ssh-agent/src/main/java/org/apache/airavata/helix/agent/ssh/SshAgentAdaptor.java @@ -190,7 +190,7 @@ public class SshAgentAdaptor 
implements AgentAdaptor { } } - public void copyFile(String localFile, String remoteFile) throws AgentException { + public void copyFileTo(String localFile, String remoteFile) throws AgentException { FileInputStream fis = null; String prefix = null; if (new File(localFile).isDirectory()) { @@ -296,6 +296,127 @@ public class SshAgentAdaptor implements AgentAdaptor { } } + // TODO file not found does not return exception + public void copyFileFrom(String remoteFile, String localFile) throws AgentException { + FileOutputStream fos = null; + ChannelExec channelExec = null; + try { + String prefix = null; + if (new File(localFile).isDirectory()) { + prefix = localFile + File.separator; + } + + StandardOutReader stdOutReader = new StandardOutReader(); + + // exec 'scp -f remotefile' remotely + String command = "scp -f " + remoteFile; + channelExec = (ChannelExec)session.openChannel("exec"); + channelExec.setCommand(command); + + //channelExec.setErrStream(stdOutReader.getStandardError()); + // get I/O streams for remote scp + OutputStream out = channelExec.getOutputStream(); + InputStream in = channelExec.getInputStream(); + InputStream err = channelExec.getErrStream(); + + if (!channelExec.isClosed()){ + channelExec.connect(); + } + + byte[] buf = new byte[1024]; + + // send '\0' + buf[0] = 0; + out.write(buf, 0, 1); + out.flush(); + + while (true) { + int c = checkAck(in); + if (c != 'C') { + break; + } + + // read '0644 ' + in.read(buf, 0, 5); + + long filesize = 0L; + while (true) { + if (in.read(buf, 0, 1) < 0) { + // error + break; + } + if (buf[0] == ' ') break; + filesize = filesize * 10L + (long) (buf[0] - '0'); + } + + String file = null; + for (int i = 0; ; i++) { + in.read(buf, i, 1); + if (buf[i] == (byte) 0x0a) { + file = new String(buf, 0, i); + break; + } + } + + //System.out.println("filesize="+filesize+", file="+file); + + // send '\0' + buf[0] = 0; + out.write(buf, 0, 1); + out.flush(); + + // read a content of lfile + fos = new FileOutputStream(prefix 
== null ? localFile : prefix + file); + int foo; + while (true) { + if (buf.length < filesize) foo = buf.length; + else foo = (int) filesize; + foo = in.read(buf, 0, foo); + if (foo < 0) { + // error + break; + } + fos.write(buf, 0, foo); + filesize -= foo; + if (filesize == 0L) break; + } + fos.close(); + fos = null; + + if (checkAck(in) != 0) { + String error = "Error transfering the file content"; + //log.error(error); + throw new AgentException(error); + } + + // send '\0' + buf[0] = 0; + out.write(buf, 0, 1); + out.flush(); + } + + + stdOutReader.readStdErrFromStream(err); + if (stdOutReader.getStdError().contains("scp:")) { + throw new AgentException(stdOutReader.getStdError()); + } + + } catch (Exception e) { + //log.error(e.getMessage(), e); + throw new AgentException(e); + } finally { + try { + if (fos != null) fos.close(); + } catch (Exception ee) { + } + + if (channelExec != null) { + channelExec.disconnect(); + } + + } + } + @Override public List<String> listDirectory(String path) throws AgentException { String command = "ls " + path; @@ -333,6 +454,11 @@ public class SshAgentAdaptor implements AgentAdaptor { } } + @Override + public List<String> getFileNameFromExtension(String fileName, String parentPath) throws AgentException { + throw new AgentException("Operation not implemented"); + } + private static class DefaultUserInfo implements UserInfo, UIKeyboardInteractive { private String userName; diff --git a/modules/airavata-helix/agent-impl/ssh-agent/src/main/java/org/apache/airavata/helix/agent/storage/StorageResourceAdaptorImpl.java b/modules/airavata-helix/agent-impl/ssh-agent/src/main/java/org/apache/airavata/helix/agent/storage/StorageResourceAdaptorImpl.java new file mode 100644 index 0000000..537f17d --- /dev/null +++ b/modules/airavata-helix/agent-impl/ssh-agent/src/main/java/org/apache/airavata/helix/agent/storage/StorageResourceAdaptorImpl.java @@ -0,0 +1,85 @@ +package org.apache.airavata.helix.agent.storage; + +import 
com.jcraft.jsch.Session; +import org.apache.airavata.agents.api.AgentException; +import org.apache.airavata.agents.api.StorageResourceAdaptor; +import org.apache.airavata.common.exception.ApplicationSettingsException; +import org.apache.airavata.common.utils.DBUtil; +import org.apache.airavata.common.utils.ServerSettings; +import org.apache.airavata.credential.store.credential.Credential; +import org.apache.airavata.credential.store.credential.impl.ssh.SSHCredential; +import org.apache.airavata.credential.store.store.CredentialStoreException; +import org.apache.airavata.credential.store.store.impl.CredentialReaderImpl; +import org.apache.airavata.helix.agent.ssh.SshAdaptorParams; +import org.apache.airavata.helix.agent.ssh.SshAgentAdaptor; +import org.apache.airavata.model.appcatalog.storageresource.StorageResourceDescription; +import org.apache.airavata.registry.core.experiment.catalog.impl.RegistryFactory; +import org.apache.airavata.registry.cpi.AppCatalog; +import org.apache.airavata.registry.cpi.AppCatalogException; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; + +public class StorageResourceAdaptorImpl extends SshAgentAdaptor implements StorageResourceAdaptor { + + private static final Logger logger = LogManager.getLogger(SshAgentAdaptor.class); + + private Session session = null; + private AppCatalog appCatalog; + + @Override + public void init(String storageResourceId, String gatewayId, String loginUser, String token) throws AgentException { + + try { + logger.info("Initializing Storage Resource Adaptor for storage resource : "+ storageResourceId + ", gateway : " + + gatewayId +", user " + loginUser + ", token : " + token); + this.appCatalog = RegistryFactory.getAppCatalog(); + StorageResourceDescription storageResource = appCatalog.getStorageResource().getStorageResource(storageResourceId); + String hostName = storageResource.getHostName(); + + String jdbcUrl = ServerSettings.getCredentialStoreDBURL(); + String jdbcUsr = 
ServerSettings.getCredentialStoreDBUser(); + String jdbcPass = ServerSettings.getCredentialStoreDBPassword(); + String driver = ServerSettings.getCredentialStoreDBDriver(); + CredentialReaderImpl credentialReader = new CredentialReaderImpl(new DBUtil(jdbcUrl, jdbcUsr, jdbcPass, driver)); + + logger.info("Fetching credentials for cred store token " + token); + + Credential credential = credentialReader.getCredential(gatewayId, token); + + if (credential instanceof SSHCredential) { + SSHCredential sshCredential = SSHCredential.class.cast(credential); + SshAdaptorParams adaptorParams = new SshAdaptorParams(); + adaptorParams.setHostName(storageResource.getHostName()); + adaptorParams.setUserName(loginUser); + adaptorParams.setPassphrase(sshCredential.getPassphrase()); + adaptorParams.setPrivateKey(sshCredential.getPrivateKey()); + adaptorParams.setPublicKey(sshCredential.getPublicKey()); + adaptorParams.setStrictHostKeyChecking(false); + init(adaptorParams); + } + + } catch (AppCatalogException e) { + e.printStackTrace(); + } catch (InstantiationException e) { + e.printStackTrace(); + } catch (IllegalAccessException e) { + e.printStackTrace(); + } catch (CredentialStoreException e) { + e.printStackTrace(); + } catch (ClassNotFoundException e) { + e.printStackTrace(); + } catch (ApplicationSettingsException e) { + e.printStackTrace(); + } + } + + @Override + public void uploadFile(String sourceFile, String destFile) throws AgentException { + super.copyFileTo(sourceFile, destFile); + } + + @Override + public void downloadFile(String sourceFile, String destFile) throws AgentException { + super.copyFileFrom(sourceFile, destFile); + } +} diff --git a/modules/airavata-helix/task-api/src/main/java/org/apache/airavata/helix/task/api/TaskHelper.java b/modules/airavata-helix/task-api/src/main/java/org/apache/airavata/helix/task/api/TaskHelper.java index 07de06e..4550a66 100644 --- a/modules/airavata-helix/task-api/src/main/java/org/apache/airavata/helix/task/api/TaskHelper.java 
+++ b/modules/airavata-helix/task-api/src/main/java/org/apache/airavata/helix/task/api/TaskHelper.java @@ -1,5 +1,6 @@ package org.apache.airavata.helix.task.api; +import org.apache.airavata.agents.api.StorageResourceAdaptor; import org.apache.airavata.helix.task.api.support.AdaptorSupport; /** diff --git a/modules/airavata-helix/task-api/src/main/java/org/apache/airavata/helix/task/api/support/AdaptorSupport.java b/modules/airavata-helix/task-api/src/main/java/org/apache/airavata/helix/task/api/support/AdaptorSupport.java index 4b6e11e..456fdee 100644 --- a/modules/airavata-helix/task-api/src/main/java/org/apache/airavata/helix/task/api/support/AdaptorSupport.java +++ b/modules/airavata-helix/task-api/src/main/java/org/apache/airavata/helix/task/api/support/AdaptorSupport.java @@ -1,8 +1,6 @@ package org.apache.airavata.helix.task.api.support; -import org.apache.airavata.agents.api.AgentAdaptor; -import org.apache.airavata.agents.api.CommandOutput; -import org.apache.airavata.agents.api.JobSubmissionOutput; +import org.apache.airavata.agents.api.*; import java.io.File; @@ -16,5 +14,6 @@ public interface AdaptorSupport { public void initializeAdaptor(); public AgentAdaptor fetchAdaptor(String gatewayId, String computeResource, String protocol, String authToken, String userId) throws Exception; + public StorageResourceAdaptor fetchStorageAdaptor(String gatewayId, String storageResourceId, String protocol, String authToken, String userId) throws AgentException; } diff --git a/modules/airavata-helix/task-core/src/main/java/org/apache/airavata/helix/core/support/AdaptorSupportImpl.java b/modules/airavata-helix/task-core/src/main/java/org/apache/airavata/helix/core/support/AdaptorSupportImpl.java index a98b8f0..c264012 100644 --- a/modules/airavata-helix/task-core/src/main/java/org/apache/airavata/helix/core/support/AdaptorSupportImpl.java +++ b/modules/airavata-helix/task-core/src/main/java/org/apache/airavata/helix/core/support/AdaptorSupportImpl.java @@ -2,6 +2,7 @@ 
package org.apache.airavata.helix.core.support; import org.apache.airavata.agents.api.*; import org.apache.airavata.helix.agent.ssh.SshAgentAdaptor; +import org.apache.airavata.helix.agent.storage.StorageResourceAdaptorImpl; import org.apache.airavata.helix.task.api.support.AdaptorSupport; import java.io.File; @@ -35,4 +36,11 @@ public class AdaptorSupportImpl implements AdaptorSupport { agentAdaptor.init(computeResource, gatewayId, userId, authToken); return agentAdaptor; } + + @Override + public StorageResourceAdaptor fetchStorageAdaptor(String gatewayId, String storageResourceId, String protocol, String authToken, String userId) throws AgentException { + StorageResourceAdaptor storageResourceAdaptor = new StorageResourceAdaptorImpl(); + storageResourceAdaptor.init(storageResourceId, gatewayId, userId, authToken); + return storageResourceAdaptor; + } } diff --git a/modules/airavata-helix/task-core/src/main/java/org/apache/airavata/helix/core/support/TaskHelperImpl.java b/modules/airavata-helix/task-core/src/main/java/org/apache/airavata/helix/core/support/TaskHelperImpl.java index 77fc5ce..2987ebd 100644 --- a/modules/airavata-helix/task-core/src/main/java/org/apache/airavata/helix/core/support/TaskHelperImpl.java +++ b/modules/airavata-helix/task-core/src/main/java/org/apache/airavata/helix/core/support/TaskHelperImpl.java @@ -1,5 +1,6 @@ package org.apache.airavata.helix.core.support; +import org.apache.airavata.agents.api.StorageResourceAdaptor; import org.apache.airavata.helix.task.api.TaskHelper; /** diff --git a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/participant/GlobalParticipant.java b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/participant/GlobalParticipant.java index f0e166b..984b277 100644 --- a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/participant/GlobalParticipant.java +++ 
b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/participant/GlobalParticipant.java @@ -16,7 +16,8 @@ public class GlobalParticipant extends HelixParticipant { private String[] taskClasses = { "org.apache.airavata.helix.impl.task.EnvSetupTask", - "org.apache.airavata.helix.impl.task.DataStagingTask", + "org.apache.airavata.helix.impl.task.InputDataStagingTask", + "org.apache.airavata.helix.impl.task.OutputDataStagingTask", "org.apache.airavata.helix.impl.task.submission.task.ForkJobSubmissionTask", "org.apache.airavata.helix.impl.task.submission.task.DefaultJobSubmissionTask", "org.apache.airavata.helix.impl.task.submission.task.LocalJobSubmissionTask" diff --git a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/DataStagingTask.java b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/DataStagingTask.java index 346aa73..594cbc9 100644 --- a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/DataStagingTask.java +++ b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/DataStagingTask.java @@ -1,19 +1,89 @@ package org.apache.airavata.helix.impl.task; -import org.apache.airavata.helix.task.api.TaskHelper; -import org.apache.airavata.helix.task.api.annotation.TaskDef; -import org.apache.helix.task.TaskResult; +import org.apache.airavata.agents.api.AgentAdaptor; +import org.apache.airavata.agents.api.AgentException; +import org.apache.airavata.agents.api.StorageResourceAdaptor; +import org.apache.airavata.common.utils.ServerSettings; +import org.apache.airavata.helix.task.api.support.AdaptorSupport; +import org.apache.airavata.model.appcatalog.storageresource.StorageResourceDescription; +import org.apache.airavata.model.task.DataStagingTaskModel; +import org.apache.airavata.registry.cpi.AppCatalogException; +import org.apache.commons.io.FileUtils; -@TaskDef(name = "Data Staging Task") -public class DataStagingTask extends AiravataTask { +import 
java.io.File; +import java.io.IOException; - @Override - public TaskResult onRun(TaskHelper taskHelper) { - return null; +public abstract class DataStagingTask extends AiravataTask { + + protected DataStagingTaskModel getDataStagingTaskModel() throws TaskOnFailException { + try { + Object subTaskModel = getTaskContext().getSubTaskModel(); + if (subTaskModel != null) { + return DataStagingTaskModel.class.cast(subTaskModel); + } else { + throw new TaskOnFailException("Data staging task model can not be null for task " + getTaskId(), true, null); + } + } catch (Exception e) { + throw new TaskOnFailException("Failed while obtaining data staging task model for task " + getTaskId(), true, e); + } + } + + protected StorageResourceDescription getStorageResource() throws TaskOnFailException { + try { + StorageResourceDescription storageResource = getTaskContext().getStorageResource(); + if (storageResource == null) { + throw new TaskOnFailException("Storage resource can not be null for task " + getTaskId(), true, null); + } + return storageResource; + } catch (AppCatalogException e) { + throw new TaskOnFailException("Failed to fetch the storage resource for task " + getTaskId(), true, e); + } } - @Override - public void onCancel() { + protected StorageResourceAdaptor getStorageAdaptor(AdaptorSupport adaptorSupport) throws TaskOnFailException { + try { + StorageResourceAdaptor storageResourceAdaptor = adaptorSupport.fetchStorageAdaptor( + getGatewayId(), + getTaskContext().getStorageResourceId(), + "SSH", + getTaskContext().getStorageResourceCredentialToken(), + getTaskContext().getStorageResourceLoginUserName()); + + if (storageResourceAdaptor == null) { + throw new TaskOnFailException("Storage resource adaptor for " + getTaskContext().getStorageResourceId() + " can not be null", true, null); + } + return storageResourceAdaptor; + } catch (AgentException e) { + throw new TaskOnFailException("Failed to obtain adaptor for storage resource " + 
getTaskContext().getStorageResourceId() + + " in task " + getTaskId(), true, e); + } + } + + protected AgentAdaptor getComputeResourceAdaptor(AdaptorSupport adaptorSupport) throws TaskOnFailException { + try { + return adaptorSupport.fetchAdaptor( + getTaskContext().getGatewayId(), + getTaskContext().getComputeResourceId(), + getTaskContext().getJobSubmissionProtocol().name(), + getTaskContext().getComputeResourceCredentialToken(), + getTaskContext().getComputeResourceLoginUserName()); + } catch (Exception e) { + throw new TaskOnFailException("Failed to obtain adaptor for compute resource " + getTaskContext().getComputeResourceId() + + " in task " + getTaskId(), true, e); + } + } + protected String getLocalDataPath(String fileName) throws TaskOnFailException { + String localDataPath = ServerSettings.getLocalDataLocation(); + localDataPath = (localDataPath.endsWith(File.separator) ? localDataPath : localDataPath + File.separator); + localDataPath = (localDataPath.endsWith(File.separator) ? 
localDataPath : localDataPath + File.separator) + + getProcessId() + File.separator + "temp_inputs" + File.separator; + try { + FileUtils.forceMkdir(new File(localDataPath)); + } catch (IOException e) { + throw new TaskOnFailException("Failed build directories " + localDataPath, true, e); + } + localDataPath = localDataPath + fileName; + return localDataPath; } } diff --git a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/InputDataStagingTask.java b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/InputDataStagingTask.java new file mode 100644 index 0000000..30eeec0 --- /dev/null +++ b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/InputDataStagingTask.java @@ -0,0 +1,117 @@ +package org.apache.airavata.helix.impl.task; + +import org.apache.airavata.agents.api.AgentAdaptor; +import org.apache.airavata.agents.api.AgentException; +import org.apache.airavata.agents.api.StorageResourceAdaptor; +import org.apache.airavata.common.utils.ServerSettings; +import org.apache.airavata.helix.task.api.TaskHelper; +import org.apache.airavata.helix.task.api.annotation.TaskDef; +import org.apache.airavata.model.appcatalog.storageresource.StorageResourceDescription; +import org.apache.airavata.model.application.io.InputDataObjectType; +import org.apache.airavata.model.task.DataStagingTaskModel; +import org.apache.airavata.registry.cpi.AppCatalogException; +import org.apache.commons.io.FileUtils; +import org.apache.helix.task.TaskResult; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; + +import java.io.File; +import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; + +@TaskDef(name = "Input Data Staging Task") +public class InputDataStagingTask extends DataStagingTask { + + private static final Logger logger = LogManager.getLogger(InputDataStagingTask.class); + + @Override + public TaskResult onRun(TaskHelper taskHelper) { + logger.info("Starting Input 
Data Staging Task " + getTaskId()); + + try { + // Get and validate data staging task model + DataStagingTaskModel dataStagingTaskModel = getDataStagingTaskModel(); + + // Fetch and validate input data type from data staging task model + InputDataObjectType processInput = dataStagingTaskModel.getProcessInput(); + if (processInput != null && processInput.getValue() == null) { + String message = "expId: " + getExperimentId() + ", processId: " + getProcessId() + ", taskId: " + getTaskId() + + ":- Couldn't stage file " + processInput.getName() + " , file name shouldn't be null. "; + logger.error(message); + if (processInput.isIsRequired()) { + message += "File name is null, but this input's isRequired bit is not set"; + } else { + message += "File name is null"; + } + logger.error(message); + throw new TaskOnFailException(message, true, null); + } + + // Fetch and validate storage resource + StorageResourceDescription storageResource = getStorageResource(); + + // Fetch and validate source and destination URLS + URI sourceURI; + URI destinationURI; + String sourceFileName; + try { + sourceURI = new URI(dataStagingTaskModel.getSource()); + destinationURI = new URI(dataStagingTaskModel.getDestination()); + + if (logger.isDebugEnabled()) { + logger.debug("Source file " + sourceURI.getPath() + ", destination uri " + destinationURI.getPath() + " for task " + getTaskId()); + } + + sourceFileName = sourceURI.getPath().substring(sourceURI.getPath().lastIndexOf(File.separator) + 1, + sourceURI.getPath().length()); + } catch (URISyntaxException e) { + throw new TaskOnFailException("Failed to obtain source URI for input data staging task " + getTaskId(), true, e); + } + + // Fetch and validate storage adaptor + StorageResourceAdaptor storageResourceAdaptor = getStorageAdaptor(taskHelper.getAdaptorSupport()); + + // Fetch and validate compute resource adaptor + AgentAdaptor adaptor = getComputeResourceAdaptor(taskHelper.getAdaptorSupport()); + + String localSourceFilePath = 
getLocalDataPath(sourceFileName); + // Downloading input file from the storage resource + try { + logger.info("Downloading input file " + sourceURI.getPath() + " to the local path " + localSourceFilePath); + storageResourceAdaptor.downloadFile(sourceURI.getPath(), localSourceFilePath); + logger.info("Input file downloaded to " + localSourceFilePath); + } catch (AgentException e) { + throw new TaskOnFailException("Failed downloading input file " + sourceFileName + " to the local path " + localSourceFilePath, true, e); + } + + // Uploading input file to the compute resource + try { + logger.info("Uploading the input file to " + destinationURI.getPath() + " from local path " + localSourceFilePath); + adaptor.copyFileTo(localSourceFilePath, destinationURI.getPath()); + logger.info("Output file uploaded to " + destinationURI.getPath()); + } catch (AgentException e) { + throw new TaskOnFailException("Failed uploading the input file to " + destinationURI.getPath() + " from local path " + localSourceFilePath, true, e); + } + + return onSuccess("Input data staging task " + getTaskId() + " successfully completed"); + + } catch (TaskOnFailException e) { + if (e.getError() != null) { + logger.error(e.getReason(), e.getError()); + } else { + logger.error(e.getReason()); + } + return onFail(e.getReason(), e.isCritical(), e.getError()); + + }catch (Exception e) { + logger.error("Unknown error while executing input data staging task " + getTaskId(), e); + return onFail("Unknown error while executing input data staging task " + getTaskId(), false, e); + } + } + + @Override + public void onCancel() { + + } +} diff --git a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/OutputDataStagingTask.java b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/OutputDataStagingTask.java new file mode 100644 index 0000000..d2280d0 --- /dev/null +++ 
b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/OutputDataStagingTask.java
@@ -0,0 +1,227 @@
+package org.apache.airavata.helix.impl.task;
+
+import org.apache.airavata.agents.api.AgentAdaptor;
+import org.apache.airavata.agents.api.AgentException;
+import org.apache.airavata.agents.api.StorageResourceAdaptor;
+import org.apache.airavata.helix.task.api.TaskHelper;
+import org.apache.airavata.helix.task.api.annotation.TaskDef;
+import org.apache.airavata.model.application.io.OutputDataObjectType;
+import org.apache.airavata.model.task.DataStagingTaskModel;
+import org.apache.airavata.registry.cpi.ExpCatChildDataType;
+import org.apache.airavata.registry.cpi.RegistryException;
+import org.apache.helix.task.TaskResult;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Stages a process output from the compute resource to the storage resource.
+ *
+ * Every file is first copied from the compute resource to a local path and then uploaded from
+ * there to the storage resource. If the file name part of the source path contains a {@code *},
+ * each file reported by the compute resource adaptor for that pattern is staged and recorded as
+ * an experiment output and a process output.
+ */
+@TaskDef(name = "Output Data Staging Task")
+public class OutputDataStagingTask extends DataStagingTask {
+
+    private static final Logger logger = LogManager.getLogger(OutputDataStagingTask.class);
+
+    // URI paths are always '/' separated, whatever File.separator is on the host running the task.
+    private static final char PATH_SEPARATOR = '/';
+
+    /**
+     * Stages the output described by the data staging task model.
+     *
+     * @param taskHelper gives access to the adaptors of the compute and storage resources
+     * @return the {@code onSuccess} result once every file is staged, otherwise the {@code onFail} result
+     */
+    @Override
+    public TaskResult onRun(TaskHelper taskHelper) {
+        try {
+            // Get and validate data staging task model
+            DataStagingTaskModel dataStagingTaskModel = getDataStagingTaskModel();
+
+            // Fetch and validate output data type from data staging task model
+            OutputDataObjectType processOutput = dataStagingTaskModel.getProcessOutput();
+            if (processOutput != null && processOutput.getValue() == null) {
+                String message = "expId: " + getExperimentId() + ", processId: " + getProcessId() + ", taskId: " + getTaskId()
+                        + ":- Couldn't stage file " + processOutput.getName() + " , file name shouldn't be null. ";
+                if (processOutput.isIsRequired()) {
+                    message += "File name is null, although this output is marked as required";
+                } else {
+                    message += "File name is null";
+                }
+                // Logged once, with the complete message, by the TaskOnFailException handler below
+                throw new TaskOnFailException(message, true, null);
+            }
+
+            // Fetch and validate storage resource (the returned description is not needed here)
+            getStorageResource();
+
+            // Fetch and validate source and destination paths
+            String sourcePath;
+            String destinationPath;
+            try {
+                sourcePath = new URI(dataStagingTaskModel.getSource()).getPath();
+                destinationPath = new URI(dataStagingTaskModel.getDestination()).getPath();
+            } catch (URISyntaxException e) {
+                throw new TaskOnFailException("Failed to obtain source or destination URI for output data staging task "
+                        + getTaskId(), true, e);
+            }
+            if (sourcePath == null || destinationPath == null) {
+                // URI.getPath() is null for opaque URIs, which cannot name a file to stage
+                throw new TaskOnFailException("Source or destination URI of output data staging task " + getTaskId()
+                        + " has no path", true, null);
+            }
+
+            if (logger.isDebugEnabled()) {
+                logger.debug("Source file " + sourcePath + ", destination uri " + destinationPath + " for task " + getTaskId());
+            }
+
+            String sourceFileName = fileNameOf(sourcePath);
+
+            // Fetch and validate storage adaptor
+            StorageResourceAdaptor storageResourceAdaptor = getStorageAdaptor(taskHelper.getAdaptorSupport());
+
+            // Fetch and validate compute resource adaptor
+            AgentAdaptor adaptor = getComputeResourceAdaptor(taskHelper.getAdaptorSupport());
+
+            if (sourceFileName.contains("*")) {
+                // The file name is a wild card: stage every file the compute resource reports for it
+                logger.info("Handling output files with " + sourceFileName + " extension for task " + getTaskId());
+
+                if (processOutput == null) {
+                    throw new TaskOnFailException("Output data staging task " + getTaskId()
+                            + " has no process output to register the matched files against", true, null);
+                }
+
+                String destParentPath = parentPath(destinationPath);
+                String sourceParentPath = parentPath(sourcePath);
+                logger.debug("Destination parent path " + destParentPath + ", source parent path " + sourceParentPath);
+
+                List<String> fileNames;
+                try {
+                    fileNames = adaptor.getFileNameFromExtension(sourceFileName, sourceParentPath);
+                } catch (AgentException e) {
+                    throw new TaskOnFailException("Failed to fetch the file list from extension " + sourceFileName, true, e);
+                }
+                if (fileNames == null) {
+                    fileNames = Collections.emptyList();
+                }
+                if (fileNames.isEmpty()) {
+                    logger.warn("No output file matched " + sourceFileName + " in " + sourceParentPath + " for task " + getTaskId());
+                }
+                if (logger.isTraceEnabled()) {
+                    fileNames.forEach(fileName -> logger.trace("File found : " + fileName));
+                }
+
+                for (String fileName : fileNames) {
+                    if (fileName == null || fileName.isEmpty()) {
+                        // A blank entry names no file, so there is nothing to stage for it
+                        continue;
+                    }
+
+                    // Wildcard support is only enabled for output data staging
+                    processOutput.setName(fileName);
+
+                    try {
+                        getTaskContext().getExperimentCatalog().add(ExpCatChildDataType.EXPERIMENT_OUTPUT, Arrays.asList(processOutput), getExperimentId());
+                        getTaskContext().getExperimentCatalog().add(ExpCatChildDataType.PROCESS_OUTPUT, Arrays.asList(processOutput), getProcessId());
+                    } catch (RegistryException e) {
+                        throw new TaskOnFailException("Failed to update experiment or process outputs for task " + getTaskId(), true, e);
+                    }
+
+                    // Every match has its own source and destination path; the wild card path itself is no file
+                    logger.info("Transferring file " + fileName);
+                    transferFile(joinPath(sourceParentPath, fileName), joinPath(destParentPath, fileName), fileName,
+                            adaptor, storageResourceAdaptor);
+                }
+
+            } else {
+                transferFile(sourcePath, destinationPath, sourceFileName, adaptor, storageResourceAdaptor);
+            }
+
+            return onSuccess("Output data staging task " + getTaskId() + " successfully completed");
+
+        } catch (TaskOnFailException e) {
+            if (e.getError() != null) {
+                logger.error(e.getReason(), e.getError());
+            } else {
+                logger.error(e.getReason());
+            }
+            return onFail(e.getReason(), e.isCritical(), e.getError());
+
+        } catch (Exception e) {
+            logger.error("Unknown error while executing output data staging task " + getTaskId(), e);
+            return onFail("Unknown error while executing output data staging task " + getTaskId(), false, e);
+        }
+    }
+
+    /**
+     * Copies one output file from the compute resource to the storage resource through a local copy.
+     *
+     * @param sourcePath path of the file on the compute resource
+     * @param destinationPath path the file is uploaded to on the storage resource
+     * @param fileName name used to derive the intermediate local path
+     * @param adaptor adaptor of the compute resource
+     * @param storageResourceAdaptor adaptor of the storage resource
+     * @throws TaskOnFailException flagged critical, if the download or the upload fails
+     */
+    private void transferFile(String sourcePath, String destinationPath, String fileName, AgentAdaptor adaptor,
+                              StorageResourceAdaptor storageResourceAdaptor) throws TaskOnFailException {
+        String localSourceFilePath = getLocalDataPath(fileName);
+
+        // Downloading the output file from the compute resource
+        try {
+            logger.info("Downloading output file " + sourcePath + " to the local path " + localSourceFilePath);
+            adaptor.copyFileFrom(sourcePath, localSourceFilePath);
+            logger.info("Output file downloaded to " + localSourceFilePath);
+        } catch (AgentException e) {
+            throw new TaskOnFailException("Failed downloading output file " + sourcePath + " to the local path "
+                    + localSourceFilePath, true, e);
+        }
+
+        // Uploading the output file to the storage resource
+        try {
+            logger.info("Uploading the output file to " + destinationPath + " from local path " + localSourceFilePath);
+            storageResourceAdaptor.uploadFile(localSourceFilePath, destinationPath);
+            logger.info("Output file uploaded to " + destinationPath);
+        } catch (AgentException e) {
+            throw new TaskOnFailException("Failed uploading the output file to " + destinationPath + " from local path "
+                    + localSourceFilePath, true, e);
+        }
+    }
+
+    /** Returns the part of {@code path} after its last '/', or the whole path if it has none. */
+    private static String fileNameOf(String path) {
+        return path.substring(path.lastIndexOf(PATH_SEPARATOR) + 1);
+    }
+
+    /** Returns {@code path} up to its last '/': "/" for a top level entry, "" if there is no '/'. */
+    private static String parentPath(String path) {
+        int lastSeparator = path.lastIndexOf(PATH_SEPARATOR);
+        if (lastSeparator < 0) {
+            return "";
+        }
+        return lastSeparator == 0 ? "/" : path.substring(0, lastSeparator);
+    }
+
+    /** Appends {@code child} to {@code parent}, adding a '/' unless parent is empty or ends with one. */
+    private static String joinPath(String parent, String child) {
+        if (parent.isEmpty() || parent.charAt(parent.length() - 1) == PATH_SEPARATOR) {
+            return parent + child;
+        }
+        return parent + PATH_SEPARATOR + child;
+    }
+
+    @Override
+    public void onCancel() {
+        // Intentionally empty: no cancellation handling is implemented for this task
+    }
+}
diff --git a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/TaskContext.java b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/TaskContext.java index f33d8a1..64a7de8 100644 --- a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/TaskContext.java +++ b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/TaskContext.java @@ -511,7 +511,11 @@ public class TaskContext { this.currentExecutingTaskModel = currentExecutingTaskModel; } - public StorageResourceDescription getStorageResource() { + public StorageResourceDescription getStorageResource() throws AppCatalogException { + if (storageResource == null) { + this.storageResource = appCatalog.getStorageResource() +
.getStorageResource(processModel.getStorageResourceId()); + } return storageResource; }
diff --git a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/TaskOnFailException.java b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/TaskOnFailException.java
new file mode 100644
index 0000000..196a219
--- /dev/null
+++ b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/TaskOnFailException.java
@@ -0,0 +1,43 @@
+package org.apache.airavata.helix.impl.task;
+
+/**
+ * Signals that a task step failed, carrying a reason, a flag telling whether the failure is
+ * critical, and the underlying error if there is one.
+ */
+public class TaskOnFailException extends Exception {
+
+    private static final long serialVersionUID = 1L;
+
+    private final String reason;
+    private final boolean critical;
+    private final Throwable error;
+
+    /**
+     * Creates the exception with {@code reason} as its message and {@code e} as its cause.
+     *
+     * @param reason description of the failure
+     * @param critical whether the failure is flagged as critical
+     * @param e underlying error, or {@code null} if there is none
+     */
+    public TaskOnFailException(String reason, boolean critical, Throwable e) {
+        super(reason, e);
+        this.reason = reason;
+        this.critical = critical;
+        this.error = e;
+    }
+
+    /** @return the failure description given at construction */
+    public String getReason() {
+        return reason;
+    }
+
+    /** @return {@code true} if the failure was flagged as critical at construction */
+    public boolean isCritical() {
+        return critical;
+    }
+
+    /** @return the underlying error given at construction, possibly {@code null} */
+    public Throwable getError() {
+        return error;
+    }
+}
diff --git a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/GroovyMapBuilder.java b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/GroovyMapBuilder.java index 16e8114..e4267ce 100644 --- a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/GroovyMapBuilder.java +++ b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/GroovyMapBuilder.java @@ -67,8 +67,7 @@ public class GroovyMapBuilder { inputValues.addAll(getProcessOutputValues(taskContext.getProcessModel().getProcessOutputs(), false)); mapData.setInputsAll(inputValuesAll); - //mapData.setUserName(taskContext.geJo) - + mapData.setUserName(taskContext.getComputeResourceLoginUserName()); mapData.setShellName("/bin/bash"); if (taskContext != null) { diff --git a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/task/JobSubmissionTask.java
b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/task/JobSubmissionTask.java index 1a024a7..b517af1 100644 --- a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/task/JobSubmissionTask.java +++ b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/task/JobSubmissionTask.java @@ -60,7 +60,7 @@ public abstract class JobSubmissionTask extends AiravataTask { logger.info("Copying file form " + tempJobFile.getAbsolutePath() + " to remote path " + workingDirectory + " of compute resource " + getTaskContext().getComputeResourceId()); - agentAdaptor.copyFile(tempJobFile.getAbsolutePath(), workingDirectory); + agentAdaptor.copyFileTo(tempJobFile.getAbsolutePath(), workingDirectory); // TODO transfer file RawCommandInfo submitCommand = jobManagerConfiguration.getSubmitCommand(workingDirectory, tempJobFile.getPath()); diff --git a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/SimpleWorkflow.java b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/SimpleWorkflow.java index 99db2c4..63921db 100644 --- a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/SimpleWorkflow.java +++ b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/SimpleWorkflow.java @@ -4,6 +4,8 @@ import org.apache.airavata.helix.core.AbstractTask; import org.apache.airavata.helix.core.OutPort; import org.apache.airavata.helix.impl.task.AiravataTask; import org.apache.airavata.helix.impl.task.EnvSetupTask; +import org.apache.airavata.helix.impl.task.InputDataStagingTask; +import org.apache.airavata.helix.impl.task.OutputDataStagingTask; import org.apache.airavata.helix.impl.task.submission.task.DefaultJobSubmissionTask; import org.apache.airavata.helix.workflow.WorkflowManager; import org.apache.airavata.model.experiment.ExperimentModel; @@ -37,14 +39,25 @@ public class SimpleWorkflow { String[] 
taskIds = taskDag.split(","); final List<AiravataTask> allTasks = new ArrayList<>(); + boolean jobSubmissionFound = false; + for (String taskId : taskIds) { Optional<TaskModel> model = taskList.stream().filter(taskModel -> taskModel.getTaskId().equals(taskId)).findFirst(); - model.ifPresent(taskModel -> { + + if (model.isPresent()) { + TaskModel taskModel = model.get(); AiravataTask airavataTask = null; if (taskModel.getTaskType() == TaskTypes.ENV_SETUP) { airavataTask = new EnvSetupTask(); } else if (taskModel.getTaskType() == TaskTypes.JOB_SUBMISSION) { airavataTask = new DefaultJobSubmissionTask(); + jobSubmissionFound = true; + } else if (taskModel.getTaskType() == TaskTypes.DATA_STAGING) { + if (jobSubmissionFound) { + airavataTask = new OutputDataStagingTask(); + } else { + airavataTask = new InputDataStagingTask(); + } } if (airavataTask != null) { @@ -57,7 +70,7 @@ public class SimpleWorkflow { } allTasks.add(airavataTask); } - }); + } } /* DefaultJobSubmissionTask defaultJobSubmissionTask = new DefaultJobSubmissionTask(); -- To stop receiving notification emails like this one, please contact dimuthu...@apache.org.