XComp commented on a change in pull request #15020: URL: https://github.com/apache/flink/pull/15020#discussion_r590312576
########## File path: flink-clients/src/main/java/org/apache/flink/client/deployment/application/AbstractPackagedProgramRetriever.java ########## @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.client.deployment.application; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.client.program.PackagedProgram; +import org.apache.flink.client.program.PackagedProgramRetriever; +import org.apache.flink.client.program.ProgramInvocationException; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.util.FileUtils; +import org.apache.flink.util.FlinkException; +import org.apache.flink.util.function.FunctionUtils; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +import java.io.File; +import java.io.IOException; +import java.net.URL; +import java.nio.file.Path; +import java.util.Collection; +import java.util.Collections; +import java.util.stream.Collectors; + +import static java.util.Objects.requireNonNull; + +/** + * An abstract {@link org.apache.flink.client.program.PackagedProgramRetriever + * PackagedProgramRetriever} which creates the {@link + * org.apache.flink.client.program.PackagedProgram 
PackagedProgram} containing the user's {@code + * main()} from a class on the class path. + */ +@Internal +public abstract class AbstractPackagedProgramRetriever implements PackagedProgramRetriever { + + @Nonnull protected final String[] programArguments; + + @Nonnull protected final Configuration configuration; + + /** User class paths in relative form to the working directory. */ + @Nonnull protected final Collection<URL> userClassPaths; + + AbstractPackagedProgramRetriever( + @Nonnull String[] programArguments, + @Nonnull Configuration configuration, + @Nullable File userLibDirectory) + throws IOException { + this.programArguments = requireNonNull(programArguments, "programArguments"); + this.configuration = requireNonNull(configuration); + this.userClassPaths = discoverUserClassPaths(userLibDirectory); + } + + private Collection<URL> discoverUserClassPaths(@Nullable File jobDir) throws IOException { + if (jobDir == null) { + return Collections.emptyList(); + } + + final Path workingDirectory = FileUtils.getCurrentWorkingDirectory(); + final Collection<URL> relativeJarURLs = + FileUtils.listFilesInDirectory(jobDir.toPath(), FileUtils::isJarFile).stream() + .map(path -> FileUtils.relativizePath(workingDirectory, path)) + .map(FunctionUtils.uncheckedFunction(FileUtils::toURL)) + .collect(Collectors.toList()); + return Collections.unmodifiableCollection(relativeJarURLs); + } + + @Override + public PackagedProgram getPackagedProgram() throws FlinkException { + try { + return buildPackagedProgram(); + } catch (ProgramInvocationException e) { + throw new FlinkException("Could not load the provided entry point class.", e); + } + } + + public abstract PackagedProgram buildPackagedProgram() Review comment: ```suggestion protected abstract PackagedProgram buildPackagedProgram() ``` This method can be protected as far as I'm concerned. 
########## File path: flink-clients/src/main/java/org/apache/flink/client/deployment/application/JarFilePackagedProgramRetriever.java ########## @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.client.deployment.application; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.client.program.PackagedProgram; +import org.apache.flink.client.program.ProgramInvocationException; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.util.FlinkException; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; + +/** + * A jar file {@link org.apache.flink.client.program.PackagedProgramRetriever + * PackagedProgramRetriever} which creates the {@link + * org.apache.flink.client.program.PackagedProgram PackagedProgram} with the specified jar file. 
+ */ +@Internal +public class JarFilePackagedProgramRetriever extends AbstractPackagedProgramRetriever { + + @Nullable private final String jobClassName; + + @Nullable private final File jarFile; + + protected JarFilePackagedProgramRetriever( + @Nonnull String[] programArguments, + @Nonnull Configuration configuration, + @Nullable String jobClassName, + @Nullable File userLibDirectory, + @Nullable File jarFile) + throws IOException { + super(programArguments, configuration, userLibDirectory); + this.jobClassName = jobClassName; + this.jarFile = jarFile; + } + + @Override + public PackagedProgram buildPackagedProgram() + throws ProgramInvocationException, FlinkException { Review comment: `FlinkException` is never thrown ########## File path: flink-clients/src/main/java/org/apache/flink/client/deployment/application/PackagedProgramRetrieverAdapter.java ########## @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.client.deployment.application; + +import org.apache.flink.client.program.PackagedProgramRetriever; +import org.apache.flink.client.program.PackagedProgramUtils; +import org.apache.flink.configuration.Configuration; + +import javax.annotation.Nullable; + +import java.io.File; +import java.io.IOException; +import java.util.function.Supplier; + +import static java.util.Objects.requireNonNull; + +/** Adapter to provide a {@link Builder} to build a {@link PackagedProgramRetriever}. */ +public final class PackagedProgramRetrieverAdapter { Review comment: The `PackagedProgramRetrieverAdapter` does not fulfil any purpose here, does it? We could move the `Builder` out to become a top-level class instead of an inner class. Additionally, I feel like the `Builder` should be renamed to something like `PackagedProgramRetrieverFactory`, as it instantiates objects of different kinds of subclasses. ########## File path: flink-clients/src/main/java/org/apache/flink/client/deployment/application/ClassPathPackagedProgramRetriever.java ########## @@ -38,109 +36,56 @@ import java.io.File; import java.io.IOException; import java.net.URL; -import java.nio.file.Path; import java.util.ArrayList; import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; import java.util.NoSuchElementException; import java.util.function.Supplier; import java.util.jar.JarEntry; import java.util.jar.JarFile; import java.util.regex.Pattern; import java.util.stream.Collectors; -import static java.util.Objects.requireNonNull; - /** - * A {@link org.apache.flink.client.program.PackagedProgramRetriever PackagedProgramRetriever} which - * creates the {@link org.apache.flink.client.program.PackagedProgram PackagedProgram} containing - * the user's {@code main()} from a class on the class path. 
+ * A classpath {@link org.apache.flink.client.program.PackagedProgramRetriever Review comment: nit: Could we import `PackagedProgramRetriever` to improve readability? ########## File path: flink-clients/src/main/java/org/apache/flink/client/deployment/application/ClassPathPackagedProgramRetriever.java ########## @@ -38,109 +36,56 @@ import java.io.File; import java.io.IOException; import java.net.URL; -import java.nio.file.Path; import java.util.ArrayList; import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; import java.util.NoSuchElementException; import java.util.function.Supplier; import java.util.jar.JarEntry; import java.util.jar.JarFile; import java.util.regex.Pattern; import java.util.stream.Collectors; -import static java.util.Objects.requireNonNull; - /** - * A {@link org.apache.flink.client.program.PackagedProgramRetriever PackagedProgramRetriever} which - * creates the {@link org.apache.flink.client.program.PackagedProgram PackagedProgram} containing - * the user's {@code main()} from a class on the class path. + * A classpath {@link org.apache.flink.client.program.PackagedProgramRetriever + * PackagedProgramRetriever} which creates the {@link + * org.apache.flink.client.program.PackagedProgram PackagedProgram} through scanning the classpath Review comment: ```suggestion * PackagedProgram} through scanning the classpath ``` We don't need the package information here. 
########## File path: flink-clients/src/test/java/org/apache/flink/client/deployment/application/ClassPathPackagedProgramRetrieverTest.java ########## @@ -312,87 +205,64 @@ public void testRetrieveCorrectUserClasspathsWithSpecifiedEntryClass() assertThat( jobGraph.getClasspaths().stream().map(URL::toString).collect(Collectors.toList()), - containsInAnyOrder(expectedURLs.stream().map(URL::toString).toArray())); + containsInAnyOrder(EXPECTED_URLS.stream().map(URL::toString).toArray())); } @Test - public void testRetrieveFromJarFileWithoutUserLib() - throws IOException, FlinkException, ProgramInvocationException { - final File testJar = TestJob.getTestJobJar(); - final ClassPathPackagedProgramRetriever retrieverUnderTest = - ClassPathPackagedProgramRetriever.newBuilder(PROGRAM_ARGUMENTS) - .setJarFile(testJar) - .build(); - final JobGraph jobGraph = retrieveJobGraph(retrieverUnderTest, new Configuration()); + public void testGetPackagedProgramWithConfiguration() throws IOException, FlinkException { + final Configuration configuration = new Configuration(); + configuration.setString(CoreOptions.CLASSLOADER_RESOLVE_ORDER, "parent-first"); + configuration.setBoolean(CoreOptions.CHECK_LEAKED_CLASSLOADER, false); - assertThat( - jobGraph.getUserJars(), - containsInAnyOrder(new org.apache.flink.core.fs.Path(testJar.toURI()))); - assertThat(jobGraph.getClasspaths().isEmpty(), is(true)); + final PackagedProgramRetriever retrieverUnderTest = + newBuilder(PROGRAM_ARGUMENTS) + .setConfiguration(configuration) + .setJobClassName(TestJob.class.getCanonicalName()) + .build(); + assertTrue( + retrieverUnderTest.getPackagedProgram().getUserCodeClassLoader() + instanceof FlinkUserCodeClassLoaders.ParentFirstClassLoader); } @Test - public void testRetrieveFromJarFileWithUserLib() - throws IOException, FlinkException, ProgramInvocationException { - final File testJar = TestJob.getTestJobJar(); - final ClassPathPackagedProgramRetriever retrieverUnderTest = - 
ClassPathPackagedProgramRetriever.newBuilder(PROGRAM_ARGUMENTS) - .setJarFile(testJar) - .setUserLibDirectory(userDirHasEntryClass) - .build(); - final JobGraph jobGraph = retrieveJobGraph(retrieverUnderTest, new Configuration()); + public void testJarFromClassPathSupplier() throws IOException { + final File file1 = temporaryFolder.newFile(); + final File file2 = temporaryFolder.newFile(); + final File directory = temporaryFolder.newFolder(); - assertThat( - jobGraph.getUserJars(), - containsInAnyOrder(new org.apache.flink.core.fs.Path(testJar.toURI()))); - assertThat( - jobGraph.getClasspaths().stream().map(URL::toString).collect(Collectors.toList()), - containsInAnyOrder(expectedURLs.stream().map(URL::toString).toArray())); - } + // Mock java.class.path property. The empty strings are important as the shell scripts + // that prepare the Flink class path often have such entries. + final String classPath = + javaClassPath( + "", + "", + "", + file1.getAbsolutePath(), + "", + directory.getAbsolutePath(), + "", + file2.getAbsolutePath(), + "", + ""); + + Iterable<File> jarFiles = setClassPathAndGetJarsOnClassPath(classPath); - private JobGraph retrieveJobGraph( - ClassPathPackagedProgramRetriever retrieverUnderTest, Configuration configuration) - throws FlinkException, ProgramInvocationException, MalformedURLException { - final PackagedProgram packagedProgram = retrieverUnderTest.getPackagedProgram(); - - final int defaultParallelism = configuration.getInteger(CoreOptions.DEFAULT_PARALLELISM); - ConfigUtils.encodeCollectionToConfig( - configuration, - PipelineOptions.JARS, - packagedProgram.getJobJarAndDependencies(), - URL::toString); - ConfigUtils.encodeCollectionToConfig( - configuration, - PipelineOptions.CLASSPATHS, - packagedProgram.getClasspaths(), - URL::toString); - - final Pipeline pipeline = - PackagedProgramUtils.getPipelineFromProgram( - packagedProgram, configuration, defaultParallelism, false); - return PipelineExecutorUtils.getJobGraph(pipeline, 
configuration); + assertThat(jarFiles, contains(file1, file2)); } private static String javaClassPath(String... entries) { - String pathSeparator = - System.getProperty( - ClassPathPackagedProgramRetriever.JarsOnClassPath.PATH_SEPARATOR); + String pathSeparator = System.getProperty(PATH_SEPARATOR); return String.join(pathSeparator, entries); } private static Iterable<File> setClassPathAndGetJarsOnClassPath(String classPath) { - final String originalClassPath = - System.getProperty( - ClassPathPackagedProgramRetriever.JarsOnClassPath.JAVA_CLASS_PATH); + final String originalClassPath = System.getProperty(JAVA_CLASS_PATH); try { - System.setProperty( - ClassPathPackagedProgramRetriever.JarsOnClassPath.JAVA_CLASS_PATH, classPath); - return ClassPathPackagedProgramRetriever.JarsOnClassPath.INSTANCE.get(); + System.setProperty(JAVA_CLASS_PATH, classPath); Review comment: ```suggestion System.setProperty(JAVA_CLASSPATH, classpath); ``` It's just a minor thing aligning classpath with the way Java formats it. It applies to the related lines as well. You might want to put it into a separate commit if you decide to do this. 
########## File path: flink-clients/src/main/java/org/apache/flink/client/deployment/application/ClassPathPackagedProgramRetriever.java ########## @@ -38,109 +36,56 @@ import java.io.File; import java.io.IOException; import java.net.URL; -import java.nio.file.Path; import java.util.ArrayList; import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; import java.util.NoSuchElementException; import java.util.function.Supplier; import java.util.jar.JarEntry; import java.util.jar.JarFile; import java.util.regex.Pattern; import java.util.stream.Collectors; -import static java.util.Objects.requireNonNull; - /** - * A {@link org.apache.flink.client.program.PackagedProgramRetriever PackagedProgramRetriever} which - * creates the {@link org.apache.flink.client.program.PackagedProgram PackagedProgram} containing - * the user's {@code main()} from a class on the class path. + * A classpath {@link org.apache.flink.client.program.PackagedProgramRetriever + * PackagedProgramRetriever} which creates the {@link + * org.apache.flink.client.program.PackagedProgram PackagedProgram} through scanning the classpath + * for the job class. */ @Internal -public class ClassPathPackagedProgramRetriever implements PackagedProgramRetriever { +public class ClassPathPackagedProgramRetriever extends AbstractPackagedProgramRetriever { Review comment: ```suggestion public class ClasspathPackagedProgramRetriever extends AbstractPackagedProgramRetriever { ``` I'd suggest sticking to the formatting Java suggests for the classpath as being one word ########## File path: flink-clients/src/main/java/org/apache/flink/client/deployment/application/AbstractPackagedProgramRetriever.java ########## @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.client.deployment.application; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.client.program.PackagedProgram; +import org.apache.flink.client.program.PackagedProgramRetriever; +import org.apache.flink.client.program.ProgramInvocationException; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.util.FileUtils; +import org.apache.flink.util.FlinkException; +import org.apache.flink.util.function.FunctionUtils; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +import java.io.File; +import java.io.IOException; +import java.net.URL; +import java.nio.file.Path; +import java.util.Collection; +import java.util.Collections; +import java.util.stream.Collectors; + +import static java.util.Objects.requireNonNull; + +/** + * An abstract {@link org.apache.flink.client.program.PackagedProgramRetriever + * PackagedProgramRetriever} which creates the {@link + * org.apache.flink.client.program.PackagedProgram PackagedProgram} containing the user's {@code + * main()} from a class on the class path. + */ +@Internal +public abstract class AbstractPackagedProgramRetriever implements PackagedProgramRetriever { + + @Nonnull protected final String[] programArguments; + + @Nonnull protected final Configuration configuration; + + /** User class paths in relative form to the working directory. 
*/ + @Nonnull protected final Collection<URL> userClassPaths; + + AbstractPackagedProgramRetriever( + @Nonnull String[] programArguments, + @Nonnull Configuration configuration, + @Nullable File userLibDirectory) + throws IOException { + this.programArguments = requireNonNull(programArguments, "programArguments"); + this.configuration = requireNonNull(configuration); + this.userClassPaths = discoverUserClassPaths(userLibDirectory); + } + + private Collection<URL> discoverUserClassPaths(@Nullable File jobDir) throws IOException { + if (jobDir == null) { + return Collections.emptyList(); + } + + final Path workingDirectory = FileUtils.getCurrentWorkingDirectory(); + final Collection<URL> relativeJarURLs = + FileUtils.listFilesInDirectory(jobDir.toPath(), FileUtils::isJarFile).stream() + .map(path -> FileUtils.relativizePath(workingDirectory, path)) + .map(FunctionUtils.uncheckedFunction(FileUtils::toURL)) + .collect(Collectors.toList()); + return Collections.unmodifiableCollection(relativeJarURLs); + } + + @Override + public PackagedProgram getPackagedProgram() throws FlinkException { + try { + return buildPackagedProgram(); Review comment: ```suggestion final PackagedProgram.Builder packagedProgramBuilder = PackagedProgram.newBuilder() .setConfiguration(configuration) .setArguments(programArguments) .setUserClassPaths(userClassPaths); return buildPackagedProgram(packagedProgramBuilder); ``` We could move the Builder creation into `AbstractPackagedProgramRetriever`. This way, we could make the general members `userClassPaths`, `configuration`, and `programArguments` private. A question I have, is: What's the reason for creating a copy of `userClassPaths`? 
I didn't find a necessity to do so in the code 🤔 ########## File path: flink-clients/src/main/java/org/apache/flink/client/deployment/application/ClassPathPackagedProgramRetriever.java ########## @@ -38,109 +36,56 @@ import java.io.File; import java.io.IOException; import java.net.URL; -import java.nio.file.Path; import java.util.ArrayList; import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; import java.util.NoSuchElementException; import java.util.function.Supplier; import java.util.jar.JarEntry; import java.util.jar.JarFile; import java.util.regex.Pattern; import java.util.stream.Collectors; -import static java.util.Objects.requireNonNull; - /** - * A {@link org.apache.flink.client.program.PackagedProgramRetriever PackagedProgramRetriever} which - * creates the {@link org.apache.flink.client.program.PackagedProgram PackagedProgram} containing - * the user's {@code main()} from a class on the class path. + * A classpath {@link org.apache.flink.client.program.PackagedProgramRetriever + * PackagedProgramRetriever} which creates the {@link + * org.apache.flink.client.program.PackagedProgram PackagedProgram} through scanning the classpath + * for the job class. */ @Internal -public class ClassPathPackagedProgramRetriever implements PackagedProgramRetriever { +public class ClassPathPackagedProgramRetriever extends AbstractPackagedProgramRetriever { private static final Logger LOG = LoggerFactory.getLogger(ClassPathPackagedProgramRetriever.class); - /** User classpaths in relative form to the working directory. 
*/ - @Nonnull private final Collection<URL> userClassPaths; - - @Nonnull private final String[] programArguments; + @Nonnull private final Supplier<Iterable<File>> jarsOnClassPath; @Nullable private final String jobClassName; - @Nonnull private final Supplier<Iterable<File>> jarsOnClassPath; - @Nullable private final File userLibDirectory; - @Nullable private final File jarFile; - - private ClassPathPackagedProgramRetriever( + protected ClassPathPackagedProgramRetriever( @Nonnull String[] programArguments, + @Nonnull Configuration configuration, + @Nullable Supplier<Iterable<File>> jarsOnClassPath, @Nullable String jobClassName, - @Nonnull Supplier<Iterable<File>> jarsOnClassPath, - @Nullable File userLibDirectory, - @Nullable File jarFile) + @Nullable File userLibDirectory) throws IOException { - this.userLibDirectory = userLibDirectory; - this.programArguments = requireNonNull(programArguments, "programArguments"); + super(programArguments, configuration, userLibDirectory); + this.jarsOnClassPath = jarsOnClassPath == null ? 
JarsOnClassPath.INSTANCE : jarsOnClassPath; this.jobClassName = jobClassName; - this.jarsOnClassPath = requireNonNull(jarsOnClassPath); - this.userClassPaths = discoverUserClassPaths(userLibDirectory); - this.jarFile = jarFile; - } - - private Collection<URL> discoverUserClassPaths(@Nullable File jobDir) throws IOException { - if (jobDir == null) { - return Collections.emptyList(); - } - - final Path workingDirectory = FileUtils.getCurrentWorkingDirectory(); - final Collection<URL> relativeJarURLs = - FileUtils.listFilesInDirectory(jobDir.toPath(), FileUtils::isJarFile).stream() - .map(path -> FileUtils.relativizePath(workingDirectory, path)) - .map(FunctionUtils.uncheckedFunction(FileUtils::toURL)) - .collect(Collectors.toList()); - return Collections.unmodifiableCollection(relativeJarURLs); + this.userLibDirectory = userLibDirectory; } @Override - public PackagedProgram getPackagedProgram() throws FlinkException { - try { - // It is Python job if program arguments contain "-py"/--python" or "-pym/--pyModule", - // set the fixed - // jobClassName and jarFile path. 
- if (PackagedProgramUtils.isPython(jobClassName) - || PackagedProgramUtils.isPython(programArguments)) { - String pythonJobClassName = PackagedProgramUtils.getPythonDriverClassName(); - File pythonJarFile = new File(PackagedProgramUtils.getPythonJar().getPath()); - return PackagedProgram.newBuilder() - .setUserClassPaths(new ArrayList<>(userClassPaths)) - .setArguments(programArguments) - .setJarFile(pythonJarFile) - .setEntryPointClassName(pythonJobClassName) - .build(); - } - - if (jarFile != null) { - return PackagedProgram.newBuilder() - .setUserClassPaths(new ArrayList<>(userClassPaths)) - .setArguments(programArguments) - .setJarFile(jarFile) - .setEntryPointClassName(jobClassName) - .build(); - } - - final String entryClass = getJobClassNameOrScanClassPath(); - return PackagedProgram.newBuilder() - .setUserClassPaths(new ArrayList<>(userClassPaths)) - .setEntryPointClassName(entryClass) - .setArguments(programArguments) - .build(); - } catch (ProgramInvocationException e) { - throw new FlinkException("Could not load the provided entrypoint class.", e); - } + public PackagedProgram buildPackagedProgram() + throws ProgramInvocationException, FlinkException { + final String entryClass = getJobClassNameOrScanClassPath(); + return PackagedProgram.newBuilder() + .setArguments(programArguments) + .setConfiguration(configuration) + .setUserClassPaths(new ArrayList<>(userClassPaths)) + .setEntryPointClassName(entryClass) + .build(); } private String getJobClassNameOrScanClassPath() throws FlinkException { Review comment: Legacy: I'd suggest cleaning up the jumping between parameter passing and accessing members here. I guess, we could make all three methods static and pass the members as parameters into the methods. 
########## File path: flink-clients/src/main/java/org/apache/flink/client/deployment/application/ClassPathPackagedProgramRetriever.java ########## @@ -38,109 +36,56 @@ import java.io.File; import java.io.IOException; import java.net.URL; -import java.nio.file.Path; import java.util.ArrayList; import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; import java.util.NoSuchElementException; import java.util.function.Supplier; import java.util.jar.JarEntry; import java.util.jar.JarFile; import java.util.regex.Pattern; import java.util.stream.Collectors; -import static java.util.Objects.requireNonNull; - /** - * A {@link org.apache.flink.client.program.PackagedProgramRetriever PackagedProgramRetriever} which - * creates the {@link org.apache.flink.client.program.PackagedProgram PackagedProgram} containing - * the user's {@code main()} from a class on the class path. + * A classpath {@link org.apache.flink.client.program.PackagedProgramRetriever + * PackagedProgramRetriever} which creates the {@link + * org.apache.flink.client.program.PackagedProgram PackagedProgram} through scanning the classpath + * for the job class. */ @Internal -public class ClassPathPackagedProgramRetriever implements PackagedProgramRetriever { +public class ClassPathPackagedProgramRetriever extends AbstractPackagedProgramRetriever { private static final Logger LOG = LoggerFactory.getLogger(ClassPathPackagedProgramRetriever.class); - /** User classpaths in relative form to the working directory. 
*/ - @Nonnull private final Collection<URL> userClassPaths; - - @Nonnull private final String[] programArguments; + @Nonnull private final Supplier<Iterable<File>> jarsOnClassPath; @Nullable private final String jobClassName; - @Nonnull private final Supplier<Iterable<File>> jarsOnClassPath; - @Nullable private final File userLibDirectory; - @Nullable private final File jarFile; - - private ClassPathPackagedProgramRetriever( + protected ClassPathPackagedProgramRetriever( @Nonnull String[] programArguments, + @Nonnull Configuration configuration, + @Nullable Supplier<Iterable<File>> jarsOnClassPath, @Nullable String jobClassName, - @Nonnull Supplier<Iterable<File>> jarsOnClassPath, - @Nullable File userLibDirectory, - @Nullable File jarFile) + @Nullable File userLibDirectory) throws IOException { - this.userLibDirectory = userLibDirectory; - this.programArguments = requireNonNull(programArguments, "programArguments"); + super(programArguments, configuration, userLibDirectory); + this.jarsOnClassPath = jarsOnClassPath == null ? 
JarsOnClassPath.INSTANCE : jarsOnClassPath; this.jobClassName = jobClassName; - this.jarsOnClassPath = requireNonNull(jarsOnClassPath); - this.userClassPaths = discoverUserClassPaths(userLibDirectory); - this.jarFile = jarFile; - } - - private Collection<URL> discoverUserClassPaths(@Nullable File jobDir) throws IOException { - if (jobDir == null) { - return Collections.emptyList(); - } - - final Path workingDirectory = FileUtils.getCurrentWorkingDirectory(); - final Collection<URL> relativeJarURLs = - FileUtils.listFilesInDirectory(jobDir.toPath(), FileUtils::isJarFile).stream() - .map(path -> FileUtils.relativizePath(workingDirectory, path)) - .map(FunctionUtils.uncheckedFunction(FileUtils::toURL)) - .collect(Collectors.toList()); - return Collections.unmodifiableCollection(relativeJarURLs); + this.userLibDirectory = userLibDirectory; } @Override - public PackagedProgram getPackagedProgram() throws FlinkException { - try { - // It is Python job if program arguments contain "-py"/--python" or "-pym/--pyModule", - // set the fixed - // jobClassName and jarFile path. 
- if (PackagedProgramUtils.isPython(jobClassName) - || PackagedProgramUtils.isPython(programArguments)) { - String pythonJobClassName = PackagedProgramUtils.getPythonDriverClassName(); - File pythonJarFile = new File(PackagedProgramUtils.getPythonJar().getPath()); - return PackagedProgram.newBuilder() - .setUserClassPaths(new ArrayList<>(userClassPaths)) - .setArguments(programArguments) - .setJarFile(pythonJarFile) - .setEntryPointClassName(pythonJobClassName) - .build(); - } - - if (jarFile != null) { - return PackagedProgram.newBuilder() - .setUserClassPaths(new ArrayList<>(userClassPaths)) - .setArguments(programArguments) - .setJarFile(jarFile) - .setEntryPointClassName(jobClassName) - .build(); - } - - final String entryClass = getJobClassNameOrScanClassPath(); - return PackagedProgram.newBuilder() - .setUserClassPaths(new ArrayList<>(userClassPaths)) - .setEntryPointClassName(entryClass) - .setArguments(programArguments) - .build(); - } catch (ProgramInvocationException e) { - throw new FlinkException("Could not load the provided entrypoint class.", e); - } + public PackagedProgram buildPackagedProgram() + throws ProgramInvocationException, FlinkException { + final String entryClass = getJobClassNameOrScanClassPath(); + return PackagedProgram.newBuilder() + .setArguments(programArguments) + .setConfiguration(configuration) + .setUserClassPaths(new ArrayList<>(userClassPaths)) + .setEntryPointClassName(entryClass) + .build(); } private String getJobClassNameOrScanClassPath() throws FlinkException { Review comment: Legacy: Could we replace the nested `if`s with a single condition joined by logical `&&`? That would improve readability, I guess. ########## File path: flink-clients/src/main/java/org/apache/flink/client/deployment/application/PythonBasedPackagedProgramRetriever.java ########## @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.client.deployment.application; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.client.program.PackagedProgram; +import org.apache.flink.client.program.PackagedProgramUtils; +import org.apache.flink.client.program.ProgramInvocationException; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.util.FlinkException; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; + +/** + * A python based {@link org.apache.flink.client.program.PackagedProgramRetriever + * PackagedProgramRetriever} which creates the {@link + * org.apache.flink.client.program.PackagedProgram PackagedProgram} when program arguments contain + * "-py/--python" or "-pym/--pyModule". 
+ */ +@Internal +public class PythonBasedPackagedProgramRetriever extends AbstractPackagedProgramRetriever { + + protected PythonBasedPackagedProgramRetriever( Review comment: ```suggestion PythonBasedPackagedProgramRetriever( ``` ########## File path: flink-clients/src/main/java/org/apache/flink/client/deployment/application/PythonBasedPackagedProgramRetriever.java ########## @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.client.deployment.application; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.client.program.PackagedProgram; +import org.apache.flink.client.program.PackagedProgramUtils; +import org.apache.flink.client.program.ProgramInvocationException; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.util.FlinkException; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; + +/** + * A python based {@link org.apache.flink.client.program.PackagedProgramRetriever Review comment: ```suggestion * A python based {@link PackagedProgramRetriever ``` nit: Import class to make the JavaDoc more readable. 
########## File path: flink-clients/src/main/java/org/apache/flink/client/deployment/application/ClassPathPackagedProgramRetriever.java ########## @@ -38,109 +36,56 @@ import java.io.File; import java.io.IOException; import java.net.URL; -import java.nio.file.Path; import java.util.ArrayList; import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; import java.util.NoSuchElementException; import java.util.function.Supplier; import java.util.jar.JarEntry; import java.util.jar.JarFile; import java.util.regex.Pattern; import java.util.stream.Collectors; -import static java.util.Objects.requireNonNull; - /** - * A {@link org.apache.flink.client.program.PackagedProgramRetriever PackagedProgramRetriever} which - * creates the {@link org.apache.flink.client.program.PackagedProgram PackagedProgram} containing - * the user's {@code main()} from a class on the class path. + * A classpath {@link org.apache.flink.client.program.PackagedProgramRetriever + * PackagedProgramRetriever} which creates the {@link + * org.apache.flink.client.program.PackagedProgram PackagedProgram} through scanning the classpath + * for the job class. */ @Internal -public class ClassPathPackagedProgramRetriever implements PackagedProgramRetriever { +public class ClassPathPackagedProgramRetriever extends AbstractPackagedProgramRetriever { private static final Logger LOG = LoggerFactory.getLogger(ClassPathPackagedProgramRetriever.class); - /** User classpaths in relative form to the working directory. 
*/ - @Nonnull private final Collection<URL> userClassPaths; - - @Nonnull private final String[] programArguments; + @Nonnull private final Supplier<Iterable<File>> jarsOnClassPath; @Nullable private final String jobClassName; - @Nonnull private final Supplier<Iterable<File>> jarsOnClassPath; - @Nullable private final File userLibDirectory; - @Nullable private final File jarFile; - - private ClassPathPackagedProgramRetriever( + protected ClassPathPackagedProgramRetriever( Review comment: ```suggestion ClassPathPackagedProgramRetriever( ``` Only `PackagedProgramRetrieverAdapter` needs to access it. ########## File path: flink-clients/src/main/java/org/apache/flink/client/deployment/application/AbstractPackagedProgramRetriever.java ########## @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.client.deployment.application; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.client.program.PackagedProgram; +import org.apache.flink.client.program.PackagedProgramRetriever; +import org.apache.flink.client.program.ProgramInvocationException; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.util.FileUtils; +import org.apache.flink.util.FlinkException; +import org.apache.flink.util.function.FunctionUtils; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +import java.io.File; +import java.io.IOException; +import java.net.URL; +import java.nio.file.Path; +import java.util.Collection; +import java.util.Collections; +import java.util.stream.Collectors; + +import static java.util.Objects.requireNonNull; + +/** + * An abstract {@link org.apache.flink.client.program.PackagedProgramRetriever + * PackagedProgramRetriever} which creates the {@link + * org.apache.flink.client.program.PackagedProgram PackagedProgram} containing the user's {@code Review comment: ```suggestion * PackagedProgram} containing the user's {@code ``` ########## File path: flink-clients/src/main/java/org/apache/flink/client/deployment/application/ClassPathPackagedProgramRetriever.java ########## @@ -38,109 +36,56 @@ import java.io.File; import java.io.IOException; import java.net.URL; -import java.nio.file.Path; import java.util.ArrayList; import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; import java.util.NoSuchElementException; import java.util.function.Supplier; import java.util.jar.JarEntry; import java.util.jar.JarFile; import java.util.regex.Pattern; import java.util.stream.Collectors; -import static java.util.Objects.requireNonNull; - /** - * A {@link org.apache.flink.client.program.PackagedProgramRetriever PackagedProgramRetriever} which - * creates the {@link org.apache.flink.client.program.PackagedProgram PackagedProgram} containing - * the user's 
{@code main()} from a class on the class path. + * A classpath {@link org.apache.flink.client.program.PackagedProgramRetriever + * PackagedProgramRetriever} which creates the {@link + * org.apache.flink.client.program.PackagedProgram PackagedProgram} through scanning the classpath + * for the job class. */ @Internal -public class ClassPathPackagedProgramRetriever implements PackagedProgramRetriever { +public class ClassPathPackagedProgramRetriever extends AbstractPackagedProgramRetriever { private static final Logger LOG = LoggerFactory.getLogger(ClassPathPackagedProgramRetriever.class); - /** User classpaths in relative form to the working directory. */ - @Nonnull private final Collection<URL> userClassPaths; - - @Nonnull private final String[] programArguments; + @Nonnull private final Supplier<Iterable<File>> jarsOnClassPath; @Nullable private final String jobClassName; - @Nonnull private final Supplier<Iterable<File>> jarsOnClassPath; - @Nullable private final File userLibDirectory; - @Nullable private final File jarFile; - - private ClassPathPackagedProgramRetriever( + protected ClassPathPackagedProgramRetriever( @Nonnull String[] programArguments, + @Nonnull Configuration configuration, + @Nullable Supplier<Iterable<File>> jarsOnClassPath, @Nullable String jobClassName, - @Nonnull Supplier<Iterable<File>> jarsOnClassPath, - @Nullable File userLibDirectory, - @Nullable File jarFile) + @Nullable File userLibDirectory) Review comment: I'd propose sticking to a consistent order for all the different implementations, listing the `AbstractPackagedProgramRetriever` parameters first followed by the subclass-specific parameters. ########## File path: flink-clients/src/main/java/org/apache/flink/client/deployment/application/PackagedProgramRetrieverAdapter.java ########## @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.client.deployment.application; + +import org.apache.flink.client.program.PackagedProgramRetriever; +import org.apache.flink.client.program.PackagedProgramUtils; +import org.apache.flink.configuration.Configuration; + +import javax.annotation.Nullable; + +import java.io.File; +import java.io.IOException; +import java.util.function.Supplier; + +import static java.util.Objects.requireNonNull; + +/** Adapter to provide a {@link Builder} to build a {@link PackagedProgramRetriever}. */ +public final class PackagedProgramRetrieverAdapter { + + public static Builder newBuilder(String[] programArguments) { + return new Builder(programArguments); + } + + /** A builder for the {@link PackagedProgramRetriever}. */ + public static class Builder { Review comment: Do we need to use a `Builder` pattern here? I feel like having two constructors (w/ and w/o `jarFile`) instead is good enough ########## File path: flink-clients/src/main/java/org/apache/flink/client/deployment/application/AbstractPackagedProgramRetriever.java ########## @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.client.deployment.application; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.client.program.PackagedProgram; +import org.apache.flink.client.program.PackagedProgramRetriever; +import org.apache.flink.client.program.ProgramInvocationException; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.util.FileUtils; +import org.apache.flink.util.FlinkException; +import org.apache.flink.util.function.FunctionUtils; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +import java.io.File; +import java.io.IOException; +import java.net.URL; +import java.nio.file.Path; +import java.util.Collection; +import java.util.Collections; +import java.util.stream.Collectors; + +import static java.util.Objects.requireNonNull; + +/** + * An abstract {@link org.apache.flink.client.program.PackagedProgramRetriever Review comment: ```suggestion * An abstract {@link ``` ########## File path: flink-clients/src/main/java/org/apache/flink/client/deployment/application/PackagedProgramRetrieverAdapter.java ########## @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.client.deployment.application; + +import org.apache.flink.client.program.PackagedProgramRetriever; +import org.apache.flink.client.program.PackagedProgramUtils; +import org.apache.flink.configuration.Configuration; + +import javax.annotation.Nullable; + +import java.io.File; +import java.io.IOException; +import java.util.function.Supplier; + +import static java.util.Objects.requireNonNull; + +/** Adapter to provide a {@link Builder} to build a {@link PackagedProgramRetriever}. */ +public final class PackagedProgramRetrieverAdapter { + + public static Builder newBuilder(String[] programArguments) { + return new Builder(programArguments); + } + + /** A builder for the {@link PackagedProgramRetriever}. */ + public static class Builder { + + private final String[] programArguments; + + private Configuration configuration = new Configuration(); Review comment: ```suggestion private Configuration configuration; ``` I'm concerned that someone might forget to set the configuration when using this class. This wouldn't fail fast but would instead result in (maybe) unwanted behavior. Instead, we should let it be passed as part of the `Builder` constructor. 
########## File path: flink-clients/src/test/java/org/apache/flink/client/deployment/application/ClassPathPackagedProgramRetrieverTest.java ########## @@ -285,25 +148,55 @@ public void testJobGraphRetrievalFailIfDoesNotFindTheEntryClassInTheJobDir() } @Test - public void testRetrieveCorrectUserClasspathsWithoutSpecifiedEntryClass() + public void testSavepointRestoreSettings() + throws FlinkException, IOException, ProgramInvocationException { + final Configuration configuration = new Configuration(); + final SavepointRestoreSettings savepointRestoreSettings = + SavepointRestoreSettings.forPath("foobar", true); + final JobID jobId = new JobID(); + + configuration.setString(PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID, jobId.toHexString()); + SavepointRestoreSettings.toConfiguration(savepointRestoreSettings, configuration); + + final PackagedProgramRetriever retrieverUnderTest = + newBuilder(PROGRAM_ARGUMENTS) + .setJobClassName(TestJob.class.getCanonicalName()) + .build(); + + final JobGraph jobGraph = retrieveJobGraph(retrieverUnderTest, configuration); + + assertThat(jobGraph.getSavepointRestoreSettings(), is(equalTo(savepointRestoreSettings))); + assertEquals(jobGraph.getJobID(), jobId); + } + + @Test + public void testJarFromClassPathSupplierSanityCheck() { + Iterable<File> jarFiles = JarsOnClassPath.INSTANCE.get(); + + // Junit executes this test, so it should be returned as part of JARs on the class path + assertThat(jarFiles, hasItem(hasProperty("name", containsString("junit")))); + } + + @Test + public void testRetrieveCorrectUserClassPathsWithoutSpecifiedEntryClass() throws IOException, FlinkException, ProgramInvocationException { - final ClassPathPackagedProgramRetriever retrieverUnderTest = - ClassPathPackagedProgramRetriever.newBuilder(PROGRAM_ARGUMENTS) + final PackagedProgramRetriever retrieverUnderTest = + newBuilder(PROGRAM_ARGUMENTS) .setJarsOnClassPath(Collections::emptyList) .setUserLibDirectory(userDirHasEntryClass) .build(); final JobGraph 
jobGraph = retrieveJobGraph(retrieverUnderTest, new Configuration()); assertThat( jobGraph.getClasspaths().stream().map(URL::toString).collect(Collectors.toList()), - containsInAnyOrder(expectedURLs.stream().map(URL::toString).toArray())); + containsInAnyOrder(EXPECTED_URLS.stream().map(URL::toString).toArray())); } @Test - public void testRetrieveCorrectUserClasspathsWithSpecifiedEntryClass() + public void testRetrieveCorrectUserClassPathsWithSpecifiedEntryClass() Review comment: Correct me if I'm wrong here, but don't we want to move this into `PackagedProgramRetrieverTestBase` as well? ########## File path: flink-clients/src/main/java/org/apache/flink/client/deployment/application/AbstractPackagedProgramRetriever.java ########## @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.client.deployment.application; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.client.program.PackagedProgram; +import org.apache.flink.client.program.PackagedProgramRetriever; +import org.apache.flink.client.program.ProgramInvocationException; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.util.FileUtils; +import org.apache.flink.util.FlinkException; +import org.apache.flink.util.function.FunctionUtils; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +import java.io.File; +import java.io.IOException; +import java.net.URL; +import java.nio.file.Path; +import java.util.Collection; +import java.util.Collections; +import java.util.stream.Collectors; + +import static java.util.Objects.requireNonNull; + +/** + * An abstract {@link org.apache.flink.client.program.PackagedProgramRetriever + * PackagedProgramRetriever} which creates the {@link + * org.apache.flink.client.program.PackagedProgram PackagedProgram} containing the user's {@code + * main()} from a class on the class path. + */ +@Internal +public abstract class AbstractPackagedProgramRetriever implements PackagedProgramRetriever { + + @Nonnull protected final String[] programArguments; + + @Nonnull protected final Configuration configuration; + + /** User class paths in relative form to the working directory. 
*/ + @Nonnull protected final Collection<URL> userClassPaths; + + AbstractPackagedProgramRetriever( + @Nonnull String[] programArguments, + @Nonnull Configuration configuration, + @Nullable File userLibDirectory) + throws IOException { + this.programArguments = requireNonNull(programArguments, "programArguments"); + this.configuration = requireNonNull(configuration); + this.userClassPaths = discoverUserClassPaths(userLibDirectory); + } + + private Collection<URL> discoverUserClassPaths(@Nullable File jobDir) throws IOException { + if (jobDir == null) { + return Collections.emptyList(); + } + + final Path workingDirectory = FileUtils.getCurrentWorkingDirectory(); + final Collection<URL> relativeJarURLs = + FileUtils.listFilesInDirectory(jobDir.toPath(), FileUtils::isJarFile).stream() + .map(path -> FileUtils.relativizePath(workingDirectory, path)) + .map(FunctionUtils.uncheckedFunction(FileUtils::toURL)) + .collect(Collectors.toList()); + return Collections.unmodifiableCollection(relativeJarURLs); + } + + @Override + public PackagedProgram getPackagedProgram() throws FlinkException { + try { + return buildPackagedProgram(); Review comment: Alternatively, we could even get rid of all the members and instantiate the builder already in the constructor... ########## File path: flink-clients/src/main/java/org/apache/flink/client/deployment/application/AbstractPackagedProgramRetriever.java ########## @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.client.deployment.application; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.client.program.PackagedProgram; +import org.apache.flink.client.program.PackagedProgramRetriever; +import org.apache.flink.client.program.ProgramInvocationException; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.util.FileUtils; +import org.apache.flink.util.FlinkException; +import org.apache.flink.util.function.FunctionUtils; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +import java.io.File; +import java.io.IOException; +import java.net.URL; +import java.nio.file.Path; +import java.util.Collection; +import java.util.Collections; +import java.util.stream.Collectors; + +import static java.util.Objects.requireNonNull; + +/** + * An abstract {@link org.apache.flink.client.program.PackagedProgramRetriever + * PackagedProgramRetriever} which creates the {@link + * org.apache.flink.client.program.PackagedProgram PackagedProgram} containing the user's {@code + * main()} from a class on the class path. + */ +@Internal +public abstract class AbstractPackagedProgramRetriever implements PackagedProgramRetriever { + + @Nonnull protected final String[] programArguments; + + @Nonnull protected final Configuration configuration; + + /** User class paths in relative form to the working directory. 
*/ + @Nonnull protected final Collection<URL> userClassPaths; + + AbstractPackagedProgramRetriever( + @Nonnull String[] programArguments, + @Nonnull Configuration configuration, + @Nullable File userLibDirectory) + throws IOException { + this.programArguments = requireNonNull(programArguments, "programArguments"); + this.configuration = requireNonNull(configuration); + this.userClassPaths = discoverUserClassPaths(userLibDirectory); + } + + private Collection<URL> discoverUserClassPaths(@Nullable File jobDir) throws IOException { Review comment: ```suggestion private static Collection<URL> discoverUserClassPaths(@Nullable File jobDir) throws IOException { ``` nit ########## File path: flink-clients/src/test/java/org/apache/flink/client/deployment/application/PackagedProgramRetrieverAdapterTest.java ########## @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.client.deployment.application; + +import org.apache.flink.client.program.PackagedProgramRetriever; +import org.apache.flink.client.testjar.TestJob; +import org.apache.flink.util.TestLogger; + +import org.junit.Test; + +import java.io.File; +import java.io.IOException; + +import static org.apache.flink.client.deployment.application.PackagedProgramRetrieverAdapter.newBuilder; +import static org.junit.Assert.assertTrue; + +/** Tests for the {@link PackagedProgramRetrieverAdapter}. */ +public class PackagedProgramRetrieverAdapterTest extends TestLogger { + + @Test + public void testBuildPackagedProgramRetriever() throws IOException { + final String[] programArguments = {"--arg", "suffix"}; + PackagedProgramRetriever retrieverUnderTest = + newBuilder(programArguments) + .setJobClassName(TestJob.class.getCanonicalName()) + .build(); + assertTrue(retrieverUnderTest instanceof ClassPathPackagedProgramRetriever); Review comment: One additional remark: I would suggest also testing the parameters and not only whether the right class is selected for instantiation. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [email protected]
