XComp commented on a change in pull request #15020:
URL: https://github.com/apache/flink/pull/15020#discussion_r590312576



##########
File path: 
flink-clients/src/main/java/org/apache/flink/client/deployment/application/AbstractPackagedProgramRetriever.java
##########
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.deployment.application;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.client.program.PackagedProgram;
+import org.apache.flink.client.program.PackagedProgramRetriever;
+import org.apache.flink.client.program.ProgramInvocationException;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.util.FileUtils;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.function.FunctionUtils;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URL;
+import java.nio.file.Path;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.stream.Collectors;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * An abstract {@link org.apache.flink.client.program.PackagedProgramRetriever
+ * PackagedProgramRetriever} which creates the {@link
+ * org.apache.flink.client.program.PackagedProgram PackagedProgram} containing 
the user's {@code
+ * main()} from a class on the class path.
+ */
+@Internal
+public abstract class AbstractPackagedProgramRetriever implements 
PackagedProgramRetriever {
+
+    @Nonnull protected final String[] programArguments;
+
+    @Nonnull protected final Configuration configuration;
+
+    /** User class paths in relative form to the working directory. */
+    @Nonnull protected final Collection<URL> userClassPaths;
+
+    AbstractPackagedProgramRetriever(
+            @Nonnull String[] programArguments,
+            @Nonnull Configuration configuration,
+            @Nullable File userLibDirectory)
+            throws IOException {
+        this.programArguments = requireNonNull(programArguments, 
"programArguments");
+        this.configuration = requireNonNull(configuration);
+        this.userClassPaths = discoverUserClassPaths(userLibDirectory);
+    }
+
+    private Collection<URL> discoverUserClassPaths(@Nullable File jobDir) 
throws IOException {
+        if (jobDir == null) {
+            return Collections.emptyList();
+        }
+
+        final Path workingDirectory = FileUtils.getCurrentWorkingDirectory();
+        final Collection<URL> relativeJarURLs =
+                FileUtils.listFilesInDirectory(jobDir.toPath(), 
FileUtils::isJarFile).stream()
+                        .map(path -> 
FileUtils.relativizePath(workingDirectory, path))
+                        .map(FunctionUtils.uncheckedFunction(FileUtils::toURL))
+                        .collect(Collectors.toList());
+        return Collections.unmodifiableCollection(relativeJarURLs);
+    }
+
+    @Override
+    public PackagedProgram getPackagedProgram() throws FlinkException {
+        try {
+            return buildPackagedProgram();
+        } catch (ProgramInvocationException e) {
+            throw new FlinkException("Could not load the provided entry point 
class.", e);
+        }
+    }
+
+    public abstract PackagedProgram buildPackagedProgram()

Review comment:
       ```suggestion
       protected abstract PackagedProgram buildPackagedProgram()
   ```
   This method can be protected as far as I'm concerned.

##########
File path: 
flink-clients/src/main/java/org/apache/flink/client/deployment/application/JarFilePackagedProgramRetriever.java
##########
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.deployment.application;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.client.program.PackagedProgram;
+import org.apache.flink.client.program.ProgramInvocationException;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.util.FlinkException;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+
+/**
+ * A jar file {@link org.apache.flink.client.program.PackagedProgramRetriever
+ * PackagedProgramRetriever} which creates the {@link
+ * org.apache.flink.client.program.PackagedProgram PackagedProgram} with the 
specified jar file.
+ */
+@Internal
+public class JarFilePackagedProgramRetriever extends 
AbstractPackagedProgramRetriever {
+
+    @Nullable private final String jobClassName;
+
+    @Nullable private final File jarFile;
+
+    protected JarFilePackagedProgramRetriever(
+            @Nonnull String[] programArguments,
+            @Nonnull Configuration configuration,
+            @Nullable String jobClassName,
+            @Nullable File userLibDirectory,
+            @Nullable File jarFile)
+            throws IOException {
+        super(programArguments, configuration, userLibDirectory);
+        this.jobClassName = jobClassName;
+        this.jarFile = jarFile;
+    }
+
+    @Override
+    public PackagedProgram buildPackagedProgram()
+            throws ProgramInvocationException, FlinkException {

Review comment:
       `FlinkException` is never thrown

##########
File path: 
flink-clients/src/main/java/org/apache/flink/client/deployment/application/PackagedProgramRetrieverAdapter.java
##########
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.deployment.application;
+
+import org.apache.flink.client.program.PackagedProgramRetriever;
+import org.apache.flink.client.program.PackagedProgramUtils;
+import org.apache.flink.configuration.Configuration;
+
+import javax.annotation.Nullable;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.function.Supplier;
+
+import static java.util.Objects.requireNonNull;
+
+/** Adapter to provide a {@link Builder} to build a {@link 
PackagedProgramRetriever}. */
+public final class PackagedProgramRetrieverAdapter {

Review comment:
       The `PackagedProgramRetrieverAdapter` does not fulfil any purpose here, 
does it? We could move the `Builder` to become a first-level class instead of 
an inner class. Additionally, I feel like the `Builder` should be renamed into 
something like a `PackagedProgramRetrieverFactory` as it instantiated objects 
from different kind of subclasses.

##########
File path: 
flink-clients/src/main/java/org/apache/flink/client/deployment/application/ClassPathPackagedProgramRetriever.java
##########
@@ -38,109 +36,56 @@
 import java.io.File;
 import java.io.IOException;
 import java.net.URL;
-import java.nio.file.Path;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
 import java.util.NoSuchElementException;
 import java.util.function.Supplier;
 import java.util.jar.JarEntry;
 import java.util.jar.JarFile;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 
-import static java.util.Objects.requireNonNull;
-
 /**
- * A {@link org.apache.flink.client.program.PackagedProgramRetriever 
PackagedProgramRetriever} which
- * creates the {@link org.apache.flink.client.program.PackagedProgram 
PackagedProgram} containing
- * the user's {@code main()} from a class on the class path.
+ * A classpath {@link org.apache.flink.client.program.PackagedProgramRetriever

Review comment:
       nit: Could we import `PackagedProgramRetriever` to improve readability?

##########
File path: 
flink-clients/src/main/java/org/apache/flink/client/deployment/application/ClassPathPackagedProgramRetriever.java
##########
@@ -38,109 +36,56 @@
 import java.io.File;
 import java.io.IOException;
 import java.net.URL;
-import java.nio.file.Path;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
 import java.util.NoSuchElementException;
 import java.util.function.Supplier;
 import java.util.jar.JarEntry;
 import java.util.jar.JarFile;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 
-import static java.util.Objects.requireNonNull;
-
 /**
- * A {@link org.apache.flink.client.program.PackagedProgramRetriever 
PackagedProgramRetriever} which
- * creates the {@link org.apache.flink.client.program.PackagedProgram 
PackagedProgram} containing
- * the user's {@code main()} from a class on the class path.
+ * A classpath {@link org.apache.flink.client.program.PackagedProgramRetriever
+ * PackagedProgramRetriever} which creates the {@link
+ * org.apache.flink.client.program.PackagedProgram PackagedProgram} through 
scanning the classpath

Review comment:
       ```suggestion
    * PackagedProgram} through scanning the classpath
   ```
   We don't need the package information here.

##########
File path: 
flink-clients/src/test/java/org/apache/flink/client/deployment/application/ClassPathPackagedProgramRetrieverTest.java
##########
@@ -312,87 +205,64 @@ public void 
testRetrieveCorrectUserClasspathsWithSpecifiedEntryClass()
 
         assertThat(
                 
jobGraph.getClasspaths().stream().map(URL::toString).collect(Collectors.toList()),
-                
containsInAnyOrder(expectedURLs.stream().map(URL::toString).toArray()));
+                
containsInAnyOrder(EXPECTED_URLS.stream().map(URL::toString).toArray()));
     }
 
     @Test
-    public void testRetrieveFromJarFileWithoutUserLib()
-            throws IOException, FlinkException, ProgramInvocationException {
-        final File testJar = TestJob.getTestJobJar();
-        final ClassPathPackagedProgramRetriever retrieverUnderTest =
-                ClassPathPackagedProgramRetriever.newBuilder(PROGRAM_ARGUMENTS)
-                        .setJarFile(testJar)
-                        .build();
-        final JobGraph jobGraph = retrieveJobGraph(retrieverUnderTest, new 
Configuration());
+    public void testGetPackagedProgramWithConfiguration() throws IOException, 
FlinkException {
+        final Configuration configuration = new Configuration();
+        configuration.setString(CoreOptions.CLASSLOADER_RESOLVE_ORDER, 
"parent-first");
+        configuration.setBoolean(CoreOptions.CHECK_LEAKED_CLASSLOADER, false);
 
-        assertThat(
-                jobGraph.getUserJars(),
-                containsInAnyOrder(new 
org.apache.flink.core.fs.Path(testJar.toURI())));
-        assertThat(jobGraph.getClasspaths().isEmpty(), is(true));
+        final PackagedProgramRetriever retrieverUnderTest =
+                newBuilder(PROGRAM_ARGUMENTS)
+                        .setConfiguration(configuration)
+                        .setJobClassName(TestJob.class.getCanonicalName())
+                        .build();
+        assertTrue(
+                
retrieverUnderTest.getPackagedProgram().getUserCodeClassLoader()
+                        instanceof 
FlinkUserCodeClassLoaders.ParentFirstClassLoader);
     }
 
     @Test
-    public void testRetrieveFromJarFileWithUserLib()
-            throws IOException, FlinkException, ProgramInvocationException {
-        final File testJar = TestJob.getTestJobJar();
-        final ClassPathPackagedProgramRetriever retrieverUnderTest =
-                ClassPathPackagedProgramRetriever.newBuilder(PROGRAM_ARGUMENTS)
-                        .setJarFile(testJar)
-                        .setUserLibDirectory(userDirHasEntryClass)
-                        .build();
-        final JobGraph jobGraph = retrieveJobGraph(retrieverUnderTest, new 
Configuration());
+    public void testJarFromClassPathSupplier() throws IOException {
+        final File file1 = temporaryFolder.newFile();
+        final File file2 = temporaryFolder.newFile();
+        final File directory = temporaryFolder.newFolder();
 
-        assertThat(
-                jobGraph.getUserJars(),
-                containsInAnyOrder(new 
org.apache.flink.core.fs.Path(testJar.toURI())));
-        assertThat(
-                
jobGraph.getClasspaths().stream().map(URL::toString).collect(Collectors.toList()),
-                
containsInAnyOrder(expectedURLs.stream().map(URL::toString).toArray()));
-    }
+        // Mock java.class.path property. The empty strings are important as 
the shell scripts
+        // that prepare the Flink class path often have such entries.
+        final String classPath =
+                javaClassPath(
+                        "",
+                        "",
+                        "",
+                        file1.getAbsolutePath(),
+                        "",
+                        directory.getAbsolutePath(),
+                        "",
+                        file2.getAbsolutePath(),
+                        "",
+                        "");
+
+        Iterable<File> jarFiles = setClassPathAndGetJarsOnClassPath(classPath);
 
-    private JobGraph retrieveJobGraph(
-            ClassPathPackagedProgramRetriever retrieverUnderTest, 
Configuration configuration)
-            throws FlinkException, ProgramInvocationException, 
MalformedURLException {
-        final PackagedProgram packagedProgram = 
retrieverUnderTest.getPackagedProgram();
-
-        final int defaultParallelism = 
configuration.getInteger(CoreOptions.DEFAULT_PARALLELISM);
-        ConfigUtils.encodeCollectionToConfig(
-                configuration,
-                PipelineOptions.JARS,
-                packagedProgram.getJobJarAndDependencies(),
-                URL::toString);
-        ConfigUtils.encodeCollectionToConfig(
-                configuration,
-                PipelineOptions.CLASSPATHS,
-                packagedProgram.getClasspaths(),
-                URL::toString);
-
-        final Pipeline pipeline =
-                PackagedProgramUtils.getPipelineFromProgram(
-                        packagedProgram, configuration, defaultParallelism, 
false);
-        return PipelineExecutorUtils.getJobGraph(pipeline, configuration);
+        assertThat(jarFiles, contains(file1, file2));
     }
 
     private static String javaClassPath(String... entries) {
-        String pathSeparator =
-                System.getProperty(
-                        
ClassPathPackagedProgramRetriever.JarsOnClassPath.PATH_SEPARATOR);
+        String pathSeparator = System.getProperty(PATH_SEPARATOR);
         return String.join(pathSeparator, entries);
     }
 
     private static Iterable<File> setClassPathAndGetJarsOnClassPath(String 
classPath) {
-        final String originalClassPath =
-                System.getProperty(
-                        
ClassPathPackagedProgramRetriever.JarsOnClassPath.JAVA_CLASS_PATH);
+        final String originalClassPath = System.getProperty(JAVA_CLASS_PATH);
         try {
-            System.setProperty(
-                    
ClassPathPackagedProgramRetriever.JarsOnClassPath.JAVA_CLASS_PATH, classPath);
-            return 
ClassPathPackagedProgramRetriever.JarsOnClassPath.INSTANCE.get();
+            System.setProperty(JAVA_CLASS_PATH, classPath);

Review comment:
       ```suggestion
               System.setProperty(JAVA_CLASSPATH, classpath);
   ```
   It's just a minor thing aligning classpath with the way Java formats it. It 
applies to the related lines as well. You might want to put it into a separate 
commit if you decide to do this.

##########
File path: 
flink-clients/src/main/java/org/apache/flink/client/deployment/application/ClassPathPackagedProgramRetriever.java
##########
@@ -38,109 +36,56 @@
 import java.io.File;
 import java.io.IOException;
 import java.net.URL;
-import java.nio.file.Path;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
 import java.util.NoSuchElementException;
 import java.util.function.Supplier;
 import java.util.jar.JarEntry;
 import java.util.jar.JarFile;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 
-import static java.util.Objects.requireNonNull;
-
 /**
- * A {@link org.apache.flink.client.program.PackagedProgramRetriever 
PackagedProgramRetriever} which
- * creates the {@link org.apache.flink.client.program.PackagedProgram 
PackagedProgram} containing
- * the user's {@code main()} from a class on the class path.
+ * A classpath {@link org.apache.flink.client.program.PackagedProgramRetriever
+ * PackagedProgramRetriever} which creates the {@link
+ * org.apache.flink.client.program.PackagedProgram PackagedProgram} through 
scanning the classpath
+ * for the job class.
  */
 @Internal
-public class ClassPathPackagedProgramRetriever implements 
PackagedProgramRetriever {
+public class ClassPathPackagedProgramRetriever extends 
AbstractPackagedProgramRetriever {

Review comment:
       ```suggestion
   public class ClasspathPackagedProgramRetriever extends 
AbstractPackagedProgramRetriever {
   ```
   I'd suggest sticking to the formatting Java suggests for the classpath as 
being one word

##########
File path: 
flink-clients/src/main/java/org/apache/flink/client/deployment/application/AbstractPackagedProgramRetriever.java
##########
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.deployment.application;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.client.program.PackagedProgram;
+import org.apache.flink.client.program.PackagedProgramRetriever;
+import org.apache.flink.client.program.ProgramInvocationException;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.util.FileUtils;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.function.FunctionUtils;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URL;
+import java.nio.file.Path;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.stream.Collectors;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * An abstract {@link org.apache.flink.client.program.PackagedProgramRetriever
+ * PackagedProgramRetriever} which creates the {@link
+ * org.apache.flink.client.program.PackagedProgram PackagedProgram} containing 
the user's {@code
+ * main()} from a class on the class path.
+ */
+@Internal
+public abstract class AbstractPackagedProgramRetriever implements 
PackagedProgramRetriever {
+
+    @Nonnull protected final String[] programArguments;
+
+    @Nonnull protected final Configuration configuration;
+
+    /** User class paths in relative form to the working directory. */
+    @Nonnull protected final Collection<URL> userClassPaths;
+
+    AbstractPackagedProgramRetriever(
+            @Nonnull String[] programArguments,
+            @Nonnull Configuration configuration,
+            @Nullable File userLibDirectory)
+            throws IOException {
+        this.programArguments = requireNonNull(programArguments, 
"programArguments");
+        this.configuration = requireNonNull(configuration);
+        this.userClassPaths = discoverUserClassPaths(userLibDirectory);
+    }
+
+    private Collection<URL> discoverUserClassPaths(@Nullable File jobDir) 
throws IOException {
+        if (jobDir == null) {
+            return Collections.emptyList();
+        }
+
+        final Path workingDirectory = FileUtils.getCurrentWorkingDirectory();
+        final Collection<URL> relativeJarURLs =
+                FileUtils.listFilesInDirectory(jobDir.toPath(), 
FileUtils::isJarFile).stream()
+                        .map(path -> 
FileUtils.relativizePath(workingDirectory, path))
+                        .map(FunctionUtils.uncheckedFunction(FileUtils::toURL))
+                        .collect(Collectors.toList());
+        return Collections.unmodifiableCollection(relativeJarURLs);
+    }
+
+    @Override
+    public PackagedProgram getPackagedProgram() throws FlinkException {
+        try {
+            return buildPackagedProgram();

Review comment:
       ```suggestion
               final PackagedProgram.Builder packagedProgramBuilder =
                       PackagedProgram.newBuilder()
                               .setConfiguration(configuration)
                               .setArguments(programArguments)
                               .setUserClassPaths(userClassPaths);
               return buildPackagedProgram(packagedProgramBuilder);
   ```
   We could move the Builder creation into `AbstractPackagedProgramRetriever`. 
This way, we could make the general members `userClassPaths`, `configuration`, 
and `programArguments` private.
   
   A question I have, is: What's the reason for creating a copy of 
`userClassPaths`? I didn't find a necessity to do so in the code 🤔 

##########
File path: 
flink-clients/src/main/java/org/apache/flink/client/deployment/application/ClassPathPackagedProgramRetriever.java
##########
@@ -38,109 +36,56 @@
 import java.io.File;
 import java.io.IOException;
 import java.net.URL;
-import java.nio.file.Path;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
 import java.util.NoSuchElementException;
 import java.util.function.Supplier;
 import java.util.jar.JarEntry;
 import java.util.jar.JarFile;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 
-import static java.util.Objects.requireNonNull;
-
 /**
- * A {@link org.apache.flink.client.program.PackagedProgramRetriever 
PackagedProgramRetriever} which
- * creates the {@link org.apache.flink.client.program.PackagedProgram 
PackagedProgram} containing
- * the user's {@code main()} from a class on the class path.
+ * A classpath {@link org.apache.flink.client.program.PackagedProgramRetriever
+ * PackagedProgramRetriever} which creates the {@link
+ * org.apache.flink.client.program.PackagedProgram PackagedProgram} through 
scanning the classpath
+ * for the job class.
  */
 @Internal
-public class ClassPathPackagedProgramRetriever implements 
PackagedProgramRetriever {
+public class ClassPathPackagedProgramRetriever extends 
AbstractPackagedProgramRetriever {
 
     private static final Logger LOG =
             LoggerFactory.getLogger(ClassPathPackagedProgramRetriever.class);
 
-    /** User classpaths in relative form to the working directory. */
-    @Nonnull private final Collection<URL> userClassPaths;
-
-    @Nonnull private final String[] programArguments;
+    @Nonnull private final Supplier<Iterable<File>> jarsOnClassPath;
 
     @Nullable private final String jobClassName;
 
-    @Nonnull private final Supplier<Iterable<File>> jarsOnClassPath;
-
     @Nullable private final File userLibDirectory;
 
-    @Nullable private final File jarFile;
-
-    private ClassPathPackagedProgramRetriever(
+    protected ClassPathPackagedProgramRetriever(
             @Nonnull String[] programArguments,
+            @Nonnull Configuration configuration,
+            @Nullable Supplier<Iterable<File>> jarsOnClassPath,
             @Nullable String jobClassName,
-            @Nonnull Supplier<Iterable<File>> jarsOnClassPath,
-            @Nullable File userLibDirectory,
-            @Nullable File jarFile)
+            @Nullable File userLibDirectory)
             throws IOException {
-        this.userLibDirectory = userLibDirectory;
-        this.programArguments = requireNonNull(programArguments, 
"programArguments");
+        super(programArguments, configuration, userLibDirectory);
+        this.jarsOnClassPath = jarsOnClassPath == null ? 
JarsOnClassPath.INSTANCE : jarsOnClassPath;
         this.jobClassName = jobClassName;
-        this.jarsOnClassPath = requireNonNull(jarsOnClassPath);
-        this.userClassPaths = discoverUserClassPaths(userLibDirectory);
-        this.jarFile = jarFile;
-    }
-
-    private Collection<URL> discoverUserClassPaths(@Nullable File jobDir) 
throws IOException {
-        if (jobDir == null) {
-            return Collections.emptyList();
-        }
-
-        final Path workingDirectory = FileUtils.getCurrentWorkingDirectory();
-        final Collection<URL> relativeJarURLs =
-                FileUtils.listFilesInDirectory(jobDir.toPath(), 
FileUtils::isJarFile).stream()
-                        .map(path -> 
FileUtils.relativizePath(workingDirectory, path))
-                        .map(FunctionUtils.uncheckedFunction(FileUtils::toURL))
-                        .collect(Collectors.toList());
-        return Collections.unmodifiableCollection(relativeJarURLs);
+        this.userLibDirectory = userLibDirectory;
     }
 
     @Override
-    public PackagedProgram getPackagedProgram() throws FlinkException {
-        try {
-            // It is Python job if program arguments contain "-py"/--python" 
or "-pym/--pyModule",
-            // set the fixed
-            // jobClassName and jarFile path.
-            if (PackagedProgramUtils.isPython(jobClassName)
-                    || PackagedProgramUtils.isPython(programArguments)) {
-                String pythonJobClassName = 
PackagedProgramUtils.getPythonDriverClassName();
-                File pythonJarFile = new 
File(PackagedProgramUtils.getPythonJar().getPath());
-                return PackagedProgram.newBuilder()
-                        .setUserClassPaths(new ArrayList<>(userClassPaths))
-                        .setArguments(programArguments)
-                        .setJarFile(pythonJarFile)
-                        .setEntryPointClassName(pythonJobClassName)
-                        .build();
-            }
-
-            if (jarFile != null) {
-                return PackagedProgram.newBuilder()
-                        .setUserClassPaths(new ArrayList<>(userClassPaths))
-                        .setArguments(programArguments)
-                        .setJarFile(jarFile)
-                        .setEntryPointClassName(jobClassName)
-                        .build();
-            }
-
-            final String entryClass = getJobClassNameOrScanClassPath();
-            return PackagedProgram.newBuilder()
-                    .setUserClassPaths(new ArrayList<>(userClassPaths))
-                    .setEntryPointClassName(entryClass)
-                    .setArguments(programArguments)
-                    .build();
-        } catch (ProgramInvocationException e) {
-            throw new FlinkException("Could not load the provided entrypoint 
class.", e);
-        }
+    public PackagedProgram buildPackagedProgram()
+            throws ProgramInvocationException, FlinkException {
+        final String entryClass = getJobClassNameOrScanClassPath();
+        return PackagedProgram.newBuilder()
+                .setArguments(programArguments)
+                .setConfiguration(configuration)
+                .setUserClassPaths(new ArrayList<>(userClassPaths))
+                .setEntryPointClassName(entryClass)
+                .build();
     }
 
     private String getJobClassNameOrScanClassPath() throws FlinkException {

Review comment:
       Legacy: I'd suggest cleaning up the jumping between parameter passing 
and accessing members here. I guess, we could make all three methods static and 
pass the members as parameters into the methods.

##########
File path: 
flink-clients/src/main/java/org/apache/flink/client/deployment/application/ClassPathPackagedProgramRetriever.java
##########
@@ -38,109 +36,56 @@
 import java.io.File;
 import java.io.IOException;
 import java.net.URL;
-import java.nio.file.Path;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
 import java.util.NoSuchElementException;
 import java.util.function.Supplier;
 import java.util.jar.JarEntry;
 import java.util.jar.JarFile;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 
-import static java.util.Objects.requireNonNull;
-
 /**
- * A {@link org.apache.flink.client.program.PackagedProgramRetriever 
PackagedProgramRetriever} which
- * creates the {@link org.apache.flink.client.program.PackagedProgram 
PackagedProgram} containing
- * the user's {@code main()} from a class on the class path.
+ * A classpath {@link org.apache.flink.client.program.PackagedProgramRetriever
+ * PackagedProgramRetriever} which creates the {@link
+ * org.apache.flink.client.program.PackagedProgram PackagedProgram} through 
scanning the classpath
+ * for the job class.
  */
 @Internal
-public class ClassPathPackagedProgramRetriever implements 
PackagedProgramRetriever {
+public class ClassPathPackagedProgramRetriever extends 
AbstractPackagedProgramRetriever {
 
     private static final Logger LOG =
             LoggerFactory.getLogger(ClassPathPackagedProgramRetriever.class);
 
-    /** User classpaths in relative form to the working directory. */
-    @Nonnull private final Collection<URL> userClassPaths;
-
-    @Nonnull private final String[] programArguments;
+    @Nonnull private final Supplier<Iterable<File>> jarsOnClassPath;
 
     @Nullable private final String jobClassName;
 
-    @Nonnull private final Supplier<Iterable<File>> jarsOnClassPath;
-
     @Nullable private final File userLibDirectory;
 
-    @Nullable private final File jarFile;
-
-    private ClassPathPackagedProgramRetriever(
+    protected ClassPathPackagedProgramRetriever(
             @Nonnull String[] programArguments,
+            @Nonnull Configuration configuration,
+            @Nullable Supplier<Iterable<File>> jarsOnClassPath,
             @Nullable String jobClassName,
-            @Nonnull Supplier<Iterable<File>> jarsOnClassPath,
-            @Nullable File userLibDirectory,
-            @Nullable File jarFile)
+            @Nullable File userLibDirectory)
             throws IOException {
-        this.userLibDirectory = userLibDirectory;
-        this.programArguments = requireNonNull(programArguments, 
"programArguments");
+        super(programArguments, configuration, userLibDirectory);
+        this.jarsOnClassPath = jarsOnClassPath == null ? 
JarsOnClassPath.INSTANCE : jarsOnClassPath;
         this.jobClassName = jobClassName;
-        this.jarsOnClassPath = requireNonNull(jarsOnClassPath);
-        this.userClassPaths = discoverUserClassPaths(userLibDirectory);
-        this.jarFile = jarFile;
-    }
-
-    private Collection<URL> discoverUserClassPaths(@Nullable File jobDir) 
throws IOException {
-        if (jobDir == null) {
-            return Collections.emptyList();
-        }
-
-        final Path workingDirectory = FileUtils.getCurrentWorkingDirectory();
-        final Collection<URL> relativeJarURLs =
-                FileUtils.listFilesInDirectory(jobDir.toPath(), 
FileUtils::isJarFile).stream()
-                        .map(path -> 
FileUtils.relativizePath(workingDirectory, path))
-                        .map(FunctionUtils.uncheckedFunction(FileUtils::toURL))
-                        .collect(Collectors.toList());
-        return Collections.unmodifiableCollection(relativeJarURLs);
+        this.userLibDirectory = userLibDirectory;
     }
 
     @Override
-    public PackagedProgram getPackagedProgram() throws FlinkException {
-        try {
-            // It is Python job if program arguments contain "-py"/--python" 
or "-pym/--pyModule",
-            // set the fixed
-            // jobClassName and jarFile path.
-            if (PackagedProgramUtils.isPython(jobClassName)
-                    || PackagedProgramUtils.isPython(programArguments)) {
-                String pythonJobClassName = 
PackagedProgramUtils.getPythonDriverClassName();
-                File pythonJarFile = new 
File(PackagedProgramUtils.getPythonJar().getPath());
-                return PackagedProgram.newBuilder()
-                        .setUserClassPaths(new ArrayList<>(userClassPaths))
-                        .setArguments(programArguments)
-                        .setJarFile(pythonJarFile)
-                        .setEntryPointClassName(pythonJobClassName)
-                        .build();
-            }
-
-            if (jarFile != null) {
-                return PackagedProgram.newBuilder()
-                        .setUserClassPaths(new ArrayList<>(userClassPaths))
-                        .setArguments(programArguments)
-                        .setJarFile(jarFile)
-                        .setEntryPointClassName(jobClassName)
-                        .build();
-            }
-
-            final String entryClass = getJobClassNameOrScanClassPath();
-            return PackagedProgram.newBuilder()
-                    .setUserClassPaths(new ArrayList<>(userClassPaths))
-                    .setEntryPointClassName(entryClass)
-                    .setArguments(programArguments)
-                    .build();
-        } catch (ProgramInvocationException e) {
-            throw new FlinkException("Could not load the provided entrypoint 
class.", e);
-        }
+    public PackagedProgram buildPackagedProgram()
+            throws ProgramInvocationException, FlinkException {
+        final String entryClass = getJobClassNameOrScanClassPath();
+        return PackagedProgram.newBuilder()
+                .setArguments(programArguments)
+                .setConfiguration(configuration)
+                .setUserClassPaths(new ArrayList<>(userClassPaths))
+                .setEntryPointClassName(entryClass)
+                .build();
     }
 
     private String getJobClassNameOrScanClassPath() throws FlinkException {

Review comment:
       Legacy: Could we replace the nested `if`'s by logical `&&`? That would 
improve the readability, I guess.

##########
File path: 
flink-clients/src/main/java/org/apache/flink/client/deployment/application/PythonBasedPackagedProgramRetriever.java
##########
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.deployment.application;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.client.program.PackagedProgram;
+import org.apache.flink.client.program.PackagedProgramUtils;
+import org.apache.flink.client.program.ProgramInvocationException;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.util.FlinkException;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+
+/**
+ * A python based {@link 
org.apache.flink.client.program.PackagedProgramRetriever
+ * PackagedProgramRetriever} which creates the {@link
+ * org.apache.flink.client.program.PackagedProgram PackagedProgram} when 
program arguments contain
+ * "-py/--python" or "-pym/--pyModule".
+ */
+@Internal
+public class PythonBasedPackagedProgramRetriever extends 
AbstractPackagedProgramRetriever {
+
+    protected PythonBasedPackagedProgramRetriever(

Review comment:
       ```suggestion
       PythonBasedPackagedProgramRetriever(
   ```

##########
File path: 
flink-clients/src/main/java/org/apache/flink/client/deployment/application/PythonBasedPackagedProgramRetriever.java
##########
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.deployment.application;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.client.program.PackagedProgram;
+import org.apache.flink.client.program.PackagedProgramUtils;
+import org.apache.flink.client.program.ProgramInvocationException;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.util.FlinkException;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+
+/**
+ * A python based {@link 
org.apache.flink.client.program.PackagedProgramRetriever

Review comment:
       ```suggestion
    * A python based {@link PackagedProgramRetriever
   ```
   nit: Import class to make the JavaDoc more readable.

##########
File path: 
flink-clients/src/main/java/org/apache/flink/client/deployment/application/ClassPathPackagedProgramRetriever.java
##########
@@ -38,109 +36,56 @@
 import java.io.File;
 import java.io.IOException;
 import java.net.URL;
-import java.nio.file.Path;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
 import java.util.NoSuchElementException;
 import java.util.function.Supplier;
 import java.util.jar.JarEntry;
 import java.util.jar.JarFile;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 
-import static java.util.Objects.requireNonNull;
-
 /**
- * A {@link org.apache.flink.client.program.PackagedProgramRetriever 
PackagedProgramRetriever} which
- * creates the {@link org.apache.flink.client.program.PackagedProgram 
PackagedProgram} containing
- * the user's {@code main()} from a class on the class path.
+ * A classpath {@link org.apache.flink.client.program.PackagedProgramRetriever
+ * PackagedProgramRetriever} which creates the {@link
+ * org.apache.flink.client.program.PackagedProgram PackagedProgram} through 
scanning the classpath
+ * for the job class.
  */
 @Internal
-public class ClassPathPackagedProgramRetriever implements 
PackagedProgramRetriever {
+public class ClassPathPackagedProgramRetriever extends 
AbstractPackagedProgramRetriever {
 
     private static final Logger LOG =
             LoggerFactory.getLogger(ClassPathPackagedProgramRetriever.class);
 
-    /** User classpaths in relative form to the working directory. */
-    @Nonnull private final Collection<URL> userClassPaths;
-
-    @Nonnull private final String[] programArguments;
+    @Nonnull private final Supplier<Iterable<File>> jarsOnClassPath;
 
     @Nullable private final String jobClassName;
 
-    @Nonnull private final Supplier<Iterable<File>> jarsOnClassPath;
-
     @Nullable private final File userLibDirectory;
 
-    @Nullable private final File jarFile;
-
-    private ClassPathPackagedProgramRetriever(
+    protected ClassPathPackagedProgramRetriever(

Review comment:
       ```suggestion
      ClassPathPackagedProgramRetriever(
   ```
   Only `PackagedProgramRetrieverAdapter` needs to access it.

##########
File path: 
flink-clients/src/main/java/org/apache/flink/client/deployment/application/AbstractPackagedProgramRetriever.java
##########
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.deployment.application;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.client.program.PackagedProgram;
+import org.apache.flink.client.program.PackagedProgramRetriever;
+import org.apache.flink.client.program.ProgramInvocationException;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.util.FileUtils;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.function.FunctionUtils;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URL;
+import java.nio.file.Path;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.stream.Collectors;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * An abstract {@link org.apache.flink.client.program.PackagedProgramRetriever
+ * PackagedProgramRetriever} which creates the {@link
+ * org.apache.flink.client.program.PackagedProgram PackagedProgram} containing 
the user's {@code

Review comment:
       ```suggestion
    * PackagedProgram} containing the user's {@code
   ```

##########
File path: 
flink-clients/src/main/java/org/apache/flink/client/deployment/application/ClassPathPackagedProgramRetriever.java
##########
@@ -38,109 +36,56 @@
 import java.io.File;
 import java.io.IOException;
 import java.net.URL;
-import java.nio.file.Path;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
 import java.util.NoSuchElementException;
 import java.util.function.Supplier;
 import java.util.jar.JarEntry;
 import java.util.jar.JarFile;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 
-import static java.util.Objects.requireNonNull;
-
 /**
- * A {@link org.apache.flink.client.program.PackagedProgramRetriever 
PackagedProgramRetriever} which
- * creates the {@link org.apache.flink.client.program.PackagedProgram 
PackagedProgram} containing
- * the user's {@code main()} from a class on the class path.
+ * A classpath {@link org.apache.flink.client.program.PackagedProgramRetriever
+ * PackagedProgramRetriever} which creates the {@link
+ * org.apache.flink.client.program.PackagedProgram PackagedProgram} through 
scanning the classpath
+ * for the job class.
  */
 @Internal
-public class ClassPathPackagedProgramRetriever implements 
PackagedProgramRetriever {
+public class ClassPathPackagedProgramRetriever extends 
AbstractPackagedProgramRetriever {
 
     private static final Logger LOG =
             LoggerFactory.getLogger(ClassPathPackagedProgramRetriever.class);
 
-    /** User classpaths in relative form to the working directory. */
-    @Nonnull private final Collection<URL> userClassPaths;
-
-    @Nonnull private final String[] programArguments;
+    @Nonnull private final Supplier<Iterable<File>> jarsOnClassPath;
 
     @Nullable private final String jobClassName;
 
-    @Nonnull private final Supplier<Iterable<File>> jarsOnClassPath;
-
     @Nullable private final File userLibDirectory;
 
-    @Nullable private final File jarFile;
-
-    private ClassPathPackagedProgramRetriever(
+    protected ClassPathPackagedProgramRetriever(
             @Nonnull String[] programArguments,
+            @Nonnull Configuration configuration,
+            @Nullable Supplier<Iterable<File>> jarsOnClassPath,
             @Nullable String jobClassName,
-            @Nonnull Supplier<Iterable<File>> jarsOnClassPath,
-            @Nullable File userLibDirectory,
-            @Nullable File jarFile)
+            @Nullable File userLibDirectory)

Review comment:
       I'd propose sticking the a consistent order for all the different 
implementations listing the `AbstractPackagedProgramRetriever` parameters first 
followed by the subclass-specific parameters.

##########
File path: 
flink-clients/src/main/java/org/apache/flink/client/deployment/application/PackagedProgramRetrieverAdapter.java
##########
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.deployment.application;
+
+import org.apache.flink.client.program.PackagedProgramRetriever;
+import org.apache.flink.client.program.PackagedProgramUtils;
+import org.apache.flink.configuration.Configuration;
+
+import javax.annotation.Nullable;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.function.Supplier;
+
+import static java.util.Objects.requireNonNull;
+
+/** Adapter to provide a {@link Builder} to build a {@link 
PackagedProgramRetriever}. */
+public final class PackagedProgramRetrieverAdapter {
+
+    public static Builder newBuilder(String[] programArguments) {
+        return new Builder(programArguments);
+    }
+
+    /** A builder for the {@link PackagedProgramRetriever}. */
+    public static class Builder {

Review comment:
       Do we need to use a `Builder` pattern here? I feel like having two 
constructors (w/ and w/o `jarFile`) instead is good enough

##########
File path: 
flink-clients/src/main/java/org/apache/flink/client/deployment/application/AbstractPackagedProgramRetriever.java
##########
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.deployment.application;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.client.program.PackagedProgram;
+import org.apache.flink.client.program.PackagedProgramRetriever;
+import org.apache.flink.client.program.ProgramInvocationException;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.util.FileUtils;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.function.FunctionUtils;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URL;
+import java.nio.file.Path;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.stream.Collectors;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * An abstract {@link org.apache.flink.client.program.PackagedProgramRetriever

Review comment:
       ```suggestion
    * An abstract {@link 
   ```

##########
File path: 
flink-clients/src/main/java/org/apache/flink/client/deployment/application/PackagedProgramRetrieverAdapter.java
##########
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.deployment.application;
+
+import org.apache.flink.client.program.PackagedProgramRetriever;
+import org.apache.flink.client.program.PackagedProgramUtils;
+import org.apache.flink.configuration.Configuration;
+
+import javax.annotation.Nullable;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.function.Supplier;
+
+import static java.util.Objects.requireNonNull;
+
+/** Adapter to provide a {@link Builder} to build a {@link 
PackagedProgramRetriever}. */
+public final class PackagedProgramRetrieverAdapter {
+
+    public static Builder newBuilder(String[] programArguments) {
+        return new Builder(programArguments);
+    }
+
+    /** A builder for the {@link PackagedProgramRetriever}. */
+    public static class Builder {
+
+        private final String[] programArguments;
+
+        private Configuration configuration = new Configuration();

Review comment:
       ```suggestion
           private Configuration configuration;
   ```
   I'm concerned that someone forgets to set the configuration when using this 
class. This wouldn't lead to failing fast but resulting in (maybe) unwanted 
behavior. Instead, we should let it be passed as part of the `Builder` 
constructor.

##########
File path: 
flink-clients/src/test/java/org/apache/flink/client/deployment/application/ClassPathPackagedProgramRetrieverTest.java
##########
@@ -285,25 +148,55 @@ public void 
testJobGraphRetrievalFailIfDoesNotFindTheEntryClassInTheJobDir()
     }
 
     @Test
-    public void testRetrieveCorrectUserClasspathsWithoutSpecifiedEntryClass()
+    public void testSavepointRestoreSettings()
+            throws FlinkException, IOException, ProgramInvocationException {
+        final Configuration configuration = new Configuration();
+        final SavepointRestoreSettings savepointRestoreSettings =
+                SavepointRestoreSettings.forPath("foobar", true);
+        final JobID jobId = new JobID();
+
+        configuration.setString(PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID, 
jobId.toHexString());
+        SavepointRestoreSettings.toConfiguration(savepointRestoreSettings, 
configuration);
+
+        final PackagedProgramRetriever retrieverUnderTest =
+                newBuilder(PROGRAM_ARGUMENTS)
+                        .setJobClassName(TestJob.class.getCanonicalName())
+                        .build();
+
+        final JobGraph jobGraph = retrieveJobGraph(retrieverUnderTest, 
configuration);
+
+        assertThat(jobGraph.getSavepointRestoreSettings(), 
is(equalTo(savepointRestoreSettings)));
+        assertEquals(jobGraph.getJobID(), jobId);
+    }
+
+    @Test
+    public void testJarFromClassPathSupplierSanityCheck() {
+        Iterable<File> jarFiles = JarsOnClassPath.INSTANCE.get();
+
+        // Junit executes this test, so it should be returned as part of JARs 
on the class path
+        assertThat(jarFiles, hasItem(hasProperty("name", 
containsString("junit"))));
+    }
+
+    @Test
+    public void testRetrieveCorrectUserClassPathsWithoutSpecifiedEntryClass()
             throws IOException, FlinkException, ProgramInvocationException {
-        final ClassPathPackagedProgramRetriever retrieverUnderTest =
-                ClassPathPackagedProgramRetriever.newBuilder(PROGRAM_ARGUMENTS)
+        final PackagedProgramRetriever retrieverUnderTest =
+                newBuilder(PROGRAM_ARGUMENTS)
                         .setJarsOnClassPath(Collections::emptyList)
                         .setUserLibDirectory(userDirHasEntryClass)
                         .build();
         final JobGraph jobGraph = retrieveJobGraph(retrieverUnderTest, new 
Configuration());
 
         assertThat(
                 
jobGraph.getClasspaths().stream().map(URL::toString).collect(Collectors.toList()),
-                
containsInAnyOrder(expectedURLs.stream().map(URL::toString).toArray()));
+                
containsInAnyOrder(EXPECTED_URLS.stream().map(URL::toString).toArray()));
     }
 
     @Test
-    public void testRetrieveCorrectUserClasspathsWithSpecifiedEntryClass()
+    public void testRetrieveCorrectUserClassPathsWithSpecifiedEntryClass()

Review comment:
       Correct me if I'm wrong here but don't we want to move this into 
`PackaedProgramRetrieverTestBase` as well?

##########
File path: 
flink-clients/src/main/java/org/apache/flink/client/deployment/application/AbstractPackagedProgramRetriever.java
##########
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.deployment.application;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.client.program.PackagedProgram;
+import org.apache.flink.client.program.PackagedProgramRetriever;
+import org.apache.flink.client.program.ProgramInvocationException;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.util.FileUtils;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.function.FunctionUtils;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URL;
+import java.nio.file.Path;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.stream.Collectors;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * An abstract {@link org.apache.flink.client.program.PackagedProgramRetriever
+ * PackagedProgramRetriever} which creates the {@link
+ * org.apache.flink.client.program.PackagedProgram PackagedProgram} containing 
the user's {@code
+ * main()} from a class on the class path.
+ */
+@Internal
+public abstract class AbstractPackagedProgramRetriever implements 
PackagedProgramRetriever {
+
+    @Nonnull protected final String[] programArguments;
+
+    @Nonnull protected final Configuration configuration;
+
+    /** User class paths in relative form to the working directory. */
+    @Nonnull protected final Collection<URL> userClassPaths;
+
+    AbstractPackagedProgramRetriever(
+            @Nonnull String[] programArguments,
+            @Nonnull Configuration configuration,
+            @Nullable File userLibDirectory)
+            throws IOException {
+        this.programArguments = requireNonNull(programArguments, 
"programArguments");
+        this.configuration = requireNonNull(configuration);
+        this.userClassPaths = discoverUserClassPaths(userLibDirectory);
+    }
+
+    private Collection<URL> discoverUserClassPaths(@Nullable File jobDir) 
throws IOException {
+        if (jobDir == null) {
+            return Collections.emptyList();
+        }
+
+        final Path workingDirectory = FileUtils.getCurrentWorkingDirectory();
+        final Collection<URL> relativeJarURLs =
+                FileUtils.listFilesInDirectory(jobDir.toPath(), 
FileUtils::isJarFile).stream()
+                        .map(path -> 
FileUtils.relativizePath(workingDirectory, path))
+                        .map(FunctionUtils.uncheckedFunction(FileUtils::toURL))
+                        .collect(Collectors.toList());
+        return Collections.unmodifiableCollection(relativeJarURLs);
+    }
+
+    @Override
+    public PackagedProgram getPackagedProgram() throws FlinkException {
+        try {
+            return buildPackagedProgram();

Review comment:
       Alternatively, we could even get rid of all the members and instantiate 
the builder already in the constructor...

##########
File path: 
flink-clients/src/main/java/org/apache/flink/client/deployment/application/AbstractPackagedProgramRetriever.java
##########
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.deployment.application;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.client.program.PackagedProgram;
+import org.apache.flink.client.program.PackagedProgramRetriever;
+import org.apache.flink.client.program.ProgramInvocationException;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.util.FileUtils;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.function.FunctionUtils;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URL;
+import java.nio.file.Path;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.stream.Collectors;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * An abstract {@link org.apache.flink.client.program.PackagedProgramRetriever
+ * PackagedProgramRetriever} which creates the {@link
+ * org.apache.flink.client.program.PackagedProgram PackagedProgram} containing 
the user's {@code
+ * main()} from a class on the class path.
+ */
+@Internal
+public abstract class AbstractPackagedProgramRetriever implements 
PackagedProgramRetriever {
+
+    @Nonnull protected final String[] programArguments;
+
+    @Nonnull protected final Configuration configuration;
+
+    /** User class paths in relative form to the working directory. */
+    @Nonnull protected final Collection<URL> userClassPaths;
+
+    AbstractPackagedProgramRetriever(
+            @Nonnull String[] programArguments,
+            @Nonnull Configuration configuration,
+            @Nullable File userLibDirectory)
+            throws IOException {
+        this.programArguments = requireNonNull(programArguments, 
"programArguments");
+        this.configuration = requireNonNull(configuration);
+        this.userClassPaths = discoverUserClassPaths(userLibDirectory);
+    }
+
+    private Collection<URL> discoverUserClassPaths(@Nullable File jobDir) 
throws IOException {

Review comment:
       ```suggestion
       private static Collection<URL> discoverUserClassPaths(@Nullable File 
jobDir) throws IOException {
   ```
   nit

##########
File path: 
flink-clients/src/test/java/org/apache/flink/client/deployment/application/PackagedProgramRetrieverAdapterTest.java
##########
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.deployment.application;
+
+import org.apache.flink.client.program.PackagedProgramRetriever;
+import org.apache.flink.client.testjar.TestJob;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+
+import static 
org.apache.flink.client.deployment.application.PackagedProgramRetrieverAdapter.newBuilder;
+import static org.junit.Assert.assertTrue;
+
+/** Tests for the {@link PackagedProgramRetrieverAdapter}. */
+public class PackagedProgramRetrieverAdapterTest extends TestLogger {
+
+    @Test
+    public void testBuildPackagedProgramRetriever() throws IOException {
+        final String[] programArguments = {"--arg", "suffix"};
+        PackagedProgramRetriever retrieverUnderTest =
+                newBuilder(programArguments)
+                        .setJobClassName(TestJob.class.getCanonicalName())
+                        .build();
+        assertTrue(retrieverUnderTest instanceof 
ClassPathPackagedProgramRetriever);

Review comment:
       One additional remark: I would suggest also testing the parameters and 
not only whether the right class is selected for instantation.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to