Aitozi commented on a change in pull request #112:
URL:
https://github.com/apache/flink-kubernetes-operator/pull/112#discussion_r835762928
##########
File path:
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java
##########
@@ -116,6 +136,83 @@ public void submitSessionCluster(FlinkDeployment deployment, Configuration conf)
LOG.info("Session cluster successfully deployed");
}
+ public void submitJobToSessionCluster(FlinkSessionJob sessionJob,
Configuration conf)
+ throws Exception {
+ JarRunResponseBody jarRunResponseBody =
+ jarUpload(sessionJob, conf)
+ .handle(
+ (response, throwable) -> {
+ if (throwable != null) {
+ LOG.error("Failed to upload user
jar.", throwable);
+ throw new
FlinkRuntimeException(throwable);
+ } else {
+ return jarRun(sessionJob, response,
conf);
+ }
+ })
+ .get();
+
+ String jobID = jarRunResponseBody.getJobId().toHexString();
+ LOG.info("Submitted job: {} to session cluster.", jobID);
+
sessionJob.getStatus().setJobStatus(JobStatus.builder().jobId(jobID).build());
+ }
+
+ private JarRunResponseBody jarRun(
+ FlinkSessionJob sessionJob, JarUploadResponseBody response,
Configuration conf) {
+ final String jarId =
+
response.getFilename().substring(response.getFilename().lastIndexOf("/") + 1);
+ // we generate jobID in advance to help deduplicate job submission.
+ JobID jobID = new JobID();
+ try (final RestClusterClient<String> clusterClient =
+ (RestClusterClient<String>) getClusterClient(conf)) {
+ final JarRunHeaders headers = JarRunHeaders.getInstance();
+ final JarRunMessageParameters parameters =
headers.getUnresolvedMessageParameters();
+ parameters.jarIdPathParameter.resolve(jarId);
+ final JobSpec job = sessionJob.getSpec().getJob();
+ final JarRunRequestBody runRequestBody =
+ new JarRunRequestBody(
+ job.getEntryClass(),
+ null,
+ job.getArgs() == null ? null :
Arrays.asList(job.getArgs()),
+ job.getParallelism() > 0 ? job.getParallelism() :
null,
+ jobID,
+ null,
+
sessionJob.getSpec().getJob().getInitialSavepointPath());
+ LOG.info("Submitting job: {} to session cluster.",
jobID.toHexString());
+ return clusterClient
+ .sendRequest(headers, parameters, runRequestBody)
+ .get(1, TimeUnit.MINUTES);
+ } catch (Exception e) {
+ LOG.error("Failed to submit job to session cluster.", e);
+ throw new FlinkRuntimeException(e);
+ }
+ }
+
+ private CompletableFuture<JarUploadResponseBody> jarUpload(
+ FlinkSessionJob sessionJob, final Configuration conf) throws
Exception {
+ Path path =
jarResolver.resolve(sessionJob.getSpec().getJob().getJarURI());
+ JarUploadHeaders headers = JarUploadHeaders.getInstance();
+ String clusterId = sessionJob.getSpec().getClusterId();
+ String namespace = sessionJob.getMetadata().getNamespace();
Review comment:
Fixed
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]