This is an automated email from the ASF dual-hosted git repository.

fcsaky pushed a commit to branch release-1.20
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.20 by this push:
     new 4d1d0664253 [FLINK-37183][clients] Fix symbolic links not being followed in "usrlib"
4d1d0664253 is described below

commit 4d1d0664253f47b165b44e96ea75461c35bc30a2
Author: Joery <jo...@joery.net>
AuthorDate: Wed Jan 22 08:00:12 2025 +0100

    [FLINK-37183][clients] Fix symbolic links not being followed in "usrlib"

    Co-authored-by: Joery van den Hoff <joery.vandenh...@imc.com>
---
 .../program/DefaultPackagedProgramRetriever.java   |  3 +-
 .../DefaultPackagedProgramRetrieverITCase.java     | 37 ++++++++++++++++++++++
 .../client/testjar/ClasspathProviderExtension.java | 20 ++++++++++++
 3 files changed, 59 insertions(+), 1 deletion(-)

diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/DefaultPackagedProgramRetriever.java b/flink-clients/src/main/java/org/apache/flink/client/program/DefaultPackagedProgramRetriever.java
index 5e286afc4c3..ca384ca3d47 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/DefaultPackagedProgramRetriever.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/DefaultPackagedProgramRetriever.java
@@ -35,6 +35,7 @@ import java.io.File;
 import java.io.IOException;
 import java.net.MalformedURLException;
 import java.net.URL;
+import java.nio.file.FileVisitOption;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.util.ArrayList;
@@ -237,7 +238,7 @@ public class DefaultPackagedProgramRetriever implements PackagedProgramRetriever
             return Collections.emptyList();
         }
 
-        try (Stream<Path> files = Files.walk(userLibDir.toPath())) {
+        try (Stream<Path> files = Files.walk(userLibDir.toPath(), FileVisitOption.FOLLOW_LINKS)) {
             return getClasspathsFromArtifacts(files, jarFile);
         }
     }
diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/DefaultPackagedProgramRetrieverITCase.java b/flink-clients/src/test/java/org/apache/flink/client/program/DefaultPackagedProgramRetrieverITCase.java
index 
6fbe706aaaf..60786e2e41f 100644 --- a/flink-clients/src/test/java/org/apache/flink/client/program/DefaultPackagedProgramRetrieverITCase.java +++ b/flink-clients/src/test/java/org/apache/flink/client/program/DefaultPackagedProgramRetrieverITCase.java @@ -63,6 +63,10 @@ class DefaultPackagedProgramRetrieverITCase { ClasspathProviderExtension singleEntryClassClasspathProvider = ClasspathProviderExtension.createWithSingleEntryClass(); + @RegisterExtension + ClasspathProviderExtension symlinkClasspathProvider = + ClasspathProviderExtension.createWithSymlink(); + @RegisterExtension ClasspathProviderExtension multipleEntryClassesClasspathProvider = ClasspathProviderExtension.createWithMultipleEntryClasses(); @@ -522,6 +526,34 @@ class DefaultPackagedProgramRetrieverITCase { assertThat(actualClasspath).isEqualTo(expectedClasspath); } + @Test + void testRetrieveFromJarFileWithSymlinkUserLib() + throws IOException, FlinkException, ProgramInvocationException { + final File actualUsrLib = new File(symlinkClasspathProvider.getDirectory(), "usrlib"); + final PackagedProgramRetriever retrieverUnderTest = + DefaultPackagedProgramRetriever.create( + actualUsrLib, + // the testJob jar is not on the user classpath + testJobEntryClassClasspathProvider.getJobJar(), + null, + null, + ClasspathProviderExtension.parametersForTestJob("suffix"), + new Configuration()); + final JobGraph jobGraph = retrieveJobGraph(retrieverUnderTest, new Configuration()); + + assertThat(jobGraph.getUserJars()) + .contains( + new org.apache.flink.core.fs.Path( + testJobEntryClassClasspathProvider.getJobJar().toURI())); + final List<String> actualClasspath = + jobGraph.getClasspaths().stream().map(URL::toString).collect(Collectors.toList()); + final List<String> expectedClasspath = + extractRelativizedURLsForJarsFromDirectory(actualUsrLib); + + assertThat(actualClasspath).hasSize(2); + assertThat(actualClasspath).isEqualTo(expectedClasspath); + } + @Test void testRetrieveFromJarFileWithArtifacts() throws 
IOException, FlinkException, ProgramInvocationException { @@ -684,6 +716,11 @@ class DefaultPackagedProgramRetrieverITCase { final List<String> relativizedURLs = new ArrayList<>(); final Path workingDirectory = FileUtils.getCurrentWorkingDirectory(); for (File file : Preconditions.checkNotNull(directory.listFiles())) { + if (file.isDirectory()) { + relativizedURLs.addAll(extractRelativizedURLsForJarsFromDirectory(file)); + continue; + } + if (!FileUtils.isJarFile(file.toPath())) { // any non-JARs are filtered by PackagedProgramRetrieverImpl continue; diff --git a/flink-clients/src/test/java/org/apache/flink/client/testjar/ClasspathProviderExtension.java b/flink-clients/src/test/java/org/apache/flink/client/testjar/ClasspathProviderExtension.java index f53ae539774..ffae88812f5 100644 --- a/flink-clients/src/test/java/org/apache/flink/client/testjar/ClasspathProviderExtension.java +++ b/flink-clients/src/test/java/org/apache/flink/client/testjar/ClasspathProviderExtension.java @@ -43,6 +43,8 @@ import java.util.Objects; import java.util.stream.Collectors; import java.util.stream.StreamSupport; +import static org.assertj.core.api.Assertions.assertThat; + /** * {@code ClasspathProviderExtension} offers utility methods for creating a classpath based on * actual jars. 
@@ -92,6 +94,24 @@ public class ClasspathProviderExtension implements BeforeEachCallback, AfterEach JOB_JAR_PATH.toFile()); } + public static ClasspathProviderExtension createWithSymlink() { + return new ClasspathProviderExtension( + "_user_dir_with_symlink", + directory -> { + final File actualUsrLib = new File(directory, "usrlib"); + final File symLinkDir = new File(directory, "symlink"); + assertThat(actualUsrLib.mkdirs()).isTrue(); + assertThat(symLinkDir.mkdirs()).isTrue(); + + copyJar(JOB_LIB_JAR_PATH, symLinkDir); + copyJar(JOB_JAR_PATH, actualUsrLib); + + Files.createSymbolicLink( + actualUsrLib.toPath().resolve("symlink"), symLinkDir.toPath()); + }, + JOB_JAR_PATH.toFile()); + } + public static ClasspathProviderExtension createWithMultipleEntryClasses() { return new ClasspathProviderExtension( "_user_dir_with_multiple_entry_classes",