Aitozi commented on a change in pull request #112:
URL: https://github.com/apache/flink-kubernetes-operator/pull/112#discussion_r836406584



##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java
##########
@@ -116,6 +136,83 @@ public void submitSessionCluster(FlinkDeployment deployment, Configuration conf)
         LOG.info("Session cluster successfully deployed");
     }
 
+    public void submitJobToSessionCluster(FlinkSessionJob sessionJob, Configuration conf)
+            throws Exception {
+        JarRunResponseBody jarRunResponseBody =
+                jarUpload(sessionJob, conf)
+                        .handle(
+                                (response, throwable) -> {
+                                    if (throwable != null) {
+                                        LOG.error("Failed to upload user jar.", throwable);
+                                        throw new FlinkRuntimeException(throwable);
+                                    } else {
+                                        return jarRun(sessionJob, response, conf);
+                                    }
+                                })
+                        .get();
+
+        String jobID = jarRunResponseBody.getJobId().toHexString();
+        LOG.info("Submitted job: {} to session cluster.", jobID);
+        sessionJob.getStatus().setJobStatus(JobStatus.builder().jobId(jobID).build());
+    }
+
+    private JarRunResponseBody jarRun(
+            FlinkSessionJob sessionJob, JarUploadResponseBody response, Configuration conf) {
+        String jarId =
+                response.getFilename().substring(response.getFilename().lastIndexOf("/") + 1);
+        // we generate jobID in advance to help deduplicate job submission.
+        JobID jobID = new JobID();
+        try (RestClusterClient<String> clusterClient =
+                (RestClusterClient<String>) getClusterClient(conf)) {
+            JarRunHeaders headers = JarRunHeaders.getInstance();
+            JarRunMessageParameters parameters = headers.getUnresolvedMessageParameters();
+            parameters.jarIdPathParameter.resolve(jarId);
+            JobSpec job = sessionJob.getSpec().getJob();
+            JarRunRequestBody runRequestBody =
+                    new JarRunRequestBody(
+                            job.getEntryClass(),
+                            null,
+                            job.getArgs() == null ? null : Arrays.asList(job.getArgs()),
+                            job.getParallelism() > 0 ? job.getParallelism() : null,
+                            jobID,
+                            null,
+                            sessionJob.getSpec().getJob().getInitialSavepointPath());
+            LOG.info("Submitting job: {} to session cluster.", jobID.toHexString());
+            return clusterClient
+                    .sendRequest(headers, parameters, runRequestBody)
+                    .get(1, TimeUnit.MINUTES);
+        } catch (Exception e) {
+            LOG.error("Failed to submit job to session cluster.", e);
+            throw new FlinkRuntimeException(e);
+        }
+    }
+
+    private CompletableFuture<JarUploadResponseBody> jarUpload(
+            FlinkSessionJob sessionJob, Configuration conf) throws Exception {
+        Path path = jarResolver.resolve(sessionJob.getSpec().getJob().getJarURI());
+        JarUploadHeaders headers = JarUploadHeaders.getInstance();
+        String clusterId = sessionJob.getSpec().getClusterId();
+        String namespace = sessionJob.getMetadata().getNamespace();
+        int port = conf.getInteger(RestOptions.PORT);
+        String host =
+                ObjectUtils.firstNonNull(
+                        operatorConfiguration.getFlinkServiceHostOverride(),
+                        ExternalServiceDecorator.getNamespacedExternalServiceName(
+                                clusterId, namespace));
+
+        try (RestClient restClient = new RestClient(conf, Executors.newSingleThreadExecutor())) {

Review comment:
       I have made it a field of `FlinkService` now. I think this method should be 
improved later. I use `RestClient` directly here because there is no 
suitable method for uploading the user jar. Maybe we could contribute this upstream.
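
To illustrate the "make it a field" idea, here is a minimal sketch. It assumes "it" is the executor passed to `RestClient` in the line above. `UploadService`, `upload`, and the `uploaded/` prefix are hypothetical stand-ins, not the actual `FlinkService` code or the Flink REST API. The point is only that the executor is created once, reused by every call, and shut down together with its owner, rather than created anew on each upload.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Hypothetical stand-in for the service class; not the real FlinkService. */
class UploadService implements AutoCloseable {

    // Created once and shared by all uploads, instead of one executor per call.
    private final ExecutorService executor = Executors.newSingleThreadExecutor();

    /**
     * Simulates an asynchronous jar upload on the shared executor.
     * The returned name is made up for this sketch.
     */
    CompletableFuture<String> upload(String jarName) {
        return CompletableFuture.supplyAsync(() -> "uploaded/" + jarName, executor);
    }

    /** Exposed only so that callers can inspect the executor's state. */
    ExecutorService executor() {
        return executor;
    }

    /** Releases the executor thread when the service itself is closed. */
    @Override
    public void close() {
        executor.shutdown();
    }
}
```

With this shape, a per-call client could borrow the shared executor, and the thread is released in exactly one place, `close()`.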





