This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 2859196f9ab1d86a3d90e47a89cbd13be74741b9
Author: Weijie Guo <[email protected]>
AuthorDate: Thu Oct 20 16:34:27 2022 +0800

    [FLINK-28102] Flink AkkaRpcSystemLoader fails when temporary directory is a 
symlink
---
 .../main/java/org/apache/flink/util/FileUtils.java | 23 +++++++++++
 .../java/org/apache/flink/util/FileUtilsTest.java  | 44 ++++++++++++++++++++++
 .../runtime/rpc/akka/AkkaRpcSystemLoader.java      |  3 +-
 .../rpc/akka/AkkaRpcSystemLoaderITCase.java        | 32 ++++++++++++++++
 4 files changed, 101 insertions(+), 1 deletion(-)

diff --git a/flink-core/src/main/java/org/apache/flink/util/FileUtils.java 
b/flink-core/src/main/java/org/apache/flink/util/FileUtils.java
index e517dc8eb56..c336e7d49c5 100644
--- a/flink-core/src/main/java/org/apache/flink/util/FileUtils.java
+++ b/flink-core/src/main/java/org/apache/flink/util/FileUtils.java
@@ -648,6 +648,29 @@ public final class FileUtils {
         return fileName;
     }
 
+    /**
+     * Get a target path(the path that replaced symbolic links with linked 
path) if the original
+     * path contains symbolic path, return the original path otherwise.
+     *
+     * @param path the original path.
+     * @return the path that replaced symbolic links with real path.
+     */
+    public static java.nio.file.Path 
getTargetPathIfContainsSymbolicPath(java.nio.file.Path path)
+            throws IOException {
+        java.nio.file.Path targetPath = path;
+        java.nio.file.Path suffixPath = Paths.get("");
+        while (path != null && path.getFileName() != null) {
+            if (Files.isSymbolicLink(path)) {
+                java.nio.file.Path linkedPath = path.toRealPath();
+                targetPath = Paths.get(linkedPath.toString(), 
suffixPath.toString());
+                break;
+            }
+            suffixPath = Paths.get(path.getFileName().toString(), 
suffixPath.toString());
+            path = path.getParent();
+        }
+        return targetPath;
+    }
+
     /**
      * Converts the given {@link java.nio.file.Path} into a file {@link URL}. 
The resulting url is
      * relative iff the given path is relative.
diff --git a/flink-core/src/test/java/org/apache/flink/util/FileUtilsTest.java 
b/flink-core/src/test/java/org/apache/flink/util/FileUtilsTest.java
index b800ef6541c..b24179b6f67 100644
--- a/flink-core/src/test/java/org/apache/flink/util/FileUtilsTest.java
+++ b/flink-core/src/test/java/org/apache/flink/util/FileUtilsTest.java
@@ -315,6 +315,50 @@ public class FileUtilsTest {
         
assertThat(paths).containsExactlyInAnyOrder(linkPath.resolve(file.getName()));
     }
 
+    @Test
+    void testGetTargetPathNotContainsSymbolicPath() throws IOException {
+        java.nio.file.Path testPath = Paths.get("parent", "child");
+        java.nio.file.Path targetPath = 
FileUtils.getTargetPathIfContainsSymbolicPath(testPath);
+        assertThat(targetPath).isEqualTo(testPath);
+    }
+
+    @Test
+    void testGetTargetPathContainsSymbolicPath() throws IOException {
+        File linkedDir = TempDirUtils.newFolder(temporaryFolder, "linked");
+        java.nio.file.Path symlink = Paths.get(temporaryFolder.toString(), 
"symlink");
+        java.nio.file.Path dirInLinked =
+                TempDirUtils.newFolder(linkedDir.toPath(), "one", 
"two").toPath().toRealPath();
+        Files.createSymbolicLink(symlink, linkedDir.toPath());
+
+        java.nio.file.Path targetPath =
+                FileUtils.getTargetPathIfContainsSymbolicPath(
+                        symlink.resolve("one").resolve("two"));
+        assertThat(targetPath).isEqualTo(dirInLinked);
+    }
+
+    @Test
+    void testGetTargetPathContainsMultipleSymbolicPath() throws IOException {
+        File linked1Dir = TempDirUtils.newFolder(temporaryFolder, "linked1");
+        java.nio.file.Path symlink1 = Paths.get(temporaryFolder.toString(), 
"symlink1");
+        Files.createSymbolicLink(symlink1, linked1Dir.toPath());
+
+        java.nio.file.Path symlink2 = Paths.get(symlink1.toString(), 
"symlink2");
+        File linked2Dir = TempDirUtils.newFolder(temporaryFolder, "linked2");
+        Files.createSymbolicLink(symlink2, linked2Dir.toPath());
+        java.nio.file.Path dirInLinked2 =
+                TempDirUtils.newFolder(linked2Dir.toPath(), 
"one").toPath().toRealPath();
+
+        // symlink3 point to another symbolic link: symlink2
+        java.nio.file.Path symlink3 = Paths.get(symlink1.toString(), 
"symlink3");
+        Files.createSymbolicLink(symlink3, symlink2);
+
+        java.nio.file.Path targetPath =
+                FileUtils.getTargetPathIfContainsSymbolicPath(
+                        // path contains multiple symlink : 
xxx/symlink1/symlink3/one
+                        symlink3.resolve("one"));
+        assertThat(targetPath).isEqualTo(dirInLinked2);
+    }
+
     // ------------------------------------------------------------------------
     //  Utilities
     // ------------------------------------------------------------------------
diff --git 
a/flink-rpc/flink-rpc-akka-loader/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcSystemLoader.java
 
b/flink-rpc/flink-rpc-akka-loader/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcSystemLoader.java
index 735f739aaac..afe2c5bd0fc 100644
--- 
a/flink-rpc/flink-rpc-akka-loader/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcSystemLoader.java
+++ 
b/flink-rpc/flink-rpc-akka-loader/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcSystemLoader.java
@@ -23,6 +23,7 @@ import 
org.apache.flink.core.classloading.SubmoduleClassLoader;
 import org.apache.flink.runtime.rpc.RpcSystem;
 import org.apache.flink.runtime.rpc.RpcSystemLoader;
 import org.apache.flink.runtime.rpc.exceptions.RpcLoaderException;
+import org.apache.flink.util.FileUtils;
 import org.apache.flink.util.IOUtils;
 
 import java.io.IOException;
@@ -55,7 +56,7 @@ public class AkkaRpcSystemLoader implements RpcSystemLoader {
             final ClassLoader flinkClassLoader = 
RpcSystem.class.getClassLoader();
 
             final Path tmpDirectory = 
Paths.get(ConfigurationUtils.parseTempDirectories(config)[0]);
-            Files.createDirectories(tmpDirectory);
+            
Files.createDirectories(FileUtils.getTargetPathIfContainsSymbolicPath(tmpDirectory));
             final Path tempFile =
                     Files.createFile(
                             tmpDirectory.resolve("flink-rpc-akka_" + 
UUID.randomUUID() + ".jar"));
diff --git 
a/flink-rpc/flink-rpc-akka-loader/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcSystemLoaderITCase.java
 
b/flink-rpc/flink-rpc-akka-loader/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcSystemLoaderITCase.java
index 7b6d1452c10..2ba3c621c2b 100644
--- 
a/flink-rpc/flink-rpc-akka-loader/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcSystemLoaderITCase.java
+++ 
b/flink-rpc/flink-rpc-akka-loader/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcSystemLoaderITCase.java
@@ -24,10 +24,13 @@ import org.apache.flink.runtime.rpc.RpcSystem;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
 
+import java.nio.file.Files;
+import java.nio.file.NoSuchFileException;
 import java.nio.file.Path;
 import java.nio.file.Paths;
 
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /**
  * Tests for the {@link AkkaRpcSystemLoader}.
@@ -56,4 +59,33 @@ class AkkaRpcSystemLoaderITCase {
             assertThat(rpcSystem).isNotNull();
         }
     }
+
+    @Test
+    void testServiceLoadingWithExistingLinkedPath(@TempDir Path tempDir) 
throws Exception {
+        final Configuration config = new Configuration();
+
+        Path linkedDirectory = Paths.get(tempDir.toString(), "linkedDir");
+        Path symbolicLink = Paths.get(tempDir.toString(), "symlink");
+        Files.createSymbolicLink(symbolicLink, linkedDirectory);
+        Files.createDirectories(linkedDirectory.resolve("a").resolve("b"));
+        // set the tmp dirs to dirs in symbolic link path.
+        config.set(CoreOptions.TMP_DIRS, 
symbolicLink.resolve("a").resolve("b").toString());
+        try (final RpcSystem rpcSystem = LOADER.loadRpcSystem(config)) {
+            assertThat(rpcSystem).isNotNull();
+        }
+    }
+
+    @Test
+    void testServiceLoadingWithNonExistingLinkedPath(@TempDir Path tempDir) 
throws Exception {
+        final Configuration config = new Configuration();
+
+        Path linkedDirectory = Paths.get(tempDir.toString(), "linkedDir");
+        Path symbolicLink = Paths.get(tempDir.toString(), "symlink");
+        Files.createSymbolicLink(symbolicLink, linkedDirectory);
+        // set the tmp dirs to dirs in symbolic link path.
+        config.set(CoreOptions.TMP_DIRS, symbolicLink.toString());
+        // if this is a symlink that linked dir not exist, throw exception 
directly.
+        assertThatThrownBy(() -> LOADER.loadRpcSystem(config))
+                .hasRootCauseInstanceOf(NoSuchFileException.class);
+    }
 }

Reply via email to