This is an automated email from the ASF dual-hosted git repository.

fcsaky pushed a commit to branch release-1.20
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.20 by this push:
     new 4d1d0664253 [FLINK-37183][clients] Fix symbolic links not being followed in "usrlib"
4d1d0664253 is described below

commit 4d1d0664253f47b165b44e96ea75461c35bc30a2
Author: Joery <jo...@joery.net>
AuthorDate: Wed Jan 22 08:00:12 2025 +0100

    [FLINK-37183][clients] Fix symbolic links not being followed in "usrlib"
    
    Co-authored-by: Joery van den Hoff <joery.vandenh...@imc.com>
---
 .../program/DefaultPackagedProgramRetriever.java   |  3 +-
 .../DefaultPackagedProgramRetrieverITCase.java     | 37 ++++++++++++++++++++++
 .../client/testjar/ClasspathProviderExtension.java | 20 ++++++++++++
 3 files changed, 59 insertions(+), 1 deletion(-)

diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/DefaultPackagedProgramRetriever.java b/flink-clients/src/main/java/org/apache/flink/client/program/DefaultPackagedProgramRetriever.java
index 5e286afc4c3..ca384ca3d47 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/DefaultPackagedProgramRetriever.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/DefaultPackagedProgramRetriever.java
@@ -35,6 +35,7 @@ import java.io.File;
 import java.io.IOException;
 import java.net.MalformedURLException;
 import java.net.URL;
+import java.nio.file.FileVisitOption;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.util.ArrayList;
@@ -237,7 +238,7 @@ public class DefaultPackagedProgramRetriever implements PackagedProgramRetriever
             return Collections.emptyList();
         }
 
-        try (Stream<Path> files = Files.walk(userLibDir.toPath())) {
+        try (Stream<Path> files = Files.walk(userLibDir.toPath(), FileVisitOption.FOLLOW_LINKS)) {
             return getClasspathsFromArtifacts(files, jarFile);
         }
     }
diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/DefaultPackagedProgramRetrieverITCase.java b/flink-clients/src/test/java/org/apache/flink/client/program/DefaultPackagedProgramRetrieverITCase.java
index 6fbe706aaaf..60786e2e41f 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/program/DefaultPackagedProgramRetrieverITCase.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/program/DefaultPackagedProgramRetrieverITCase.java
@@ -63,6 +63,10 @@ class DefaultPackagedProgramRetrieverITCase {
     ClasspathProviderExtension singleEntryClassClasspathProvider =
             ClasspathProviderExtension.createWithSingleEntryClass();
 
+    @RegisterExtension
+    ClasspathProviderExtension symlinkClasspathProvider =
+            ClasspathProviderExtension.createWithSymlink();
+
     @RegisterExtension
     ClasspathProviderExtension multipleEntryClassesClasspathProvider =
             ClasspathProviderExtension.createWithMultipleEntryClasses();
@@ -522,6 +526,34 @@ class DefaultPackagedProgramRetrieverITCase {
         assertThat(actualClasspath).isEqualTo(expectedClasspath);
     }
 
+    @Test
+    void testRetrieveFromJarFileWithSymlinkUserLib()
+            throws IOException, FlinkException, ProgramInvocationException {
+        final File actualUsrLib = new File(symlinkClasspathProvider.getDirectory(), "usrlib");
+        final PackagedProgramRetriever retrieverUnderTest =
+                DefaultPackagedProgramRetriever.create(
+                        actualUsrLib,
+                        // the testJob jar is not on the user classpath
+                        testJobEntryClassClasspathProvider.getJobJar(),
+                        null,
+                        null,
+                        ClasspathProviderExtension.parametersForTestJob("suffix"),
+                        new Configuration());
+        final JobGraph jobGraph = retrieveJobGraph(retrieverUnderTest, new Configuration());
+
+        assertThat(jobGraph.getUserJars())
+                .contains(
+                        new org.apache.flink.core.fs.Path(
+                                testJobEntryClassClasspathProvider.getJobJar().toURI()));
+        final List<String> actualClasspath =
+                jobGraph.getClasspaths().stream().map(URL::toString).collect(Collectors.toList());
+        final List<String> expectedClasspath =
+                extractRelativizedURLsForJarsFromDirectory(actualUsrLib);
+
+        assertThat(actualClasspath).hasSize(2);
+        assertThat(actualClasspath).isEqualTo(expectedClasspath);
+    }
+
     @Test
     void testRetrieveFromJarFileWithArtifacts()
             throws IOException, FlinkException, ProgramInvocationException {
@@ -684,6 +716,11 @@ class DefaultPackagedProgramRetrieverITCase {
         final List<String> relativizedURLs = new ArrayList<>();
         final Path workingDirectory = FileUtils.getCurrentWorkingDirectory();
         for (File file : Preconditions.checkNotNull(directory.listFiles())) {
+            if (file.isDirectory()) {
+                relativizedURLs.addAll(extractRelativizedURLsForJarsFromDirectory(file));
+                continue;
+            }
+
             if (!FileUtils.isJarFile(file.toPath())) {
                 // any non-JARs are filtered by PackagedProgramRetrieverImpl
                 continue;
diff --git a/flink-clients/src/test/java/org/apache/flink/client/testjar/ClasspathProviderExtension.java b/flink-clients/src/test/java/org/apache/flink/client/testjar/ClasspathProviderExtension.java
index f53ae539774..ffae88812f5 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/testjar/ClasspathProviderExtension.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/testjar/ClasspathProviderExtension.java
@@ -43,6 +43,8 @@ import java.util.Objects;
 import java.util.stream.Collectors;
 import java.util.stream.StreamSupport;
 
+import static org.assertj.core.api.Assertions.assertThat;
+
 /**
  * {@code ClasspathProviderExtension} offers utility methods for creating a classpath based on
  * actual jars.
@@ -92,6 +94,24 @@ public class ClasspathProviderExtension implements BeforeEachCallback, AfterEach
                 JOB_JAR_PATH.toFile());
     }
 
+    public static ClasspathProviderExtension createWithSymlink() {
+        return new ClasspathProviderExtension(
+                "_user_dir_with_symlink",
+                directory -> {
+                    final File actualUsrLib = new File(directory, "usrlib");
+                    final File symLinkDir = new File(directory, "symlink");
+                    assertThat(actualUsrLib.mkdirs()).isTrue();
+                    assertThat(symLinkDir.mkdirs()).isTrue();
+
+                    copyJar(JOB_LIB_JAR_PATH, symLinkDir);
+                    copyJar(JOB_JAR_PATH, actualUsrLib);
+
+                    Files.createSymbolicLink(
+                            actualUsrLib.toPath().resolve("symlink"), symLinkDir.toPath());
+                },
+                JOB_JAR_PATH.toFile());
+    }
+
     public static ClasspathProviderExtension createWithMultipleEntryClasses() {
         return new ClasspathProviderExtension(
                 "_user_dir_with_multiple_entry_classes",

Reply via email to