This is an automated email from the ASF dual-hosted git repository. xtsong pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 2859196f9ab1d86a3d90e47a89cbd13be74741b9 Author: Weijie Guo <[email protected]> AuthorDate: Thu Oct 20 16:34:27 2022 +0800 [FLINK-28102] Flink AkkaRpcSystemLoader fails when temporary directory is a symlink --- .../main/java/org/apache/flink/util/FileUtils.java | 23 +++++++++++ .../java/org/apache/flink/util/FileUtilsTest.java | 44 ++++++++++++++++++++++ .../runtime/rpc/akka/AkkaRpcSystemLoader.java | 3 +- .../rpc/akka/AkkaRpcSystemLoaderITCase.java | 32 ++++++++++++++++ 4 files changed, 101 insertions(+), 1 deletion(-) diff --git a/flink-core/src/main/java/org/apache/flink/util/FileUtils.java b/flink-core/src/main/java/org/apache/flink/util/FileUtils.java index e517dc8eb56..c336e7d49c5 100644 --- a/flink-core/src/main/java/org/apache/flink/util/FileUtils.java +++ b/flink-core/src/main/java/org/apache/flink/util/FileUtils.java @@ -648,6 +648,29 @@ public final class FileUtils { return fileName; } + /** + * Returns the target path (the path with symbolic links replaced by the paths they link to) + * if the original path contains a symbolic link; otherwise returns the original path. + * + * @param path the original path. + * @return the path with symbolic links replaced by their real paths. + */ + public static java.nio.file.Path getTargetPathIfContainsSymbolicPath(java.nio.file.Path path) + throws IOException { + java.nio.file.Path targetPath = path; + java.nio.file.Path suffixPath = Paths.get(""); + while (path != null && path.getFileName() != null) { + if (Files.isSymbolicLink(path)) { + java.nio.file.Path linkedPath = path.toRealPath(); + targetPath = Paths.get(linkedPath.toString(), suffixPath.toString()); + break; + } + suffixPath = Paths.get(path.getFileName().toString(), suffixPath.toString()); + path = path.getParent(); + } + return targetPath; + } + /** * Converts the given {@link java.nio.file.Path} into a file {@link URL}. The resulting url is * relative iff the given path is relative. 
diff --git a/flink-core/src/test/java/org/apache/flink/util/FileUtilsTest.java b/flink-core/src/test/java/org/apache/flink/util/FileUtilsTest.java index b800ef6541c..b24179b6f67 100644 --- a/flink-core/src/test/java/org/apache/flink/util/FileUtilsTest.java +++ b/flink-core/src/test/java/org/apache/flink/util/FileUtilsTest.java @@ -315,6 +315,50 @@ public class FileUtilsTest { assertThat(paths).containsExactlyInAnyOrder(linkPath.resolve(file.getName())); } + @Test + void testGetTargetPathNotContainsSymbolicPath() throws IOException { + java.nio.file.Path testPath = Paths.get("parent", "child"); + java.nio.file.Path targetPath = FileUtils.getTargetPathIfContainsSymbolicPath(testPath); + assertThat(targetPath).isEqualTo(testPath); + } + + @Test + void testGetTargetPathContainsSymbolicPath() throws IOException { + File linkedDir = TempDirUtils.newFolder(temporaryFolder, "linked"); + java.nio.file.Path symlink = Paths.get(temporaryFolder.toString(), "symlink"); + java.nio.file.Path dirInLinked = + TempDirUtils.newFolder(linkedDir.toPath(), "one", "two").toPath().toRealPath(); + Files.createSymbolicLink(symlink, linkedDir.toPath()); + + java.nio.file.Path targetPath = + FileUtils.getTargetPathIfContainsSymbolicPath( + symlink.resolve("one").resolve("two")); + assertThat(targetPath).isEqualTo(dirInLinked); + } + + @Test + void testGetTargetPathContainsMultipleSymbolicPath() throws IOException { + File linked1Dir = TempDirUtils.newFolder(temporaryFolder, "linked1"); + java.nio.file.Path symlink1 = Paths.get(temporaryFolder.toString(), "symlink1"); + Files.createSymbolicLink(symlink1, linked1Dir.toPath()); + + java.nio.file.Path symlink2 = Paths.get(symlink1.toString(), "symlink2"); + File linked2Dir = TempDirUtils.newFolder(temporaryFolder, "linked2"); + Files.createSymbolicLink(symlink2, linked2Dir.toPath()); + java.nio.file.Path dirInLinked2 = + TempDirUtils.newFolder(linked2Dir.toPath(), "one").toPath().toRealPath(); + + // symlink3 point to another symbolic link: 
symlink2 + java.nio.file.Path symlink3 = Paths.get(symlink1.toString(), "symlink3"); + Files.createSymbolicLink(symlink3, symlink2); + + java.nio.file.Path targetPath = + FileUtils.getTargetPathIfContainsSymbolicPath( + // path contains multiple symlink : xxx/symlink1/symlink3/one + symlink3.resolve("one")); + assertThat(targetPath).isEqualTo(dirInLinked2); + } + // ------------------------------------------------------------------------ // Utilities // ------------------------------------------------------------------------ diff --git a/flink-rpc/flink-rpc-akka-loader/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcSystemLoader.java b/flink-rpc/flink-rpc-akka-loader/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcSystemLoader.java index 735f739aaac..afe2c5bd0fc 100644 --- a/flink-rpc/flink-rpc-akka-loader/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcSystemLoader.java +++ b/flink-rpc/flink-rpc-akka-loader/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcSystemLoader.java @@ -23,6 +23,7 @@ import org.apache.flink.core.classloading.SubmoduleClassLoader; import org.apache.flink.runtime.rpc.RpcSystem; import org.apache.flink.runtime.rpc.RpcSystemLoader; import org.apache.flink.runtime.rpc.exceptions.RpcLoaderException; +import org.apache.flink.util.FileUtils; import org.apache.flink.util.IOUtils; import java.io.IOException; @@ -55,7 +56,7 @@ public class AkkaRpcSystemLoader implements RpcSystemLoader { final ClassLoader flinkClassLoader = RpcSystem.class.getClassLoader(); final Path tmpDirectory = Paths.get(ConfigurationUtils.parseTempDirectories(config)[0]); - Files.createDirectories(tmpDirectory); + Files.createDirectories(FileUtils.getTargetPathIfContainsSymbolicPath(tmpDirectory)); final Path tempFile = Files.createFile( tmpDirectory.resolve("flink-rpc-akka_" + UUID.randomUUID() + ".jar")); diff --git a/flink-rpc/flink-rpc-akka-loader/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcSystemLoaderITCase.java 
b/flink-rpc/flink-rpc-akka-loader/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcSystemLoaderITCase.java index 7b6d1452c10..2ba3c621c2b 100644 --- a/flink-rpc/flink-rpc-akka-loader/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcSystemLoaderITCase.java +++ b/flink-rpc/flink-rpc-akka-loader/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcSystemLoaderITCase.java @@ -24,10 +24,13 @@ import org.apache.flink.runtime.rpc.RpcSystem; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; +import java.nio.file.Files; +import java.nio.file.NoSuchFileException; import java.nio.file.Path; import java.nio.file.Paths; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; /** * Tests for the {@link AkkaRpcSystemLoader}. @@ -56,4 +59,33 @@ class AkkaRpcSystemLoaderITCase { assertThat(rpcSystem).isNotNull(); } } + + @Test + void testServiceLoadingWithExistingLinkedPath(@TempDir Path tempDir) throws Exception { + final Configuration config = new Configuration(); + + Path linkedDirectory = Paths.get(tempDir.toString(), "linkedDir"); + Path symbolicLink = Paths.get(tempDir.toString(), "symlink"); + Files.createSymbolicLink(symbolicLink, linkedDirectory); + Files.createDirectories(linkedDirectory.resolve("a").resolve("b")); + // set the tmp dirs to dirs in symbolic link path. + config.set(CoreOptions.TMP_DIRS, symbolicLink.resolve("a").resolve("b").toString()); + try (final RpcSystem rpcSystem = LOADER.loadRpcSystem(config)) { + assertThat(rpcSystem).isNotNull(); + } + } + + @Test + void testServiceLoadingWithNonExistingLinkedPath(@TempDir Path tempDir) throws Exception { + final Configuration config = new Configuration(); + + Path linkedDirectory = Paths.get(tempDir.toString(), "linkedDir"); + Path symbolicLink = Paths.get(tempDir.toString(), "symlink"); + Files.createSymbolicLink(symbolicLink, linkedDirectory); + // set the tmp dirs to dirs in symbolic link path. 
+ config.set(CoreOptions.TMP_DIRS, symbolicLink.toString()); + // if the directory the symlink points to does not exist, an exception is thrown directly. + assertThatThrownBy(() -> LOADER.loadRpcSystem(config)) + .hasRootCauseInstanceOf(NoSuchFileException.class); + } }
