This is an automated email from the ASF dual-hosted git repository.

isjarana pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airavata-data-lake.git


The following commit(s) were added to refs/heads/master by this push:
     new bac152f  delete docker container if error occurred
     new 00d11bb  Merge pull request #112 from isururanawaka/master
bac152f is described below

commit bac152f273c39342a6b26f0ac8fc2a59abdce0d6
Author: Isuru Ranawaka <[email protected]>
AuthorDate: Tue Jun 7 11:07:50 2022 -0400

    delete docker container if error occurred
---
 .../engine/task/impl/GenericDataParsingTask.java   | 136 +++++++++++----------
 1 file changed, 74 insertions(+), 62 deletions(-)

diff --git a/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/impl/GenericDataParsingTask.java b/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/impl/GenericDataParsingTask.java
index 334e93e..217d9e6 100644
--- a/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/impl/GenericDataParsingTask.java
+++ b/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/impl/GenericDataParsingTask.java
@@ -28,7 +28,6 @@ import com.github.dockerjava.api.model.Frame;
 import com.github.dockerjava.api.model.StreamType;
 import com.github.dockerjava.core.DefaultDockerClientConfig;
 import com.github.dockerjava.core.DockerClientBuilder;
-import com.github.dockerjava.core.command.LogContainerResultCallback;
 import io.grpc.ManagedChannel;
 import io.grpc.ManagedChannelBuilder;
 import org.apache.airavata.datalake.data.orchestrator.api.stub.parsing.*;
@@ -36,7 +35,6 @@ import 
org.apache.airavata.datalake.orchestrator.workflow.engine.task.BlockingTa
 import 
org.apache.airavata.datalake.orchestrator.workflow.engine.task.annotation.BlockingTaskDef;
 import 
org.apache.airavata.datalake.orchestrator.workflow.engine.task.annotation.TaskParam;
 import 
org.apache.airavata.datalake.orchestrator.workflow.engine.task.types.StringMap;
-import org.apache.commons.io.IOUtils;
 import org.apache.helix.task.TaskResult;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -45,11 +43,9 @@ import java.io.File;
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
+import java.nio.file.Path;
 import java.nio.file.Paths;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
+import java.util.*;
 import java.util.stream.Collectors;
 
 @BlockingTaskDef(name = "GenericDataParsingTask")
@@ -86,7 +82,7 @@ public class GenericDataParsingTask extends BlockingTask {
 
         } finally {
 
-            if (channel!= null) {
+            if (channel != null) {
                 channel.shutdown();
             }
         }
@@ -107,7 +103,7 @@ public class GenericDataParsingTask extends BlockingTask {
             return new TaskResult(TaskResult.Status.FAILED, "Failed to create 
temp working directories");
         }
 
-        for (DataParserInputInterface dpi: inputInterfaces) {
+        for (DataParserInputInterface dpi : inputInterfaces) {
             String path = 
getInputMapping().get(dpi.getParserInputInterfaceId());
             if (path == null) {
                 logger.error("No value specified for input {}", 
dpi.getParserInputInterfaceId());
@@ -143,7 +139,7 @@ public class GenericDataParsingTask extends BlockingTask {
     }
 
     private void exportOutputs(DataParser parser, String outputPath) {
-        for (DataParserOutputInterface dpoi: parser.getOutputInterfacesList()) 
{
+        for (DataParserOutputInterface dpoi : 
parser.getOutputInterfacesList()) {
             putUserContent(getTaskId() + "-" + dpoi.getOutputName(),
                     outputPath + File.separator + dpoi.getOutputName(),
                     Scope.WORKFLOW);
@@ -151,11 +147,12 @@ public class GenericDataParsingTask extends BlockingTask {
     }
 
     private void runContainer(DataParser parser, String inputPath, String 
outputPath, Map<String, String> environmentValues)
-            throws Exception{
+            throws Exception {
 
         DefaultDockerClientConfig.Builder config = 
DefaultDockerClientConfig.createDefaultConfigBuilder();
         DockerClient dockerClient = 
DockerClientBuilder.getInstance(config.build()).build();
 
+
         logger.info("Pulling image " + parser.getDockerImage());
         try {
             dockerClient.pullImageCmd(parser.getDockerImage().split(":")[0])
@@ -182,67 +179,82 @@ public class GenericDataParsingTask extends BlockingTask {
                 .exec();
 
         logger.info("Created the container with id " + 
containerResponse.getId());
+        try {
 
-        final StringBuilder dockerLogs = new StringBuilder();
+            final StringBuilder dockerLogs = new StringBuilder();
 
-        if (containerResponse.getWarnings() != null && 
containerResponse.getWarnings().length > 0) {
-            StringBuilder warningStr = new StringBuilder();
-            for (String w : containerResponse.getWarnings()) {
-                warningStr.append(w).append(",");
-            }
-            logger.warn("Container " + containerResponse.getId() + " warnings 
: " + warningStr);
-        } else {
-            logger.info("Starting container with id " + 
containerResponse.getId());
-            dockerClient.startContainerCmd(containerResponse.getId()).exec();
-            LogContainerCmd logContainerCmd = 
dockerClient.logContainerCmd(containerResponse.getId()).withStdOut(true).withStdErr(true);
+            if (containerResponse.getWarnings() != null && 
containerResponse.getWarnings().length > 0) {
+                StringBuilder warningStr = new StringBuilder();
+                for (String w : containerResponse.getWarnings()) {
+                    warningStr.append(w).append(",");
+                }
+                logger.warn("Container " + containerResponse.getId() + " 
warnings : " + warningStr);
+            } else {
+                logger.info("Starting container with id " + 
containerResponse.getId());
+                
dockerClient.startContainerCmd(containerResponse.getId()).exec();
+                LogContainerCmd logContainerCmd = 
dockerClient.logContainerCmd(containerResponse.getId()).withStdOut(true).withStdErr(true);
+
+                try {
+
+                    logContainerCmd.exec(new ResultCallback.Adapter<Frame>() {
+                        @Override
+                        public void onNext(Frame item) {
+                            logger.info("Got frame: {}", item);
+                            ;
+                            if (item.getStreamType() == StreamType.STDOUT) {
+                                dockerLogs.append(new 
String(item.getPayload(), StandardCharsets.UTF_8));
+                                dockerLogs.append("\n");
+                            } else if (item.getStreamType() == 
StreamType.STDERR) {
+                                dockerLogs.append(new 
String(item.getPayload(), StandardCharsets.UTF_8));
+                                dockerLogs.append("\n");
+                            }
+                            super.onNext(item);
+                        }
 
-            try {
+                        @Override
+                        public void onError(Throwable throwable) {
+                            logger.error("Errored while running the container 
{}", containerId, throwable);
+                            super.onError(throwable);
+                        }
 
-                logContainerCmd.exec(new ResultCallback.Adapter<Frame>() {
-                    @Override
-                    public void onNext(Frame item) {
-                        logger.info("Got frame: {}", item);;
-                        if (item.getStreamType() == StreamType.STDOUT) {
-                            dockerLogs.append(new String(item.getPayload(), 
StandardCharsets.UTF_8));
-                            dockerLogs.append("\n");
-                        } else if (item.getStreamType() == StreamType.STDERR) {
-                            dockerLogs.append(new String(item.getPayload(), 
StandardCharsets.UTF_8));
-                            dockerLogs.append("\n");
+                        @Override
+                        public void onComplete() {
+                            logger.info("Container {} successfully completed", 
containerId);
+                            super.onComplete();
                         }
-                        super.onNext(item);
-                    }
-
-                    @Override
-                    public void onError(Throwable throwable) {
-                        logger.error("Errored while running the container {}", 
containerId, throwable);
-                        super.onError(throwable);
-                    }
-
-                    @Override
-                    public void onComplete() {
-                        logger.info("Container {} successfully completed", 
containerId);
-                        super.onComplete();
-                    }
-                }).awaitCompletion();
-            } catch (InterruptedException e) {
-                logger.error("Interrupted while reading container log" + 
e.getMessage());
-                throw e;
-            }
+                    }).awaitCompletion();
+                } catch (InterruptedException e) {
+                    logger.error("Interrupted while reading container log" + 
e.getMessage());
+                    throw e;
+                }
 
-            logger.info("Waiting for the container to stop");
+                logger.info("Waiting for the container to stop");
 
-            Integer statusCode = 
dockerClient.waitContainerCmd(containerResponse.getId()).exec(new 
WaitContainerResultCallback()).awaitStatusCode();
-            logger.info("Container " + containerResponse.getId() + " exited 
with status code " + statusCode);
-            if (statusCode != 0) {
-                logger.error("Failing as non zero status code was returned");
-                throw new Exception("Failing as non zero status code was 
returned");
-            }
+                Integer statusCode = 
dockerClient.waitContainerCmd(containerResponse.getId()).exec(new 
WaitContainerResultCallback()).awaitStatusCode();
+                logger.info("Container " + containerResponse.getId() + " 
exited with status code " + statusCode);
+                if (statusCode != 0) {
+                    logger.error("Failing as non zero status code was 
returned");
+                    throw new Exception("Failing as non zero status code was 
returned");
+                }
 
-            logger.info("Container logs " + dockerLogs.toString());
+                logger.info("Container logs " + dockerLogs.toString());
+            }
+        } finally {
+            dockerClient.removeContainerCmd(containerResponse.getId()).exec();
+            logger.info("Successfully removed container with id " + 
containerResponse.getId());
+            Path dir = Paths.get(workingDirectory.get());
+            Files
+                    .walk(dir) // Traverse the file tree in depth-first order
+                    .sorted(Comparator.reverseOrder())
+                    .forEach(path -> {
+                        try {
+                            logger.info("Deleting resources : " + path);
+                            Files.delete(path);  //delete each file or 
directory
+                        } catch (IOException e) {
+                            logger.error("File cleanup failed for path " + 
dir, e);
+                        }
+                    });
         }
-
-        dockerClient.removeContainerCmd(containerResponse.getId()).exec();
-        logger.info("Successfully removed container with id " + 
containerResponse.getId());
     }
 
     public String getWorkingDirectory() {

Reply via email to