ferenc-csaky commented on code in PR #24065:
URL: https://github.com/apache/flink/pull/24065#discussion_r1457107530


##########
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/entrypoint/KubernetesApplicationClusterEntrypoint.java:
##########
@@ -111,14 +126,45 @@ private static PackagedProgramRetriever getPackagedProgramRetriever(
         // No need to do pipelineJars validation if it is a PyFlink job.
         if (!(PackagedProgramUtils.isPython(jobClassName)
                 || PackagedProgramUtils.isPython(programArguments))) {
-            final List<File> pipelineJars =
-                    KubernetesUtils.checkJarFileForApplicationMode(configuration);
-            Preconditions.checkArgument(pipelineJars.size() == 1, "Should only have one jar");
+            final ArtifactFetchManager.Result fetchRes = fetchArtifacts(configuration);
+
             return DefaultPackagedProgramRetriever.create(
-                    userLibDir, pipelineJars.get(0), jobClassName, programArguments, configuration);
+                    userLibDir,
+                    fetchRes.getUserArtifactDir(),
+                    fetchRes.getJobJar(),
+                    jobClassName,
+                    programArguments,
+                    configuration);
         }
 
         return DefaultPackagedProgramRetriever.create(
                 userLibDir, jobClassName, programArguments, configuration);
     }
+
+    private static ArtifactFetchManager.Result fetchArtifacts(Configuration configuration) {
+        try {
+            String targetDir = generateJarDir(configuration);
+            ArtifactFetchManager fetchMgr = new ArtifactFetchManager(configuration, targetDir);
+
+            List<String> uris = configuration.get(PipelineOptions.JARS);
+            checkArgument(uris.size() == 1, "Should only have one jar");
+            List<String> additionalUris =
+                    configuration
+                            .getOptional(ArtifactFetchOptions.ARTIFACT_LIST)
+                            .orElse(Collections.emptyList());
+
+            return fetchMgr.fetchArtifacts(uris.get(0), additionalUris);
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    static String generateJarDir(Configuration configuration) {
+        return String.join(
+                File.separator,
+                new File(configuration.get(ArtifactFetchOptions.ARTIFACT_BASE_DIR))
+                        .getAbsolutePath(),
+                configuration.get(KubernetesConfigOptions.NAMESPACE),
+                configuration.get(KubernetesConfigOptions.CLUSTER_ID));
+    }

Review Comment:
   I did not write this part and have limited K8s experience myself, but I 
think the reason is that multiple deployments may end up on the same node, 
so adding the `namespace` and `cluster-id` to the dir path ensures that each 
deployment gets its own artifact base dir.
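
To illustrate the point, here is a minimal standalone sketch of the same path construction. It is not the PR's code: `ArtifactDirSketch` is a hypothetical class, it takes plain strings instead of a Flink `Configuration` so that it runs without Flink on the classpath, and the base dir, namespace and cluster IDs are made-up values.

```java
import java.io.File;

// Hypothetical standalone class, used only to illustrate the idea.
final class ArtifactDirSketch {

    // Same shape as generateJarDir in the diff above, but with plain String
    // inputs (an assumption made to keep the sketch self-contained).
    static String generateJarDir(String baseDir, String namespace, String clusterId) {
        return String.join(
                File.separator,
                new File(baseDir).getAbsolutePath(),
                namespace,
                clusterId);
    }

    public static void main(String[] args) {
        // Made-up values: two deployments that share a node and a base dir.
        String first = generateJarDir("/opt/flink/artifacts", "default", "cluster-a");
        String second = generateJarDir("/opt/flink/artifacts", "default", "cluster-b");

        // The two paths differ in their last segment, so the deployments
        // do not write into the same directory.
        System.out.println(first);
        System.out.println(second);
    }
}
```

If the assumption about co-located deployments holds, two clusters that share the same base dir still resolve to different target directories, because the namespace and cluster ID are appended as separate path segments.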


