fapaul commented on a change in pull request #18128:
URL: https://github.com/apache/flink/pull/18128#discussion_r770323139



##########
File path: 
flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/container/FlinkContainers.java
##########
@@ -160,6 +162,7 @@ public void stop() {
             restClusterClient.close();
         }
         this.taskManagers.forEach(GenericContainer::stop);
+        deleteJobManagerTemporaryFiles();

Review comment:
       Do we still need the cleanup hook passed to this class? AFAICT, the 
JobManager deletes the HA and checkpoint data, so only the image has to be 
deleted. Can we delete the image in the builder, immediately after the 
containers have been built?

##########
File path: 
flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/container/FlinkContainers.java
##########
@@ -317,4 +320,25 @@ private void waitUntilAllTaskManagerConnected() throws InterruptedException, Tim
                 DEFAULT_TIMEOUT,
                 "TaskManagers are not ready within 30 seconds");
     }
+
+    private void deleteJobManagerTemporaryFiles() {
+        try {
+            final Container.ExecResult result =
+                    jobManager.execInContainer(
+                            "rm",
+                            "-rf",
+                            Paths.get(conf.get(CheckpointingOptions.CHECKPOINTS_DIRECTORY), "*")
+                                    .toString(),
+                            Paths.get(conf.get(HighAvailabilityOptions.HA_STORAGE_PATH), "*")
+                                    .toString());
+            if (result.getExitCode() != 0) {
+                throw new IllegalStateException(
+                        String.format(
+                                "rm command returned non-zero exit code %d. \nSTDOUT: %s\nSTDERR: %s",
+                                result.getExitCode(), result.getStdout(), result.getStderr()));
+            }
+        } catch (Exception e) {
+            throw new RuntimeException("Failed to delete temporary files in JobManager");

Review comment:
       We should not swallow the causing exception; pass it to the 
RuntimeException as its cause.
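
A minimal sketch of what this could look like, assuming the two-argument `RuntimeException(String, Throwable)` constructor is used. The class `CleanupFailureExample` and the helper `runCleanup()` are hypothetical stand-ins for the container call and are not part of the PR:

```java
import java.io.IOException;

public class CleanupFailureExample {

    // Hypothetical stand-in for the container call that may fail.
    static void runCleanup() throws Exception {
        throw new IOException("container is not running");
    }

    static void deleteTemporaryFiles() {
        try {
            runCleanup();
        } catch (Exception e) {
            // Pass the caught exception as the cause so that its message
            // and stack trace are kept in the rethrown exception.
            throw new RuntimeException("Failed to delete temporary files in JobManager", e);
        }
    }

    public static void main(String[] args) {
        try {
            deleteTemporaryFiles();
        } catch (RuntimeException e) {
            System.out.println(e.getMessage() + " (cause: " + e.getCause() + ")");
        }
    }
}
```

With the cause attached, the original failure shows up as a "Caused by:" entry in the stack trace instead of being lost.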

##########
File path: 
flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/container/FlinkContainers.java
##########
@@ -317,4 +320,25 @@ private void waitUntilAllTaskManagerConnected() throws InterruptedException, Tim
                 DEFAULT_TIMEOUT,
                 "TaskManagers are not ready within 30 seconds");
     }
+
+    private void deleteJobManagerTemporaryFiles() {
+        try {
+            final Container.ExecResult result =
+                    jobManager.execInContainer(
+                            "rm",
+                            "-rf",
+                            Paths.get(conf.get(CheckpointingOptions.CHECKPOINTS_DIRECTORY), "*")
+                                    .toString(),
+                            Paths.get(conf.get(HighAvailabilityOptions.HA_STORAGE_PATH), "*")
+                                    .toString());
+            if (result.getExitCode() != 0) {
+                throw new IllegalStateException(
+                        String.format(
+                                "rm command returned non-zero exit code %d. \nSTDOUT: %s\nSTDERR: %s",
+                                result.getExitCode(), result.getStdout(), result.getStderr()));
+            }
+        } catch (Exception e) {
+            throw new RuntimeException("Failed to delete temporary files in JobManager");

Review comment:
       Nit: Maybe add the directory names to the exception message.
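
One possible shape for such a message, as a rough sketch only. The class `CleanupMessageExample`, the helpers `failureMessage` and `cleanupFailure`, and the directory paths are hypothetical and chosen for illustration; in the PR the directories would come from the configuration:

```java
import java.io.IOException;

public class CleanupMessageExample {

    // Hypothetical helper: builds an error message that names the affected directories.
    static String failureMessage(String checkpointDir, String haStorageDir) {
        return String.format(
                "Failed to delete temporary files in JobManager "
                        + "(checkpoint directory: %s, HA storage directory: %s)",
                checkpointDir, haStorageDir);
    }

    // Hypothetical helper: wraps the original failure and keeps it as the cause.
    static RuntimeException cleanupFailure(
            String checkpointDir, String haStorageDir, Exception cause) {
        return new RuntimeException(failureMessage(checkpointDir, haStorageDir), cause);
    }

    public static void main(String[] args) {
        // The paths below are made-up example values.
        RuntimeException e =
                cleanupFailure(
                        "/tmp/example/checkpoints",
                        "/tmp/example/recovery",
                        new IOException("exec failed"));
        System.out.println(e.getMessage());
    }
}
```

Naming the directories makes it easier to tell from a CI log which paths the cleanup was trying to remove.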





