This is an automated email from the ASF dual-hosted git repository.
chesnay pushed a commit to branch release-1.13
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.13 by this push:
new 848d4ba [FLINK-21445][clients] Adds configuration to ClassPathPackagedProgramRetriever
848d4ba is described below
commit 848d4baf7054781de82d731d6ed60bb10d44d8e0
Author: Matthias Pohl <[email protected]>
AuthorDate: Tue Jul 6 09:29:47 2021 +0200
[FLINK-21445][clients] Adds configuration to ClassPathPackagedProgramRetriever
---
.../ClassPathPackagedProgramRetriever.java | 25 ++++++--
.../ClassPathPackagedProgramRetrieverTest.java | 68 ++++++++++++++++++----
.../StandaloneApplicationClusterEntryPoint.java | 17 ++++--
.../KubernetesApplicationClusterEntrypoint.java | 2 +-
.../YarnApplicationClusterEntryPoint.java | 2 +-
5 files changed, 91 insertions(+), 23 deletions(-)
diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/application/ClassPathPackagedProgramRetriever.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/application/ClassPathPackagedProgramRetriever.java
index 65471a0..041d2cf 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/deployment/application/ClassPathPackagedProgramRetriever.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/application/ClassPathPackagedProgramRetriever.java
@@ -24,6 +24,7 @@ import org.apache.flink.client.program.PackagedProgram;
import org.apache.flink.client.program.PackagedProgramRetriever;
import org.apache.flink.client.program.PackagedProgramUtils;
import org.apache.flink.client.program.ProgramInvocationException;
+import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FileUtils;
import org.apache.flink.util.FlinkException;
@@ -76,12 +77,15 @@ public class ClassPathPackagedProgramRetriever implements
PackagedProgramRetriev
@Nullable private final File jarFile;
+ @Nonnull private final Configuration configuration;
+
private ClassPathPackagedProgramRetriever(
@Nonnull String[] programArguments,
@Nullable String jobClassName,
@Nonnull Supplier<Iterable<File>> jarsOnClassPath,
@Nullable File userLibDirectory,
- @Nullable File jarFile)
+ @Nullable File jarFile,
+ @Nonnull Configuration configuration)
throws IOException {
this.userLibDirectory = userLibDirectory;
this.programArguments = requireNonNull(programArguments,
"programArguments");
@@ -89,6 +93,7 @@ public class ClassPathPackagedProgramRetriever implements
PackagedProgramRetriev
this.jarsOnClassPath = requireNonNull(jarsOnClassPath);
this.userClassPaths = discoverUserClassPaths(userLibDirectory);
this.jarFile = jarFile;
+ this.configuration = configuration;
}
private Collection<URL> discoverUserClassPaths(@Nullable File jobDir)
throws IOException {
@@ -129,6 +134,7 @@ public class ClassPathPackagedProgramRetriever implements
PackagedProgramRetriev
.setArguments(programArguments)
.setJarFile(jarFile)
.setEntryPointClassName(jobClassName)
+ .setConfiguration(configuration)
.build();
}
@@ -137,6 +143,7 @@ public class ClassPathPackagedProgramRetriever implements
PackagedProgramRetriev
.setUserClassPaths(new ArrayList<>(userClassPaths))
.setEntryPointClassName(entryClass)
.setArguments(programArguments)
+ .setConfiguration(configuration)
.build();
} catch (ProgramInvocationException e) {
throw new FlinkException("Could not load the provided entrypoint
class.", e);
@@ -252,8 +259,11 @@ public class ClassPathPackagedProgramRetriever implements
PackagedProgramRetriev
private File jarFile;
- private Builder(String[] programArguments) {
+ private final Configuration configuration;
+
+ private Builder(String[] programArguments, Configuration
configuration) {
this.programArguments = requireNonNull(programArguments);
+ this.configuration = requireNonNull(configuration);
}
public Builder setJobClassName(@Nullable String jobClassName) {
@@ -278,11 +288,16 @@ public class ClassPathPackagedProgramRetriever implements
PackagedProgramRetriev
public ClassPathPackagedProgramRetriever build() throws IOException {
return new ClassPathPackagedProgramRetriever(
- programArguments, jobClassName, jarsOnClassPath,
userLibDirectory, jarFile);
+ programArguments,
+ jobClassName,
+ jarsOnClassPath,
+ userLibDirectory,
+ jarFile,
+ configuration);
}
}
- public static Builder newBuilder(String[] programArguments) {
- return new Builder(programArguments);
+ public static Builder newBuilder(String[] programArguments, Configuration
configuration) {
+ return new Builder(programArguments, configuration);
}
}
diff --git a/flink-clients/src/test/java/org/apache/flink/client/deployment/application/ClassPathPackagedProgramRetrieverTest.java b/flink-clients/src/test/java/org/apache/flink/client/deployment/application/ClassPathPackagedProgramRetrieverTest.java
index 291504f..c9d5f88 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/deployment/application/ClassPathPackagedProgramRetrieverTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/deployment/application/ClassPathPackagedProgramRetrieverTest.java
@@ -31,14 +31,17 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.configuration.PipelineOptionsInternal;
+import
org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
+import org.apache.flink.util.ChildFirstClassLoader;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FileUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.TestLogger;
import org.apache.flink.util.function.FunctionUtils;
+import org.hamcrest.core.IsInstanceOf;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.ClassRule;
@@ -65,6 +68,7 @@ import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasItem;
import static org.hamcrest.Matchers.hasProperty;
import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.not;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
@@ -142,7 +146,7 @@ public class ClassPathPackagedProgramRetrieverTest extends
TestLogger {
configuration.set(PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID,
jobId.toHexString());
final ClassPathPackagedProgramRetriever retrieverUnderTest =
- ClassPathPackagedProgramRetriever.newBuilder(PROGRAM_ARGUMENTS)
+
ClassPathPackagedProgramRetriever.newBuilder(PROGRAM_ARGUMENTS, new
Configuration())
.setJobClassName(TestJob.class.getCanonicalName())
.build();
@@ -161,7 +165,7 @@ public class ClassPathPackagedProgramRetrieverTest extends
TestLogger {
throws IOException, FlinkException, ProgramInvocationException {
final File testJar = TestJob.getTestJobJar();
final ClassPathPackagedProgramRetriever retrieverUnderTest =
- ClassPathPackagedProgramRetriever.newBuilder(PROGRAM_ARGUMENTS)
+
ClassPathPackagedProgramRetriever.newBuilder(PROGRAM_ARGUMENTS, new
Configuration())
.setJarsOnClassPath(() ->
Collections.singleton(testJar))
.build();
@@ -176,7 +180,7 @@ public class ClassPathPackagedProgramRetrieverTest extends
TestLogger {
final File testJar = new File("non-existing");
final ClassPathPackagedProgramRetriever retrieverUnderTest =
- ClassPathPackagedProgramRetriever.newBuilder(PROGRAM_ARGUMENTS)
+
ClassPathPackagedProgramRetriever.newBuilder(PROGRAM_ARGUMENTS, new
Configuration())
// Both a class name is specified and a JAR "is" on
the class path
// The class name should have precedence.
.setJobClassName(TestJob.class.getCanonicalName())
@@ -200,7 +204,7 @@ public class ClassPathPackagedProgramRetrieverTest extends
TestLogger {
SavepointRestoreSettings.toConfiguration(savepointRestoreSettings,
configuration);
final ClassPathPackagedProgramRetriever retrieverUnderTest =
- ClassPathPackagedProgramRetriever.newBuilder(PROGRAM_ARGUMENTS)
+
ClassPathPackagedProgramRetriever.newBuilder(PROGRAM_ARGUMENTS, new
Configuration())
.setJobClassName(TestJob.class.getCanonicalName())
.build();
@@ -249,7 +253,7 @@ public class ClassPathPackagedProgramRetrieverTest extends
TestLogger {
throws IOException, ProgramInvocationException {
final File testJar = TestJob.getTestJobJar();
final ClassPathPackagedProgramRetriever retrieverUnderTest =
- ClassPathPackagedProgramRetriever.newBuilder(PROGRAM_ARGUMENTS)
+
ClassPathPackagedProgramRetriever.newBuilder(PROGRAM_ARGUMENTS, new
Configuration())
.setJarsOnClassPath(() ->
Collections.singleton(testJar))
.setUserLibDirectory(userDirHasNotEntryClass)
.build();
@@ -268,7 +272,7 @@ public class ClassPathPackagedProgramRetrieverTest extends
TestLogger {
public void
testJobGraphRetrievalFailIfDoesNotFindTheEntryClassInTheJobDir()
throws IOException, ProgramInvocationException {
final ClassPathPackagedProgramRetriever retrieverUnderTest =
- ClassPathPackagedProgramRetriever.newBuilder(PROGRAM_ARGUMENTS)
+
ClassPathPackagedProgramRetriever.newBuilder(PROGRAM_ARGUMENTS, new
Configuration())
.setJobClassName(TestJobInfo.JOB_CLASS)
.setJarsOnClassPath(Collections::emptyList)
.setUserLibDirectory(userDirHasNotEntryClass)
@@ -288,7 +292,7 @@ public class ClassPathPackagedProgramRetrieverTest extends
TestLogger {
public void testRetrieveCorrectUserClasspathsWithoutSpecifiedEntryClass()
throws IOException, FlinkException, ProgramInvocationException {
final ClassPathPackagedProgramRetriever retrieverUnderTest =
- ClassPathPackagedProgramRetriever.newBuilder(PROGRAM_ARGUMENTS)
+
ClassPathPackagedProgramRetriever.newBuilder(PROGRAM_ARGUMENTS, new
Configuration())
.setJarsOnClassPath(Collections::emptyList)
.setUserLibDirectory(userDirHasEntryClass)
.build();
@@ -303,7 +307,7 @@ public class ClassPathPackagedProgramRetrieverTest extends
TestLogger {
public void testRetrieveCorrectUserClasspathsWithSpecifiedEntryClass()
throws IOException, FlinkException, ProgramInvocationException {
final ClassPathPackagedProgramRetriever retrieverUnderTest =
- ClassPathPackagedProgramRetriever.newBuilder(PROGRAM_ARGUMENTS)
+
ClassPathPackagedProgramRetriever.newBuilder(PROGRAM_ARGUMENTS, new
Configuration())
.setJobClassName(TestJobInfo.JOB_CLASS)
.setJarsOnClassPath(Collections::emptyList)
.setUserLibDirectory(userDirHasEntryClass)
@@ -320,7 +324,7 @@ public class ClassPathPackagedProgramRetrieverTest extends
TestLogger {
throws IOException, FlinkException, ProgramInvocationException {
final File testJar = TestJob.getTestJobJar();
final ClassPathPackagedProgramRetriever retrieverUnderTest =
- ClassPathPackagedProgramRetriever.newBuilder(PROGRAM_ARGUMENTS)
+
ClassPathPackagedProgramRetriever.newBuilder(PROGRAM_ARGUMENTS, new
Configuration())
.setJarFile(testJar)
.build();
final JobGraph jobGraph = retrieveJobGraph(retrieverUnderTest, new
Configuration());
@@ -336,7 +340,7 @@ public class ClassPathPackagedProgramRetrieverTest extends
TestLogger {
throws IOException, FlinkException, ProgramInvocationException {
final File testJar = TestJob.getTestJobJar();
final ClassPathPackagedProgramRetriever retrieverUnderTest =
- ClassPathPackagedProgramRetriever.newBuilder(PROGRAM_ARGUMENTS)
+
ClassPathPackagedProgramRetriever.newBuilder(PROGRAM_ARGUMENTS, new
Configuration())
.setJarFile(testJar)
.setUserLibDirectory(userDirHasEntryClass)
.build();
@@ -350,6 +354,50 @@ public class ClassPathPackagedProgramRetrieverTest extends
TestLogger {
containsInAnyOrder(expectedURLs.stream().map(URL::toString).toArray()));
}
+ @Test
+ public void testChildFirstDefaultConfiguration() throws FlinkException,
IOException {
+ // this is a sanity check to backup testConfigurationIsConsidered
+ final Configuration configuration = new Configuration();
+ // CHECK_LEAKED_CLASSLOADER has to be disabled to enable the
instanceof check later on in
+ // this test. Otherwise, the actual instance would be hidden by a
wrapper
+ configuration.set(CoreOptions.CHECK_LEAKED_CLASSLOADER, false);
+
+ final ClassPathPackagedProgramRetriever retriever =
+
ClassPathPackagedProgramRetriever.newBuilder(PROGRAM_ARGUMENTS, configuration)
+ .setUserLibDirectory(userDirHasEntryClass)
+ .setJobClassName(TestJobInfo.JOB_CLASS)
+ .build();
+
+ assertThat(
+ retriever.getPackagedProgram().getUserCodeClassLoader(),
+ IsInstanceOf.instanceOf(ChildFirstClassLoader.class));
+ }
+
+ @Test
+ public void testConfigurationIsConsidered() throws FlinkException,
IOException {
+ final String parentFirstConfigValue = "parent-first";
+ // we want to make sure that parent-first is not set as a default
+ assertThat(
+ CoreOptions.CLASSLOADER_RESOLVE_ORDER.defaultValue(),
+ not(is(parentFirstConfigValue)));
+
+ final Configuration configuration = new Configuration();
+ configuration.set(CoreOptions.CLASSLOADER_RESOLVE_ORDER,
parentFirstConfigValue);
+ // CHECK_LEAKED_CLASSLOADER has to be disabled to enable the
instanceof check later on in
+ // this test. Otherwise, the actual instance would be hidden by a
wrapper
+ configuration.set(CoreOptions.CHECK_LEAKED_CLASSLOADER, false);
+
+ final ClassPathPackagedProgramRetriever retriever =
+
ClassPathPackagedProgramRetriever.newBuilder(PROGRAM_ARGUMENTS, configuration)
+ .setUserLibDirectory(userDirHasEntryClass)
+ .setJobClassName(TestJobInfo.JOB_CLASS)
+ .build();
+
+ assertThat(
+ retriever.getPackagedProgram().getUserCodeClassLoader(),
+
IsInstanceOf.instanceOf(FlinkUserCodeClassLoaders.ParentFirstClassLoader.class));
+ }
+
private JobGraph retrieveJobGraph(
ClassPathPackagedProgramRetriever retrieverUnderTest,
Configuration configuration)
throws FlinkException, ProgramInvocationException,
MalformedURLException {
diff --git a/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneApplicationClusterEntryPoint.java b/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneApplicationClusterEntryPoint.java
index 48697ea..4cbe5e4 100644
--- a/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneApplicationClusterEntryPoint.java
+++ b/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneApplicationClusterEntryPoint.java
@@ -63,15 +63,15 @@ public final class StandaloneApplicationClusterEntryPoint
extends ApplicationClu
new
StandaloneApplicationClusterConfigurationParserFactory(),
StandaloneApplicationClusterEntryPoint.class);
+ Configuration configuration =
loadConfigurationFromClusterConfig(clusterConfiguration);
PackagedProgram program = null;
try {
- program = getPackagedProgram(clusterConfiguration);
+ program = getPackagedProgram(clusterConfiguration, configuration);
} catch (Exception e) {
LOG.error("Could not create application program.", e);
System.exit(1);
}
- Configuration configuration =
loadConfigurationFromClusterConfig(clusterConfiguration);
try {
configureExecution(configuration, program);
} catch (Exception e) {
@@ -101,20 +101,25 @@ public final class StandaloneApplicationClusterEntryPoint
extends ApplicationClu
}
private static PackagedProgram getPackagedProgram(
- final StandaloneApplicationClusterConfiguration
clusterConfiguration)
+ final StandaloneApplicationClusterConfiguration
clusterConfiguration,
+ Configuration configuration)
throws IOException, FlinkException {
final PackagedProgramRetriever programRetriever =
getPackagedProgramRetriever(
- clusterConfiguration.getArgs(),
clusterConfiguration.getJobClassName());
+ clusterConfiguration.getArgs(),
+ clusterConfiguration.getJobClassName(),
+ configuration);
return programRetriever.getPackagedProgram();
}
private static PackagedProgramRetriever getPackagedProgramRetriever(
- final String[] programArguments, @Nullable final String
jobClassName)
+ final String[] programArguments,
+ @Nullable final String jobClassName,
+ Configuration configuration)
throws IOException {
final File userLibDir =
ClusterEntrypointUtils.tryFindUserLibDirectory().orElse(null);
final ClassPathPackagedProgramRetriever.Builder retrieverBuilder =
- ClassPathPackagedProgramRetriever.newBuilder(programArguments)
+ ClassPathPackagedProgramRetriever.newBuilder(programArguments,
configuration)
.setUserLibDirectory(userLibDir)
.setJobClassName(jobClassName);
return retrieverBuilder.build();
diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/entrypoint/KubernetesApplicationClusterEntrypoint.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/entrypoint/KubernetesApplicationClusterEntrypoint.java
index 9254e50..5c5f0a9 100644
--- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/entrypoint/KubernetesApplicationClusterEntrypoint.java
+++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/entrypoint/KubernetesApplicationClusterEntrypoint.java
@@ -109,7 +109,7 @@ public final class KubernetesApplicationClusterEntrypoint
extends ApplicationClu
final File userLibDir =
ClusterEntrypointUtils.tryFindUserLibDirectory().orElse(null);
final ClassPathPackagedProgramRetriever.Builder retrieverBuilder =
- ClassPathPackagedProgramRetriever.newBuilder(programArguments)
+ ClassPathPackagedProgramRetriever.newBuilder(programArguments,
configuration)
.setUserLibDirectory(userLibDir)
.setJobClassName(jobClassName);
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnApplicationClusterEntryPoint.java b/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnApplicationClusterEntryPoint.java
index 21ff7a2..e31a131 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnApplicationClusterEntryPoint.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnApplicationClusterEntryPoint.java
@@ -134,7 +134,7 @@ public final class YarnApplicationClusterEntryPoint extends
ApplicationClusterEn
final File userLibDir =
YarnEntrypointUtils.getUsrLibDir(configuration).orElse(null);
final File userApplicationJar = getUserApplicationJar(userLibDir,
configuration);
final ClassPathPackagedProgramRetriever.Builder retrieverBuilder =
- ClassPathPackagedProgramRetriever.newBuilder(programArguments)
+ ClassPathPackagedProgramRetriever.newBuilder(programArguments,
configuration)
.setUserLibDirectory(userLibDir)
.setJarFile(userApplicationJar)
.setJobClassName(jobClassName);