This is an automated email from the ASF dual-hosted git repository.
dimuthuupe pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airavata.git
The following commit(s) were added to refs/heads/master by this push:
new 9bfc94d Assigning maximum size threshold for archiving task
9bfc94d is described below
commit 9bfc94dfaaae88e58d9fc9df32ebedac39520d70
Author: Dimuthu Wannipurage <[email protected]>
AuthorDate: Fri Nov 1 15:10:00 2019 -0400
Assigning maximum size threshold for archiving task
---
.../helix/impl/task/staging/ArchiveTask.java | 92 +++++++++++++---------
1 file changed, 53 insertions(+), 39 deletions(-)
diff --git
a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/staging/ArchiveTask.java
b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/staging/ArchiveTask.java
index 6be6fce..6821d05 100644
---
a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/staging/ArchiveTask.java
+++
b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/staging/ArchiveTask.java
@@ -19,10 +19,8 @@
*/
package org.apache.airavata.helix.impl.task.staging;
-import org.apache.airavata.agents.api.AgentAdaptor;
-import org.apache.airavata.agents.api.AgentException;
-import org.apache.airavata.agents.api.CommandOutput;
-import org.apache.airavata.agents.api.StorageResourceAdaptor;
+import org.apache.airavata.agents.api.*;
+import org.apache.airavata.common.utils.ServerSettings;
import org.apache.airavata.helix.impl.task.TaskContext;
import org.apache.airavata.helix.impl.task.TaskOnFailException;
import org.apache.airavata.helix.task.api.TaskHelper;
@@ -41,7 +39,12 @@ import java.net.URISyntaxException;
public class ArchiveTask extends DataStagingTask {
private final static Logger logger =
LoggerFactory.getLogger(ArchiveTask.class);
+ private final static long MAX_ARCHIVE_SIZE = 20L * 1024 * 1024 * 1024; // 20GB default for max.archive.size (assumes FileMetadata.getSize() reports bytes); the long literal keeps the product from overflowing int
+
+ public static void main(String[] args) { // debug entry point: prints the built-in default archive size threshold
+ System.out.println(String.valueOf(MAX_ARCHIVE_SIZE));
+ }
@Override
public TaskResult onRun(TaskHelper taskHelper, TaskContext taskContext) {
logger.info("Starting archival task " + getTaskId() + " in experiment
" + getExperimentId());
@@ -94,45 +97,56 @@ public class ArchiveTask extends DataStagingTask {
throw new TaskOnFailException("Failed while running the tar
command " + tarringCommand, true, null);
}
- boolean fileTransferred =
transferFileToStorage(tarCreationAbsPath, destFilePath, archiveFileName,
adaptor, storageResourceAdaptor);
-
- if (!fileTransferred) {
- logger.error("Failed to transfer created archive file " +
tarCreationAbsPath);
- throw new TaskOnFailException("Failed to transfer created
archive file " + tarCreationAbsPath, false, null);
- }
-
- String deleteTarCommand = "rm " + tarCreationAbsPath;
- logger.info("Running delete temporary tar command " +
deleteTarCommand);
-
try {
- CommandOutput rmCommandOutput =
adaptor.executeCommand(deleteTarCommand, null);
- if (rmCommandOutput.getExitCode() != 0) {
- throw new TaskOnFailException("Failed while running the rm
command " + deleteTarCommand + ". Sout : " +
- rmCommandOutput.getStdOut() + ". Serr " +
rmCommandOutput.getStdError(), false, null);
+ FileMetadata fileMetadata =
adaptor.getFileMetadata(tarCreationAbsPath);
+ long maxArchiveSize =
Long.parseLong(ServerSettings.getSetting("max.archive.size", MAX_ARCHIVE_SIZE +
""));
+
+ if (fileMetadata.getSize() < maxArchiveSize) {
+ boolean fileTransferred =
transferFileToStorage(tarCreationAbsPath, destFilePath, archiveFileName,
adaptor, storageResourceAdaptor);
+ if (!fileTransferred) {
+ logger.error("Failed to transfer created archive file
" + tarCreationAbsPath);
+ throw new TaskOnFailException("Failed to transfer
created archive file " + tarCreationAbsPath, false, null);
+ }
+
+ String destParent = destFilePath.substring(0,
destFilePath.lastIndexOf("/"));
+ final String storageArchiveDir = "ARCHIVE";
+ String unArchiveTarCommand = "mkdir " + storageArchiveDir
+ " && tar -xvf " + archiveFileName + " -C "
+ + storageArchiveDir + " && rm " + archiveFileName
+ " && chmod 755 -R " + storageArchiveDir + "/*";
+ logger.info("Running Un archiving command on storage
resource " + unArchiveTarCommand);
+
+ try {
+ CommandOutput unTarCommandOutput =
storageResourceAdaptor.executeCommand(unArchiveTarCommand, destParent);
+ if (unTarCommandOutput.getExitCode() != 0) {
+ throw new TaskOnFailException("Failed while
running the untar command " + unTarCommandOutput + ". Sout : " +
+ unTarCommandOutput.getStdOut() + ". Serr "
+ unTarCommandOutput.getStdError(), false, null);
+ }
+ } catch (AgentException e) {
+ throw new TaskOnFailException("Failed while running
the untar command " + tarringCommand, false, null);
+ }
+
+ return onSuccess("Archival task successfully completed");
+ } else {
+ logger.error("Archive size {} is larger than the maximum
allowed size {}. So skipping the transfer.",
+ fileMetadata.getSize(), maxArchiveSize);
+ // This is not a recoverable issue. So mark it as critical
+ throw new TaskOnFailException("Archive task was skipped as
size is " + fileMetadata.getSize(), true, null);
}
- } catch (AgentException e) {
- throw new TaskOnFailException("Failed while running the rm
command " + tarringCommand, false, null);
- }
-
- String destParent = destFilePath.substring(0,
destFilePath.lastIndexOf("/"));
- final String storageArchiveDir = "ARCHIVE";
- String unArchiveTarCommand = "mkdir " + storageArchiveDir + " &&
tar -xvf " + archiveFileName + " -C "
- + storageArchiveDir + " && rm " + archiveFileName + " &&
chmod 755 -R " + storageArchiveDir + "/*";
- logger.info("Running Un archiving command on storage resource " +
unArchiveTarCommand);
-
- try {
- CommandOutput unTarCommandOutput =
storageResourceAdaptor.executeCommand(unArchiveTarCommand, destParent);
- if (unTarCommandOutput.getExitCode() != 0) {
- throw new TaskOnFailException("Failed while running the
untar command " + deleteTarCommand + ". Sout : " +
- unTarCommandOutput.getStdOut() + ". Serr " +
unTarCommandOutput.getStdError(), false, null);
+ } finally {
+ String deleteTarCommand = "rm " + tarCreationAbsPath;
+ logger.info("Running delete temporary tar command " +
deleteTarCommand);
+ try {
+ CommandOutput rmCommandOutput =
adaptor.executeCommand(deleteTarCommand, null);
+ if (rmCommandOutput.getExitCode() != 0) {
+ logger.error("Failed while running the rm command " +
deleteTarCommand + ". Sout : " +
+ rmCommandOutput.getStdOut() + ". Serr " +
rmCommandOutput.getStdError());
+ }
+
+ } catch (AgentException e) {
+ logger.error("Failed while running the rm command " +
tarringCommand, e);
}
- } catch (AgentException e) {
- throw new TaskOnFailException("Failed while running the untar
command " + tarringCommand, false, null);
}
- return onSuccess("Archival task successfully completed");
-
} catch (TaskOnFailException e) {
if (e.getError() != null) {
logger.error(e.getReason(), e.getError());
@@ -142,8 +156,8 @@ public class ArchiveTask extends DataStagingTask {
return onFail(e.getReason(), e.isCritical(), e.getError());
} catch (Exception e) {
- logger.error("Unknown error while executing output data staging
task " + getTaskId(), e);
- return onFail("Unknown error while executing output data staging
task " + getTaskId(), false, e);
+ logger.error("Unknown error while executing archiving staging task
" + getTaskId(), e);
+ return onFail("Unknown error while executing archiving staging
task " + getTaskId(), false, e);
}
}