This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch release-1.13
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.13 by this push:
     new 848d4ba  [FLINK-21445][clients] Adds configuration to 
ClassPathPackagedProgramRetriever
848d4ba is described below

commit 848d4baf7054781de82d731d6ed60bb10d44d8e0
Author: Matthias Pohl <[email protected]>
AuthorDate: Tue Jul 6 09:29:47 2021 +0200

    [FLINK-21445][clients] Adds configuration to 
ClassPathPackagedProgramRetriever
---
 .../ClassPathPackagedProgramRetriever.java         | 25 ++++++--
 .../ClassPathPackagedProgramRetrieverTest.java     | 68 ++++++++++++++++++----
 .../StandaloneApplicationClusterEntryPoint.java    | 17 ++++--
 .../KubernetesApplicationClusterEntrypoint.java    |  2 +-
 .../YarnApplicationClusterEntryPoint.java          |  2 +-
 5 files changed, 91 insertions(+), 23 deletions(-)

diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/deployment/application/ClassPathPackagedProgramRetriever.java
 
b/flink-clients/src/main/java/org/apache/flink/client/deployment/application/ClassPathPackagedProgramRetriever.java
index 65471a0..041d2cf 100644
--- 
a/flink-clients/src/main/java/org/apache/flink/client/deployment/application/ClassPathPackagedProgramRetriever.java
+++ 
b/flink-clients/src/main/java/org/apache/flink/client/deployment/application/ClassPathPackagedProgramRetriever.java
@@ -24,6 +24,7 @@ import org.apache.flink.client.program.PackagedProgram;
 import org.apache.flink.client.program.PackagedProgramRetriever;
 import org.apache.flink.client.program.PackagedProgramUtils;
 import org.apache.flink.client.program.ProgramInvocationException;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FileUtils;
 import org.apache.flink.util.FlinkException;
@@ -76,12 +77,15 @@ public class ClassPathPackagedProgramRetriever implements 
PackagedProgramRetriev
 
     @Nullable private final File jarFile;
 
+    @Nonnull private final Configuration configuration;
+
     private ClassPathPackagedProgramRetriever(
             @Nonnull String[] programArguments,
             @Nullable String jobClassName,
             @Nonnull Supplier<Iterable<File>> jarsOnClassPath,
             @Nullable File userLibDirectory,
-            @Nullable File jarFile)
+            @Nullable File jarFile,
+            @Nonnull Configuration configuration)
             throws IOException {
         this.userLibDirectory = userLibDirectory;
         this.programArguments = requireNonNull(programArguments, 
"programArguments");
@@ -89,6 +93,7 @@ public class ClassPathPackagedProgramRetriever implements 
PackagedProgramRetriev
         this.jarsOnClassPath = requireNonNull(jarsOnClassPath);
         this.userClassPaths = discoverUserClassPaths(userLibDirectory);
         this.jarFile = jarFile;
+        this.configuration = configuration;
     }
 
     private Collection<URL> discoverUserClassPaths(@Nullable File jobDir) 
throws IOException {
@@ -129,6 +134,7 @@ public class ClassPathPackagedProgramRetriever implements 
PackagedProgramRetriev
                         .setArguments(programArguments)
                         .setJarFile(jarFile)
                         .setEntryPointClassName(jobClassName)
+                        .setConfiguration(configuration)
                         .build();
             }
 
@@ -137,6 +143,7 @@ public class ClassPathPackagedProgramRetriever implements 
PackagedProgramRetriev
                     .setUserClassPaths(new ArrayList<>(userClassPaths))
                     .setEntryPointClassName(entryClass)
                     .setArguments(programArguments)
+                    .setConfiguration(configuration)
                     .build();
         } catch (ProgramInvocationException e) {
             throw new FlinkException("Could not load the provided entrypoint 
class.", e);
@@ -252,8 +259,11 @@ public class ClassPathPackagedProgramRetriever implements 
PackagedProgramRetriev
 
         private File jarFile;
 
-        private Builder(String[] programArguments) {
+        private final Configuration configuration;
+
+        private Builder(String[] programArguments, Configuration 
configuration) {
             this.programArguments = requireNonNull(programArguments);
+            this.configuration = requireNonNull(configuration);
         }
 
         public Builder setJobClassName(@Nullable String jobClassName) {
@@ -278,11 +288,16 @@ public class ClassPathPackagedProgramRetriever implements 
PackagedProgramRetriev
 
         public ClassPathPackagedProgramRetriever build() throws IOException {
             return new ClassPathPackagedProgramRetriever(
-                    programArguments, jobClassName, jarsOnClassPath, 
userLibDirectory, jarFile);
+                    programArguments,
+                    jobClassName,
+                    jarsOnClassPath,
+                    userLibDirectory,
+                    jarFile,
+                    configuration);
         }
     }
 
-    public static Builder newBuilder(String[] programArguments) {
-        return new Builder(programArguments);
+    public static Builder newBuilder(String[] programArguments, Configuration 
configuration) {
+        return new Builder(programArguments, configuration);
     }
 }
diff --git 
a/flink-clients/src/test/java/org/apache/flink/client/deployment/application/ClassPathPackagedProgramRetrieverTest.java
 
b/flink-clients/src/test/java/org/apache/flink/client/deployment/application/ClassPathPackagedProgramRetrieverTest.java
index 291504f..c9d5f88 100644
--- 
a/flink-clients/src/test/java/org/apache/flink/client/deployment/application/ClassPathPackagedProgramRetrieverTest.java
+++ 
b/flink-clients/src/test/java/org/apache/flink/client/deployment/application/ClassPathPackagedProgramRetrieverTest.java
@@ -31,14 +31,17 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.PipelineOptions;
 import org.apache.flink.configuration.PipelineOptionsInternal;
+import 
org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
+import org.apache.flink.util.ChildFirstClassLoader;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FileUtils;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.TestLogger;
 import org.apache.flink.util.function.FunctionUtils;
 
+import org.hamcrest.core.IsInstanceOf;
 import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.ClassRule;
@@ -65,6 +68,7 @@ import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.hasItem;
 import static org.hamcrest.Matchers.hasProperty;
 import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.not;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
@@ -142,7 +146,7 @@ public class ClassPathPackagedProgramRetrieverTest extends 
TestLogger {
         configuration.set(PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID, 
jobId.toHexString());
 
         final ClassPathPackagedProgramRetriever retrieverUnderTest =
-                ClassPathPackagedProgramRetriever.newBuilder(PROGRAM_ARGUMENTS)
+                
ClassPathPackagedProgramRetriever.newBuilder(PROGRAM_ARGUMENTS, new 
Configuration())
                         .setJobClassName(TestJob.class.getCanonicalName())
                         .build();
 
@@ -161,7 +165,7 @@ public class ClassPathPackagedProgramRetrieverTest extends 
TestLogger {
             throws IOException, FlinkException, ProgramInvocationException {
         final File testJar = TestJob.getTestJobJar();
         final ClassPathPackagedProgramRetriever retrieverUnderTest =
-                ClassPathPackagedProgramRetriever.newBuilder(PROGRAM_ARGUMENTS)
+                
ClassPathPackagedProgramRetriever.newBuilder(PROGRAM_ARGUMENTS, new 
Configuration())
                         .setJarsOnClassPath(() -> 
Collections.singleton(testJar))
                         .build();
 
@@ -176,7 +180,7 @@ public class ClassPathPackagedProgramRetrieverTest extends 
TestLogger {
         final File testJar = new File("non-existing");
 
         final ClassPathPackagedProgramRetriever retrieverUnderTest =
-                ClassPathPackagedProgramRetriever.newBuilder(PROGRAM_ARGUMENTS)
+                
ClassPathPackagedProgramRetriever.newBuilder(PROGRAM_ARGUMENTS, new 
Configuration())
                         // Both a class name is specified and a JAR "is" on 
the class path
                         // The class name should have precedence.
                         .setJobClassName(TestJob.class.getCanonicalName())
@@ -200,7 +204,7 @@ public class ClassPathPackagedProgramRetrieverTest extends 
TestLogger {
         SavepointRestoreSettings.toConfiguration(savepointRestoreSettings, 
configuration);
 
         final ClassPathPackagedProgramRetriever retrieverUnderTest =
-                ClassPathPackagedProgramRetriever.newBuilder(PROGRAM_ARGUMENTS)
+                
ClassPathPackagedProgramRetriever.newBuilder(PROGRAM_ARGUMENTS, new 
Configuration())
                         .setJobClassName(TestJob.class.getCanonicalName())
                         .build();
 
@@ -249,7 +253,7 @@ public class ClassPathPackagedProgramRetrieverTest extends 
TestLogger {
             throws IOException, ProgramInvocationException {
         final File testJar = TestJob.getTestJobJar();
         final ClassPathPackagedProgramRetriever retrieverUnderTest =
-                ClassPathPackagedProgramRetriever.newBuilder(PROGRAM_ARGUMENTS)
+                
ClassPathPackagedProgramRetriever.newBuilder(PROGRAM_ARGUMENTS, new 
Configuration())
                         .setJarsOnClassPath(() -> 
Collections.singleton(testJar))
                         .setUserLibDirectory(userDirHasNotEntryClass)
                         .build();
@@ -268,7 +272,7 @@ public class ClassPathPackagedProgramRetrieverTest extends 
TestLogger {
     public void 
testJobGraphRetrievalFailIfDoesNotFindTheEntryClassInTheJobDir()
             throws IOException, ProgramInvocationException {
         final ClassPathPackagedProgramRetriever retrieverUnderTest =
-                ClassPathPackagedProgramRetriever.newBuilder(PROGRAM_ARGUMENTS)
+                
ClassPathPackagedProgramRetriever.newBuilder(PROGRAM_ARGUMENTS, new 
Configuration())
                         .setJobClassName(TestJobInfo.JOB_CLASS)
                         .setJarsOnClassPath(Collections::emptyList)
                         .setUserLibDirectory(userDirHasNotEntryClass)
@@ -288,7 +292,7 @@ public class ClassPathPackagedProgramRetrieverTest extends 
TestLogger {
     public void testRetrieveCorrectUserClasspathsWithoutSpecifiedEntryClass()
             throws IOException, FlinkException, ProgramInvocationException {
         final ClassPathPackagedProgramRetriever retrieverUnderTest =
-                ClassPathPackagedProgramRetriever.newBuilder(PROGRAM_ARGUMENTS)
+                
ClassPathPackagedProgramRetriever.newBuilder(PROGRAM_ARGUMENTS, new 
Configuration())
                         .setJarsOnClassPath(Collections::emptyList)
                         .setUserLibDirectory(userDirHasEntryClass)
                         .build();
@@ -303,7 +307,7 @@ public class ClassPathPackagedProgramRetrieverTest extends 
TestLogger {
     public void testRetrieveCorrectUserClasspathsWithSpecifiedEntryClass()
             throws IOException, FlinkException, ProgramInvocationException {
         final ClassPathPackagedProgramRetriever retrieverUnderTest =
-                ClassPathPackagedProgramRetriever.newBuilder(PROGRAM_ARGUMENTS)
+                
ClassPathPackagedProgramRetriever.newBuilder(PROGRAM_ARGUMENTS, new 
Configuration())
                         .setJobClassName(TestJobInfo.JOB_CLASS)
                         .setJarsOnClassPath(Collections::emptyList)
                         .setUserLibDirectory(userDirHasEntryClass)
@@ -320,7 +324,7 @@ public class ClassPathPackagedProgramRetrieverTest extends 
TestLogger {
             throws IOException, FlinkException, ProgramInvocationException {
         final File testJar = TestJob.getTestJobJar();
         final ClassPathPackagedProgramRetriever retrieverUnderTest =
-                ClassPathPackagedProgramRetriever.newBuilder(PROGRAM_ARGUMENTS)
+                
ClassPathPackagedProgramRetriever.newBuilder(PROGRAM_ARGUMENTS, new 
Configuration())
                         .setJarFile(testJar)
                         .build();
         final JobGraph jobGraph = retrieveJobGraph(retrieverUnderTest, new 
Configuration());
@@ -336,7 +340,7 @@ public class ClassPathPackagedProgramRetrieverTest extends 
TestLogger {
             throws IOException, FlinkException, ProgramInvocationException {
         final File testJar = TestJob.getTestJobJar();
         final ClassPathPackagedProgramRetriever retrieverUnderTest =
-                ClassPathPackagedProgramRetriever.newBuilder(PROGRAM_ARGUMENTS)
+                
ClassPathPackagedProgramRetriever.newBuilder(PROGRAM_ARGUMENTS, new 
Configuration())
                         .setJarFile(testJar)
                         .setUserLibDirectory(userDirHasEntryClass)
                         .build();
@@ -350,6 +354,50 @@ public class ClassPathPackagedProgramRetrieverTest extends 
TestLogger {
                 
containsInAnyOrder(expectedURLs.stream().map(URL::toString).toArray()));
     }
 
+    @Test
+    public void testChildFirstDefaultConfiguration() throws FlinkException, 
IOException {
+        // this is a sanity check to backup testConfigurationIsConsidered
+        final Configuration configuration = new Configuration();
+        // CHECK_LEAKED_CLASSLOADER has to be disabled to enable the 
instanceof check later on in
+        // this test. Otherwise, the actual instance would be hidden by a 
wrapper
+        configuration.set(CoreOptions.CHECK_LEAKED_CLASSLOADER, false);
+
+        final ClassPathPackagedProgramRetriever retriever =
+                
ClassPathPackagedProgramRetriever.newBuilder(PROGRAM_ARGUMENTS, configuration)
+                        .setUserLibDirectory(userDirHasEntryClass)
+                        .setJobClassName(TestJobInfo.JOB_CLASS)
+                        .build();
+
+        assertThat(
+                retriever.getPackagedProgram().getUserCodeClassLoader(),
+                IsInstanceOf.instanceOf(ChildFirstClassLoader.class));
+    }
+
+    @Test
+    public void testConfigurationIsConsidered() throws FlinkException, 
IOException {
+        final String parentFirstConfigValue = "parent-first";
+        // we want to make sure that parent-first is not set as a default
+        assertThat(
+                CoreOptions.CLASSLOADER_RESOLVE_ORDER.defaultValue(),
+                not(is(parentFirstConfigValue)));
+
+        final Configuration configuration = new Configuration();
+        configuration.set(CoreOptions.CLASSLOADER_RESOLVE_ORDER, 
parentFirstConfigValue);
+        // CHECK_LEAKED_CLASSLOADER has to be disabled to enable the 
instanceof check later on in
+        // this test. Otherwise, the actual instance would be hidden by a 
wrapper
+        configuration.set(CoreOptions.CHECK_LEAKED_CLASSLOADER, false);
+
+        final ClassPathPackagedProgramRetriever retriever =
+                
ClassPathPackagedProgramRetriever.newBuilder(PROGRAM_ARGUMENTS, configuration)
+                        .setUserLibDirectory(userDirHasEntryClass)
+                        .setJobClassName(TestJobInfo.JOB_CLASS)
+                        .build();
+
+        assertThat(
+                retriever.getPackagedProgram().getUserCodeClassLoader(),
+                
IsInstanceOf.instanceOf(FlinkUserCodeClassLoaders.ParentFirstClassLoader.class));
+    }
+
     private JobGraph retrieveJobGraph(
             ClassPathPackagedProgramRetriever retrieverUnderTest, 
Configuration configuration)
             throws FlinkException, ProgramInvocationException, 
MalformedURLException {
diff --git 
a/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneApplicationClusterEntryPoint.java
 
b/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneApplicationClusterEntryPoint.java
index 48697ea..4cbe5e4 100644
--- 
a/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneApplicationClusterEntryPoint.java
+++ 
b/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneApplicationClusterEntryPoint.java
@@ -63,15 +63,15 @@ public final class StandaloneApplicationClusterEntryPoint 
extends ApplicationClu
                         new 
StandaloneApplicationClusterConfigurationParserFactory(),
                         StandaloneApplicationClusterEntryPoint.class);
 
+        Configuration configuration = 
loadConfigurationFromClusterConfig(clusterConfiguration);
         PackagedProgram program = null;
         try {
-            program = getPackagedProgram(clusterConfiguration);
+            program = getPackagedProgram(clusterConfiguration, configuration);
         } catch (Exception e) {
             LOG.error("Could not create application program.", e);
             System.exit(1);
         }
 
-        Configuration configuration = 
loadConfigurationFromClusterConfig(clusterConfiguration);
         try {
             configureExecution(configuration, program);
         } catch (Exception e) {
@@ -101,20 +101,25 @@ public final class StandaloneApplicationClusterEntryPoint 
extends ApplicationClu
     }
 
     private static PackagedProgram getPackagedProgram(
-            final StandaloneApplicationClusterConfiguration 
clusterConfiguration)
+            final StandaloneApplicationClusterConfiguration 
clusterConfiguration,
+            Configuration configuration)
             throws IOException, FlinkException {
         final PackagedProgramRetriever programRetriever =
                 getPackagedProgramRetriever(
-                        clusterConfiguration.getArgs(), 
clusterConfiguration.getJobClassName());
+                        clusterConfiguration.getArgs(),
+                        clusterConfiguration.getJobClassName(),
+                        configuration);
         return programRetriever.getPackagedProgram();
     }
 
     private static PackagedProgramRetriever getPackagedProgramRetriever(
-            final String[] programArguments, @Nullable final String 
jobClassName)
+            final String[] programArguments,
+            @Nullable final String jobClassName,
+            Configuration configuration)
             throws IOException {
         final File userLibDir = 
ClusterEntrypointUtils.tryFindUserLibDirectory().orElse(null);
         final ClassPathPackagedProgramRetriever.Builder retrieverBuilder =
-                ClassPathPackagedProgramRetriever.newBuilder(programArguments)
+                ClassPathPackagedProgramRetriever.newBuilder(programArguments, 
configuration)
                         .setUserLibDirectory(userLibDir)
                         .setJobClassName(jobClassName);
         return retrieverBuilder.build();
diff --git 
a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/entrypoint/KubernetesApplicationClusterEntrypoint.java
 
b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/entrypoint/KubernetesApplicationClusterEntrypoint.java
index 9254e50..5c5f0a9 100644
--- 
a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/entrypoint/KubernetesApplicationClusterEntrypoint.java
+++ 
b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/entrypoint/KubernetesApplicationClusterEntrypoint.java
@@ -109,7 +109,7 @@ public final class KubernetesApplicationClusterEntrypoint 
extends ApplicationClu
 
         final File userLibDir = 
ClusterEntrypointUtils.tryFindUserLibDirectory().orElse(null);
         final ClassPathPackagedProgramRetriever.Builder retrieverBuilder =
-                ClassPathPackagedProgramRetriever.newBuilder(programArguments)
+                ClassPathPackagedProgramRetriever.newBuilder(programArguments, 
configuration)
                         .setUserLibDirectory(userLibDir)
                         .setJobClassName(jobClassName);
 
diff --git 
a/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnApplicationClusterEntryPoint.java
 
b/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnApplicationClusterEntryPoint.java
index 21ff7a2..e31a131 100644
--- 
a/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnApplicationClusterEntryPoint.java
+++ 
b/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnApplicationClusterEntryPoint.java
@@ -134,7 +134,7 @@ public final class YarnApplicationClusterEntryPoint extends 
ApplicationClusterEn
         final File userLibDir = 
YarnEntrypointUtils.getUsrLibDir(configuration).orElse(null);
         final File userApplicationJar = getUserApplicationJar(userLibDir, 
configuration);
         final ClassPathPackagedProgramRetriever.Builder retrieverBuilder =
-                ClassPathPackagedProgramRetriever.newBuilder(programArguments)
+                ClassPathPackagedProgramRetriever.newBuilder(programArguments, 
configuration)
                         .setUserLibDirectory(userLibDir)
                         .setJarFile(userApplicationJar)
                         .setJobClassName(jobClassName);

Reply via email to