This is an automated email from the ASF dual-hosted git repository. fpaul pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit fc44a6dee8338c4e4af1e3cecf4512c8bf11ae67 Author: Qingsheng Ren <[email protected]> AuthorDate: Thu Dec 16 16:02:38 2021 +0800 [FLINK-25340][testutils] Remove JM temp files before stopping container to avoid file permission issues Co-authored-by: Fabian Paul <[email protected]> --- .../util/flink/container/FlinkContainers.java | 53 ++++++++++++++++++++-- .../flink/container/FlinkContainersBuilder.java | 38 ++-------------- 2 files changed, 53 insertions(+), 38 deletions(-) diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/container/FlinkContainers.java b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/container/FlinkContainers.java index 693ab0d..5d4fa8e 100644 --- a/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/container/FlinkContainers.java +++ b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/container/FlinkContainers.java @@ -20,7 +20,9 @@ package org.apache.flink.tests.util.flink.container; import org.apache.flink.client.deployment.StandaloneClusterId; import org.apache.flink.client.program.rest.RestClusterClient; +import org.apache.flink.configuration.CheckpointingOptions; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.HighAvailabilityOptions; import org.apache.flink.configuration.RestOptions; import org.apache.flink.core.testutils.CommonTestUtils; import org.apache.flink.runtime.rest.handler.legacy.messages.ClusterOverviewWithVersion; @@ -30,6 +32,8 @@ import org.apache.flink.runtime.rest.messages.EmptyRequestBody; import org.apache.flink.tests.util.flink.SQLJobSubmission; import org.apache.flink.util.function.RunnableWithException; +import org.apache.flink.shaded.guava30.com.google.common.collect.Lists; + import org.junit.jupiter.api.extension.AfterAllCallback; import org.junit.jupiter.api.extension.BeforeAllCallback; import org.junit.jupiter.api.extension.Extension; @@ -49,8 +53,11 @@ import java.nio.file.Path; import java.nio.file.Paths; import java.time.Duration; import java.util.ArrayList; +import java.util.Collection; import java.util.List; +import java.util.Objects; import java.util.concurrent.TimeoutException; +import java.util.stream.Collectors; import static org.apache.flink.util.Preconditions.checkNotNull; import static org.apache.flink.util.Preconditions.checkState; @@ -113,7 +120,6 @@ public class FlinkContainers implements BeforeAllCallback, AfterAllCallback { private final List<GenericContainer<?>> taskManagers; private final GenericContainer<?> haService; private final Configuration conf; - private final Runnable cleanupHook; @Nullable private RestClusterClient<StandaloneClusterId> restClusterClient; private boolean isStarted = false; @@ -127,13 +133,11 @@ public class FlinkContainers implements BeforeAllCallback, AfterAllCallback { GenericContainer<?> jobManager, List<GenericContainer<?>> taskManagers, @Nullable GenericContainer<?> haService, - Configuration conf, - Runnable cleanupHook) { + Configuration conf) { this.jobManager = jobManager; this.taskManagers = taskManagers; this.haService = haService; this.conf = conf; - this.cleanupHook = cleanupHook; } /** Starts all containers. */ @@ -160,11 +164,11 @@ public class FlinkContainers implements BeforeAllCallback, AfterAllCallback { restClusterClient.close(); } this.taskManagers.forEach(GenericContainer::stop); + deleteJobManagerTemporaryFiles(); this.jobManager.stop(); if (this.haService != null) { this.haService.stop(); } - cleanupHook.run(); } /** Gets the running state of the cluster. */ @@ -317,4 +321,43 @@ public class FlinkContainers implements BeforeAllCallback, AfterAllCallback { DEFAULT_TIMEOUT, "TaskManagers are not ready within 30 seconds"); } + + private void deleteJobManagerTemporaryFiles() { + final String checkpointDir = conf.get(CheckpointingOptions.CHECKPOINTS_DIRECTORY); + final String haDir = conf.get(HighAvailabilityOptions.HA_STORAGE_PATH); + final Collection<String> usedPaths = + Lists.newArrayList(checkpointDir, haDir).stream() + .filter(Objects::nonNull) + .collect(Collectors.toList()); + if (usedPaths.isEmpty()) { + return; + } + final StringBuilder deletionBaseCommand = new StringBuilder("rm -rf"); + usedPaths.forEach(p -> deletionBaseCommand.append(formatFilePathForDeletion(p))); + final String[] command = {"bash", "-c", deletionBaseCommand.toString()}; + final Container.ExecResult result; + try { + result = jobManager.execInContainer(command); + if (result.getExitCode() != 0) { + throw new IllegalStateException( + String.format( + "Command \"%s\" returned non-zero exit code %d. \nSTDOUT: %s\nSTDERR: %s", + String.join(" ", command), + result.getExitCode(), + result.getStdout(), + result.getStderr())); + } + } catch (IOException e) { + throw new RuntimeException( + "Failed to delete temporary files generated by the flink cluster.", e); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException( + "Failed to delete temporary files generated by the flink cluster.", e); + } + } + + private String formatFilePathForDeletion(String path) { + return " " + Paths.get(path).toString().split("file:")[1] + "/*"; + } } diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/container/FlinkContainersBuilder.java b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/container/FlinkContainersBuilder.java index 2179860..bb9f190 100644 --- a/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/container/FlinkContainersBuilder.java +++ b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/container/FlinkContainersBuilder.java @@ -26,7 +26,6 @@ import org.apache.flink.configuration.JobManagerOptions; import org.apache.flink.configuration.RestOptions; import org.apache.flink.configuration.TaskManagerOptions; -import org.apache.commons.io.FileUtils; import org.slf4j.Logger; import org.testcontainers.containers.GenericContainer; import org.testcontainers.containers.Network; @@ -165,13 +164,10 @@ public class FlinkContainersBuilder { CheckpointingOptions.CHECKPOINTS_DIRECTORY, CHECKPOINT_PATH.toAbsolutePath().toUri().toString()); - final List<Path> temporaryPaths = new ArrayList<>(); - // Create temporary directory for building Flink image final Path imageBuildingTempDir; try { imageBuildingTempDir = Files.createTempDirectory("flink-image-build"); - temporaryPaths.add(imageBuildingTempDir); } catch (IOException e) { throw new RuntimeException("Failed to create temporary directory", e); } @@ -185,22 +181,13 @@ public class FlinkContainersBuilder { // Mount HA storage to JobManager if (enableZookeeperHA) { - final Path haStorage = - createTempDirAndMountToContainer("flink-recovery", HA_STORAGE_PATH, jobManager); - temporaryPaths.add(haStorage); + createTempDirAndMountToContainer("flink-recovery", HA_STORAGE_PATH, jobManager); } // Mount checkpoint storage to JobManager - final Path checkpointPath = - createTempDirAndMountToContainer("flink-checkpoint", CHECKPOINT_PATH, jobManager); - temporaryPaths.add(checkpointPath); - - return new FlinkContainers( - jobManager, - taskManagers, - zookeeper, - conf, - () -> deleteTemporaryPaths(temporaryPaths)); + createTempDirAndMountToContainer("flink-checkpoint", CHECKPOINT_PATH, jobManager); + + return new FlinkContainers(jobManager, taskManagers, zookeeper, conf); } // --------------------------- Helper Functions ------------------------------------- @@ -292,30 +279,15 @@ public class FlinkContainersBuilder { conf.set(HighAvailabilityOptions.HA_STORAGE_PATH, HA_STORAGE_PATH.toUri().toString()); } - private Path createTempDirAndMountToContainer( + private void createTempDirAndMountToContainer( String tempDirPrefix, Path containerPath, GenericContainer<?> container) { try { Path tempDirPath = Files.createTempDirectory(tempDirPrefix); container.withFileSystemBind( tempDirPath.toAbsolutePath().toString(), containerPath.toAbsolutePath().toString()); - return tempDirPath; } catch (IOException e) { throw new IllegalStateException("Failed to create temporary recovery directory", e); } } - - private void deleteTemporaryPaths(List<Path> temporaryPaths) { - temporaryPaths.forEach( - (path) -> { - try { - FileUtils.deleteDirectory(path.toFile()); - } catch (IOException e) { - throw new RuntimeException( - String.format( - "Failed to delete path \"%s\"", path.toAbsolutePath()), - e); - } - }); - } }
