This is an automated email from the ASF dual-hosted git repository.
isjarana pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airavata-data-lake.git
The following commit(s) were added to refs/heads/master by this push:
new bac152f delete docker container if error occurred
new 00d11bb Merge pull request #112 from isururanawaka/master
bac152f is described below
commit bac152f273c39342a6b26f0ac8fc2a59abdce0d6
Author: Isuru Ranawaka <[email protected]>
AuthorDate: Tue Jun 7 11:07:50 2022 -0400
delete docker container if error occurred
---
.../engine/task/impl/GenericDataParsingTask.java | 136 +++++++++++----------
1 file changed, 74 insertions(+), 62 deletions(-)
diff --git
a/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/impl/GenericDataParsingTask.java
b/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/impl/GenericDataParsingTask.java
index 334e93e..217d9e6 100644
---
a/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/impl/GenericDataParsingTask.java
+++
b/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/impl/GenericDataParsingTask.java
@@ -28,7 +28,6 @@ import com.github.dockerjava.api.model.Frame;
import com.github.dockerjava.api.model.StreamType;
import com.github.dockerjava.core.DefaultDockerClientConfig;
import com.github.dockerjava.core.DockerClientBuilder;
-import com.github.dockerjava.core.command.LogContainerResultCallback;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import org.apache.airavata.datalake.data.orchestrator.api.stub.parsing.*;
@@ -36,7 +35,6 @@ import
org.apache.airavata.datalake.orchestrator.workflow.engine.task.BlockingTa
import
org.apache.airavata.datalake.orchestrator.workflow.engine.task.annotation.BlockingTaskDef;
import
org.apache.airavata.datalake.orchestrator.workflow.engine.task.annotation.TaskParam;
import
org.apache.airavata.datalake.orchestrator.workflow.engine.task.types.StringMap;
-import org.apache.commons.io.IOUtils;
import org.apache.helix.task.TaskResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -45,11 +43,9 @@ import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
+import java.nio.file.Path;
import java.nio.file.Paths;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
+import java.util.*;
import java.util.stream.Collectors;
@BlockingTaskDef(name = "GenericDataParsingTask")
@@ -86,7 +82,7 @@ public class GenericDataParsingTask extends BlockingTask {
} finally {
- if (channel!= null) {
+ if (channel != null) {
channel.shutdown();
}
}
@@ -107,7 +103,7 @@ public class GenericDataParsingTask extends BlockingTask {
return new TaskResult(TaskResult.Status.FAILED, "Failed to create
temp working directories");
}
- for (DataParserInputInterface dpi: inputInterfaces) {
+ for (DataParserInputInterface dpi : inputInterfaces) {
String path =
getInputMapping().get(dpi.getParserInputInterfaceId());
if (path == null) {
logger.error("No value specified for input {}",
dpi.getParserInputInterfaceId());
@@ -143,7 +139,7 @@ public class GenericDataParsingTask extends BlockingTask {
}
private void exportOutputs(DataParser parser, String outputPath) {
- for (DataParserOutputInterface dpoi: parser.getOutputInterfacesList())
{
+ for (DataParserOutputInterface dpoi :
parser.getOutputInterfacesList()) {
putUserContent(getTaskId() + "-" + dpoi.getOutputName(),
outputPath + File.separator + dpoi.getOutputName(),
Scope.WORKFLOW);
@@ -151,11 +147,12 @@ public class GenericDataParsingTask extends BlockingTask {
}
private void runContainer(DataParser parser, String inputPath, String
outputPath, Map<String, String> environmentValues)
- throws Exception{
+ throws Exception {
DefaultDockerClientConfig.Builder config =
DefaultDockerClientConfig.createDefaultConfigBuilder();
DockerClient dockerClient =
DockerClientBuilder.getInstance(config.build()).build();
+
logger.info("Pulling image " + parser.getDockerImage());
try {
dockerClient.pullImageCmd(parser.getDockerImage().split(":")[0])
@@ -182,67 +179,82 @@ public class GenericDataParsingTask extends BlockingTask {
.exec();
logger.info("Created the container with id " +
containerResponse.getId());
+ try {
- final StringBuilder dockerLogs = new StringBuilder();
+ final StringBuilder dockerLogs = new StringBuilder();
- if (containerResponse.getWarnings() != null &&
containerResponse.getWarnings().length > 0) {
- StringBuilder warningStr = new StringBuilder();
- for (String w : containerResponse.getWarnings()) {
- warningStr.append(w).append(",");
- }
- logger.warn("Container " + containerResponse.getId() + " warnings
: " + warningStr);
- } else {
- logger.info("Starting container with id " +
containerResponse.getId());
- dockerClient.startContainerCmd(containerResponse.getId()).exec();
- LogContainerCmd logContainerCmd =
dockerClient.logContainerCmd(containerResponse.getId()).withStdOut(true).withStdErr(true);
+ if (containerResponse.getWarnings() != null &&
containerResponse.getWarnings().length > 0) {
+ StringBuilder warningStr = new StringBuilder();
+ for (String w : containerResponse.getWarnings()) {
+ warningStr.append(w).append(",");
+ }
+ logger.warn("Container " + containerResponse.getId() + "
warnings : " + warningStr);
+ } else {
+ logger.info("Starting container with id " +
containerResponse.getId());
+
dockerClient.startContainerCmd(containerResponse.getId()).exec();
+ LogContainerCmd logContainerCmd =
dockerClient.logContainerCmd(containerResponse.getId()).withStdOut(true).withStdErr(true);
+
+ try {
+
+ logContainerCmd.exec(new ResultCallback.Adapter<Frame>() {
+ @Override
+ public void onNext(Frame item) {
+ logger.info("Got frame: {}", item);
+ ;
+ if (item.getStreamType() == StreamType.STDOUT) {
+ dockerLogs.append(new
String(item.getPayload(), StandardCharsets.UTF_8));
+ dockerLogs.append("\n");
+ } else if (item.getStreamType() ==
StreamType.STDERR) {
+ dockerLogs.append(new
String(item.getPayload(), StandardCharsets.UTF_8));
+ dockerLogs.append("\n");
+ }
+ super.onNext(item);
+ }
- try {
+ @Override
+ public void onError(Throwable throwable) {
+ logger.error("Errored while running the container
{}", containerId, throwable);
+ super.onError(throwable);
+ }
- logContainerCmd.exec(new ResultCallback.Adapter<Frame>() {
- @Override
- public void onNext(Frame item) {
- logger.info("Got frame: {}", item);;
- if (item.getStreamType() == StreamType.STDOUT) {
- dockerLogs.append(new String(item.getPayload(),
StandardCharsets.UTF_8));
- dockerLogs.append("\n");
- } else if (item.getStreamType() == StreamType.STDERR) {
- dockerLogs.append(new String(item.getPayload(),
StandardCharsets.UTF_8));
- dockerLogs.append("\n");
+ @Override
+ public void onComplete() {
+ logger.info("Container {} successfully completed",
containerId);
+ super.onComplete();
}
- super.onNext(item);
- }
-
- @Override
- public void onError(Throwable throwable) {
- logger.error("Errored while running the container {}",
containerId, throwable);
- super.onError(throwable);
- }
-
- @Override
- public void onComplete() {
- logger.info("Container {} successfully completed",
containerId);
- super.onComplete();
- }
- }).awaitCompletion();
- } catch (InterruptedException e) {
- logger.error("Interrupted while reading container log" +
e.getMessage());
- throw e;
- }
+ }).awaitCompletion();
+ } catch (InterruptedException e) {
+ logger.error("Interrupted while reading container log" +
e.getMessage());
+ throw e;
+ }
- logger.info("Waiting for the container to stop");
+ logger.info("Waiting for the container to stop");
- Integer statusCode =
dockerClient.waitContainerCmd(containerResponse.getId()).exec(new
WaitContainerResultCallback()).awaitStatusCode();
- logger.info("Container " + containerResponse.getId() + " exited
with status code " + statusCode);
- if (statusCode != 0) {
- logger.error("Failing as non zero status code was returned");
- throw new Exception("Failing as non zero status code was
returned");
- }
+ Integer statusCode =
dockerClient.waitContainerCmd(containerResponse.getId()).exec(new
WaitContainerResultCallback()).awaitStatusCode();
+ logger.info("Container " + containerResponse.getId() + "
exited with status code " + statusCode);
+ if (statusCode != 0) {
+ logger.error("Failing as non zero status code was
returned");
+ throw new Exception("Failing as non zero status code was
returned");
+ }
- logger.info("Container logs " + dockerLogs.toString());
+ logger.info("Container logs " + dockerLogs.toString());
+ }
+ } finally {
+ dockerClient.removeContainerCmd(containerResponse.getId()).exec();
+ logger.info("Successfully removed container with id " +
containerResponse.getId());
+ Path dir = Paths.get(workingDirectory.get());
+ Files
+ .walk(dir) // Traverse the file tree in depth-first order
+ .sorted(Comparator.reverseOrder())
+ .forEach(path -> {
+ try {
+ logger.info("Deleting resources : " + path);
+ Files.delete(path); //delete each file or
directory
+ } catch (IOException e) {
+ logger.error("File cleanup failed for path " +
dir, e);
+ }
+ });
}
-
- dockerClient.removeContainerCmd(containerResponse.getId()).exec();
- logger.info("Successfully removed container with id " +
containerResponse.getId());
}
public String getWorkingDirectory() {