This is an automated email from the ASF dual-hosted git repository.

fpaul pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit fc44a6dee8338c4e4af1e3cecf4512c8bf11ae67
Author: Qingsheng Ren <[email protected]>
AuthorDate: Thu Dec 16 16:02:38 2021 +0800

    [FLINK-25340][testutils] Remove JM temp files before stopping container to avoid file permission issues
    
    Co-authored-by: Fabian Paul <[email protected]>
---
 .../util/flink/container/FlinkContainers.java      | 53 ++++++++++++++++++++--
 .../flink/container/FlinkContainersBuilder.java    | 38 ++--------------
 2 files changed, 53 insertions(+), 38 deletions(-)

diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/container/FlinkContainers.java b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/container/FlinkContainers.java
index 693ab0d..5d4fa8e 100644
--- a/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/container/FlinkContainers.java
+++ b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/container/FlinkContainers.java
@@ -20,7 +20,9 @@ package org.apache.flink.tests.util.flink.container;
 
 import org.apache.flink.client.deployment.StandaloneClusterId;
 import org.apache.flink.client.program.rest.RestClusterClient;
+import org.apache.flink.configuration.CheckpointingOptions;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.configuration.RestOptions;
 import org.apache.flink.core.testutils.CommonTestUtils;
 import 
org.apache.flink.runtime.rest.handler.legacy.messages.ClusterOverviewWithVersion;
@@ -30,6 +32,8 @@ import 
org.apache.flink.runtime.rest.messages.EmptyRequestBody;
 import org.apache.flink.tests.util.flink.SQLJobSubmission;
 import org.apache.flink.util.function.RunnableWithException;
 
+import org.apache.flink.shaded.guava30.com.google.common.collect.Lists;
+
 import org.junit.jupiter.api.extension.AfterAllCallback;
 import org.junit.jupiter.api.extension.BeforeAllCallback;
 import org.junit.jupiter.api.extension.Extension;
@@ -49,8 +53,11 @@ import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.time.Duration;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.List;
+import java.util.Objects;
 import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.apache.flink.util.Preconditions.checkState;
@@ -113,7 +120,6 @@ public class FlinkContainers implements BeforeAllCallback, 
AfterAllCallback {
     private final List<GenericContainer<?>> taskManagers;
     private final GenericContainer<?> haService;
     private final Configuration conf;
-    private final Runnable cleanupHook;
 
     @Nullable private RestClusterClient<StandaloneClusterId> restClusterClient;
     private boolean isStarted = false;
@@ -127,13 +133,11 @@ public class FlinkContainers implements 
BeforeAllCallback, AfterAllCallback {
             GenericContainer<?> jobManager,
             List<GenericContainer<?>> taskManagers,
             @Nullable GenericContainer<?> haService,
-            Configuration conf,
-            Runnable cleanupHook) {
+            Configuration conf) {
         this.jobManager = jobManager;
         this.taskManagers = taskManagers;
         this.haService = haService;
         this.conf = conf;
-        this.cleanupHook = cleanupHook;
     }
 
     /** Starts all containers. */
@@ -160,11 +164,11 @@ public class FlinkContainers implements 
BeforeAllCallback, AfterAllCallback {
             restClusterClient.close();
         }
         this.taskManagers.forEach(GenericContainer::stop);
+        deleteJobManagerTemporaryFiles();
         this.jobManager.stop();
         if (this.haService != null) {
             this.haService.stop();
         }
-        cleanupHook.run();
     }
 
     /** Gets the running state of the cluster. */
@@ -317,4 +321,43 @@ public class FlinkContainers implements BeforeAllCallback, 
AfterAllCallback {
                 DEFAULT_TIMEOUT,
                 "TaskManagers are not ready within 30 seconds");
     }
+
+    private void deleteJobManagerTemporaryFiles() {
+        final String checkpointDir = 
conf.get(CheckpointingOptions.CHECKPOINTS_DIRECTORY);
+        final String haDir = conf.get(HighAvailabilityOptions.HA_STORAGE_PATH);
+        final Collection<String> usedPaths =
+                Lists.newArrayList(checkpointDir, haDir).stream()
+                        .filter(Objects::nonNull)
+                        .collect(Collectors.toList());
+        if (usedPaths.isEmpty()) {
+            return;
+        }
+        final StringBuilder deletionBaseCommand = new StringBuilder("rm -rf");
+        usedPaths.forEach(p -> 
deletionBaseCommand.append(formatFilePathForDeletion(p)));
+        final String[] command = {"bash", "-c", 
deletionBaseCommand.toString()};
+        final Container.ExecResult result;
+        try {
+            result = jobManager.execInContainer(command);
+            if (result.getExitCode() != 0) {
+                throw new IllegalStateException(
+                        String.format(
+                                "Command \"%s\" returned non-zero exit code 
%d. \nSTDOUT: %s\nSTDERR: %s",
+                                String.join(" ", command),
+                                result.getExitCode(),
+                                result.getStdout(),
+                                result.getStderr()));
+            }
+        } catch (IOException e) {
+            throw new RuntimeException(
+                    "Failed to delete temporary files generated by the flink 
cluster.", e);
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new RuntimeException(
+                    "Failed to delete temporary files generated by the flink 
cluster.", e);
+        }
+    }
+
+    private String formatFilePathForDeletion(String path) {
+        return " " + Paths.get(path).toString().split("file:")[1] + "/*";
+    }
 }
diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/container/FlinkContainersBuilder.java b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/container/FlinkContainersBuilder.java
index 2179860..bb9f190 100644
--- a/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/container/FlinkContainersBuilder.java
+++ b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/container/FlinkContainersBuilder.java
@@ -26,7 +26,6 @@ import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.configuration.RestOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
 
-import org.apache.commons.io.FileUtils;
 import org.slf4j.Logger;
 import org.testcontainers.containers.GenericContainer;
 import org.testcontainers.containers.Network;
@@ -165,13 +164,10 @@ public class FlinkContainersBuilder {
                 CheckpointingOptions.CHECKPOINTS_DIRECTORY,
                 CHECKPOINT_PATH.toAbsolutePath().toUri().toString());
 
-        final List<Path> temporaryPaths = new ArrayList<>();
-
         // Create temporary directory for building Flink image
         final Path imageBuildingTempDir;
         try {
             imageBuildingTempDir = 
Files.createTempDirectory("flink-image-build");
-            temporaryPaths.add(imageBuildingTempDir);
         } catch (IOException e) {
             throw new RuntimeException("Failed to create temporary directory", 
e);
         }
@@ -185,22 +181,13 @@ public class FlinkContainersBuilder {
 
         // Mount HA storage to JobManager
         if (enableZookeeperHA) {
-            final Path haStorage =
-                    createTempDirAndMountToContainer("flink-recovery", 
HA_STORAGE_PATH, jobManager);
-            temporaryPaths.add(haStorage);
+            createTempDirAndMountToContainer("flink-recovery", 
HA_STORAGE_PATH, jobManager);
         }
 
         // Mount checkpoint storage to JobManager
-        final Path checkpointPath =
-                createTempDirAndMountToContainer("flink-checkpoint", 
CHECKPOINT_PATH, jobManager);
-        temporaryPaths.add(checkpointPath);
-
-        return new FlinkContainers(
-                jobManager,
-                taskManagers,
-                zookeeper,
-                conf,
-                () -> deleteTemporaryPaths(temporaryPaths));
+        createTempDirAndMountToContainer("flink-checkpoint", CHECKPOINT_PATH, 
jobManager);
+
+        return new FlinkContainers(jobManager, taskManagers, zookeeper, conf);
     }
 
     // --------------------------- Helper Functions 
-------------------------------------
@@ -292,30 +279,15 @@ public class FlinkContainersBuilder {
         conf.set(HighAvailabilityOptions.HA_STORAGE_PATH, 
HA_STORAGE_PATH.toUri().toString());
     }
 
-    private Path createTempDirAndMountToContainer(
+    private void createTempDirAndMountToContainer(
             String tempDirPrefix, Path containerPath, GenericContainer<?> 
container) {
         try {
             Path tempDirPath = Files.createTempDirectory(tempDirPrefix);
             container.withFileSystemBind(
                     tempDirPath.toAbsolutePath().toString(),
                     containerPath.toAbsolutePath().toString());
-            return tempDirPath;
         } catch (IOException e) {
             throw new IllegalStateException("Failed to create temporary 
recovery directory", e);
         }
     }
-
-    private void deleteTemporaryPaths(List<Path> temporaryPaths) {
-        temporaryPaths.forEach(
-                (path) -> {
-                    try {
-                        FileUtils.deleteDirectory(path.toFile());
-                    } catch (IOException e) {
-                        throw new RuntimeException(
-                                String.format(
-                                        "Failed to delete path \"%s\"", 
path.toAbsolutePath()),
-                                e);
-                    }
-                });
-    }
 }

Reply via email to