ferenc-csaky commented on code in PR #24065:
URL: https://github.com/apache/flink/pull/24065#discussion_r1457107530
##########
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/entrypoint/KubernetesApplicationClusterEntrypoint.java:
##########
@@ -111,14 +126,45 @@ private static PackagedProgramRetriever
getPackagedProgramRetriever(
// No need to do pipelineJars validation if it is a PyFlink job.
if (!(PackagedProgramUtils.isPython(jobClassName)
|| PackagedProgramUtils.isPython(programArguments))) {
- final List<File> pipelineJars =
-
KubernetesUtils.checkJarFileForApplicationMode(configuration);
- Preconditions.checkArgument(pipelineJars.size() == 1, "Should only
have one jar");
+ final ArtifactFetchManager.Result fetchRes =
fetchArtifacts(configuration);
+
return DefaultPackagedProgramRetriever.create(
- userLibDir, pipelineJars.get(0), jobClassName,
programArguments, configuration);
+ userLibDir,
+ fetchRes.getUserArtifactDir(),
+ fetchRes.getJobJar(),
+ jobClassName,
+ programArguments,
+ configuration);
}
return DefaultPackagedProgramRetriever.create(
userLibDir, jobClassName, programArguments, configuration);
}
+
+ private static ArtifactFetchManager.Result fetchArtifacts(Configuration
configuration) {
+ try {
+ String targetDir = generateJarDir(configuration);
+ ArtifactFetchManager fetchMgr = new
ArtifactFetchManager(configuration, targetDir);
+
+ List<String> uris = configuration.get(PipelineOptions.JARS);
+ checkArgument(uris.size() == 1, "Should only have one jar");
+ List<String> additionalUris =
+ configuration
+ .getOptional(ArtifactFetchOptions.ARTIFACT_LIST)
+ .orElse(Collections.emptyList());
+
+ return fetchMgr.fetchArtifacts(uris.get(0), additionalUris);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ static String generateJarDir(Configuration configuration) {
+ return String.join(
+ File.separator,
+ new
File(configuration.get(ArtifactFetchOptions.ARTIFACT_BASE_DIR))
+ .getAbsolutePath(),
+ configuration.get(KubernetesConfigOptions.NAMESPACE),
+ configuration.get(KubernetesConfigOptions.CLUSTER_ID));
+ }
Review Comment:
I did not write this part and personally have limited K8s experience, but I
think the reason behind this is that multiple deployments may end up on the
same node, so adding the `namespace` and `cluster-id` to the dir path ensures
that each deployment has its own artifact base dir.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]