wangyang0918 commented on a change in pull request #112:
URL: https://github.com/apache/flink-kubernetes-operator/pull/112#discussion_r835902072
##########
File path: helm/flink-operator/values.yaml
##########
@@ -41,6 +41,28 @@ jobServiceAccount:
"helm.sh/resource-policy": keep
name: "flink"
+operatorVolumeMounts:
+ create: false
+ data:
+ - name: flink-userlib
Review comment:
I would also suggest `flink-artifacts`.
##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/artifact/JarResolver.java
##########
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.artifact;
+
+import org.apache.flink.util.FlinkRuntimeException;
+
+import java.net.URI;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+
+/** Resolve the jar uri. */
+public class JarResolver {
+
+ public Path resolve(String jarURI) throws Exception {
Review comment:
I am wondering whether we should use `local://` here, because `local://` in the FlinkSessionJob has a different meaning than in the FlinkDeployment CR. Maybe the `file://` scheme is more appropriate, or at least we need to document this clearly.
This could be done as a follow-up ticket.
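A minimal sketch of the distinction under discussion (the class and sample URIs are illustrative, not taken from the PR): `java.net.URI` already separates the scheme from the path, so `JarResolver` could branch on it and document each scheme's semantics.

```java
import java.net.URI;

public class SchemeSketch {
    // Returns the scheme of a jar URI, e.g. "local" or "file".
    static String schemeOf(String jarURI) {
        return URI.create(jarURI).getScheme();
    }

    public static void main(String[] args) {
        System.out.println(schemeOf("local:///opt/flink/userlib/TopSpeedWindowing.jar"));
        System.out.println(schemeOf("file:///opt/flink/userlib/TopSpeedWindowing.jar"));
    }
}
```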
##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java
##########
@@ -116,6 +136,83 @@ public void submitSessionCluster(FlinkDeployment deployment, Configuration conf)
         LOG.info("Session cluster successfully deployed");
     }
+    public void submitJobToSessionCluster(FlinkSessionJob sessionJob, Configuration conf)
+            throws Exception {
+        JarRunResponseBody jarRunResponseBody =
+                jarUpload(sessionJob, conf)
+                        .handle(
+                                (response, throwable) -> {
+                                    if (throwable != null) {
+                                        LOG.error("Failed to upload user jar.", throwable);
+                                        throw new FlinkRuntimeException(throwable);
+                                    } else {
+                                        return jarRun(sessionJob, response, conf);
+                                    }
+                                })
+                        .get();
+
+        String jobID = jarRunResponseBody.getJobId().toHexString();
+        LOG.info("Submitted job: {} to session cluster.", jobID);
+        sessionJob.getStatus().setJobStatus(JobStatus.builder().jobId(jobID).build());
+    }
+
+    private JarRunResponseBody jarRun(
+            FlinkSessionJob sessionJob, JarUploadResponseBody response, Configuration conf) {
+        final String jarId =
+                response.getFilename().substring(response.getFilename().lastIndexOf("/") + 1);
+        // we generate jobID in advance to help deduplicate job submission.
+        JobID jobID = new JobID();
+        try (final RestClusterClient<String> clusterClient =
+                (RestClusterClient<String>) getClusterClient(conf)) {
+            final JarRunHeaders headers = JarRunHeaders.getInstance();
+            final JarRunMessageParameters parameters = headers.getUnresolvedMessageParameters();
+            parameters.jarIdPathParameter.resolve(jarId);
+            final JobSpec job = sessionJob.getSpec().getJob();
+            final JarRunRequestBody runRequestBody =
+                    new JarRunRequestBody(
+                            job.getEntryClass(),
+                            null,
+                            job.getArgs() == null ? null : Arrays.asList(job.getArgs()),
+                            job.getParallelism() > 0 ? job.getParallelism() : null,
+                            jobID,
+                            null,
+                            sessionJob.getSpec().getJob().getInitialSavepointPath());
+            LOG.info("Submitting job: {} to session cluster.", jobID.toHexString());
+            return clusterClient
+                    .sendRequest(headers, parameters, runRequestBody)
+                    .get(1, TimeUnit.MINUTES);
+        } catch (Exception e) {
+            LOG.error("Failed to submit job to session cluster.", e);
+            throw new FlinkRuntimeException(e);
+        }
+    }
+
+    private CompletableFuture<JarUploadResponseBody> jarUpload(
+            FlinkSessionJob sessionJob, final Configuration conf) throws Exception {
+        Path path = jarResolver.resolve(sessionJob.getSpec().getJob().getJarURI());
+        JarUploadHeaders headers = JarUploadHeaders.getInstance();
+        String clusterId = sessionJob.getSpec().getClusterId();
+        String namespace = sessionJob.getMetadata().getNamespace();
Review comment:
Always using `final` for local variables is not mandatory, but I think `final` is not completely useless. It helps a lot when you do not expect a variable to be reassigned accidentally, especially in a very long method.
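A toy illustration of the point (the variable name is made up for the example): with `final`, an accidental reassignment becomes a compile-time error instead of a silent bug.

```java
public class FinalLocalSketch {
    public static void main(String[] args) {
        // Marking the local as final documents that it never changes.
        final String jarId = "TopSpeedWindowing.jar";
        // jarId = "other.jar";  // would not compile: jarId is final
        System.out.println(jarId);
    }
}
```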
##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobController.java
##########
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.controller;
+
+import org.apache.flink.kubernetes.operator.config.DefaultConfig;
+import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
+import org.apache.flink.kubernetes.operator.crd.FlinkSessionJob;
+import org.apache.flink.kubernetes.operator.exception.ReconciliationException;
+import org.apache.flink.kubernetes.operator.observer.Observer;
+import org.apache.flink.kubernetes.operator.reconciler.Reconciler;
+import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
+import org.apache.flink.kubernetes.operator.validation.InternalValidator;
+
+import io.fabric8.kubernetes.client.KubernetesClient;
+import io.javaoperatorsdk.operator.api.reconciler.Context;
+import io.javaoperatorsdk.operator.api.reconciler.ControllerConfiguration;
+import io.javaoperatorsdk.operator.api.reconciler.DeleteControl;
+import io.javaoperatorsdk.operator.api.reconciler.ErrorStatusHandler;
+import io.javaoperatorsdk.operator.api.reconciler.RetryInfo;
+import io.javaoperatorsdk.operator.api.reconciler.UpdateControl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Optional;
+
+/** Controller that runs the main reconcile loop for {@link FlinkSessionJob}. */
+@ControllerConfiguration
+public class FlinkSessionJobController
+        implements io.javaoperatorsdk.operator.api.reconciler.Reconciler<FlinkSessionJob>,
+                ErrorStatusHandler<FlinkSessionJob> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(FlinkSessionJobController.class);
+    private final KubernetesClient kubernetesClient;
+
+    private final InternalValidator<FlinkSessionJob> validator;
+    private final Reconciler<FlinkSessionJob> reconciler;
+    private final Observer<FlinkSessionJob> observer;
+    private final DefaultConfig defaultConfig;
+    private final FlinkOperatorConfiguration operatorConfiguration;
+
+    private FlinkControllerConfig<FlinkSessionJob> controllerConfig;
+
+    public FlinkSessionJobController(
+            DefaultConfig defaultConfig,
+            FlinkOperatorConfiguration operatorConfiguration,
+            KubernetesClient kubernetesClient,
+            InternalValidator<FlinkSessionJob> validator,
+            Reconciler<FlinkSessionJob> reconciler,
+            Observer<FlinkSessionJob> observer) {
+        this.defaultConfig = defaultConfig;
+        this.operatorConfiguration = operatorConfiguration;
+        this.kubernetesClient = kubernetesClient;
+        this.validator = validator;
+        this.reconciler = reconciler;
+        this.observer = observer;
+    }
+
+    @Override
+    public UpdateControl<FlinkSessionJob> reconcile(
+            FlinkSessionJob flinkSessionJob, Context context) {
+        FlinkSessionJob originalCopy = ReconciliationUtils.clone(flinkSessionJob);
+        LOG.info("Starting reconciliation");
+        Optional<String> validationError = validator.validate(flinkSessionJob);
+        if (validationError.isPresent()) {
+            LOG.error("Validation failed: " + validationError.get());
+            ReconciliationUtils.updateForReconciliationError(
+                    flinkSessionJob, validationError.get());
+            return ReconciliationUtils.toUpdateControl(originalCopy, flinkSessionJob);
+        }
+
+        try {
+            // TODO refactor the reconciler interface to return UpdateControl directly
Review comment:
Let's have more discussion in a dedicated ticket about changing the
`Reconciler` and `Observer` interfaces.
##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java
##########
@@ -116,6 +136,83 @@ public void submitSessionCluster(FlinkDeployment deployment, Configuration conf)
         LOG.info("Session cluster successfully deployed");
     }
+    public void submitJobToSessionCluster(FlinkSessionJob sessionJob, Configuration conf)
+            throws Exception {
+        JarRunResponseBody jarRunResponseBody =
+                jarUpload(sessionJob, conf)
+                        .handle(
+                                (response, throwable) -> {
+                                    if (throwable != null) {
+                                        LOG.error("Failed to upload user jar.", throwable);
+                                        throw new FlinkRuntimeException(throwable);
+                                    } else {
+                                        return jarRun(sessionJob, response, conf);
+                                    }
+                                })
+                        .get();
+
+        String jobID = jarRunResponseBody.getJobId().toHexString();
+        LOG.info("Submitted job: {} to session cluster.", jobID);
+        sessionJob.getStatus().setJobStatus(JobStatus.builder().jobId(jobID).build());
+    }
+
+    private JarRunResponseBody jarRun(
+            FlinkSessionJob sessionJob, JarUploadResponseBody response, Configuration conf) {
+        String jarId =
+                response.getFilename().substring(response.getFilename().lastIndexOf("/") + 1);
+        // we generate jobID in advance to help deduplicate job submission.
+        JobID jobID = new JobID();
+        try (RestClusterClient<String> clusterClient =
+                (RestClusterClient<String>) getClusterClient(conf)) {
+            JarRunHeaders headers = JarRunHeaders.getInstance();
+            JarRunMessageParameters parameters = headers.getUnresolvedMessageParameters();
+            parameters.jarIdPathParameter.resolve(jarId);
+            JobSpec job = sessionJob.getSpec().getJob();
+            JarRunRequestBody runRequestBody =
+                    new JarRunRequestBody(
+                            job.getEntryClass(),
+                            null,
+                            job.getArgs() == null ? null : Arrays.asList(job.getArgs()),
+                            job.getParallelism() > 0 ? job.getParallelism() : null,
+                            jobID,
+                            null,
+                            sessionJob.getSpec().getJob().getInitialSavepointPath());
+            LOG.info("Submitting job: {} to session cluster.", jobID.toHexString());
+            return clusterClient
+                    .sendRequest(headers, parameters, runRequestBody)
+                    .get(1, TimeUnit.MINUTES);
+        } catch (Exception e) {
+            LOG.error("Failed to submit job to session cluster.", e);
+            throw new FlinkRuntimeException(e);
+        }
+    }
+
+    private CompletableFuture<JarUploadResponseBody> jarUpload(
+            FlinkSessionJob sessionJob, Configuration conf) throws Exception {
+        Path path = jarResolver.resolve(sessionJob.getSpec().getJob().getJarURI());
+        JarUploadHeaders headers = JarUploadHeaders.getInstance();
+        String clusterId = sessionJob.getSpec().getClusterId();
+        String namespace = sessionJob.getMetadata().getNamespace();
+        int port = conf.getInteger(RestOptions.PORT);
+        String host =
+                ObjectUtils.firstNonNull(
+                        operatorConfiguration.getFlinkServiceHostOverride(),
+                        ExternalServiceDecorator.getNamespacedExternalServiceName(
+                                clusterId, namespace));
+
+        try (RestClient restClient = new RestClient(conf, Executors.newSingleThreadExecutor())) {
Review comment:
The executor is not shut down correctly.
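One possible shape for the fix (a sketch, not the PR's actual code): keep a reference to the executor instead of creating it inline, and shut it down after the client's try-with-resources block closes.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ExecutorShutdownSketch {
    public static void main(String[] args) {
        // Hold the executor in a local so it can be shut down explicitly.
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        try {
            // try (RestClient restClient = new RestClient(conf, executorService)) { ... }
        } finally {
            // Closing the RestClient does not stop a caller-supplied executor;
            // shut it down ourselves so its thread does not leak.
            executorService.shutdownNow();
        }
        System.out.println(executorService.isShutdown());
    }
}
```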
##########
File path: examples/basic-session-job.yaml
##########
@@ -24,6 +24,6 @@ metadata:
spec:
clusterId: basic-session-example
job:
- jarURI: local:///opt/flink/examples/streaming/StateMachineExample.jar
+ jarURI: local:///opt/flink/userlib/TopSpeedWindowing.jar
Review comment:
nit: I would like to use `/opt/flink/artifacts` here to avoid confusion with `/opt/flink/usrlib`. `/opt/flink/usrlib` is a special directory from which all jars are loaded automatically by the user classloader.
##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java
##########
@@ -189,6 +286,12 @@ public boolean isJobManagerPortReady(Configuration config) {
         return savepointOpt;
     }
+    public void cancelSessionJob(JobID jobID, Configuration conf) throws Exception {
+        try (ClusterClient<String> clusterClient = getClusterClient(conf)) {
+            clusterClient.cancel(jobID).get(1, TimeUnit.MINUTES);
Review comment:
We could use `operatorConfiguration.getFlinkClientTimeout()`.
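The suggestion, sketched with a stand-in value (the real timeout would come from `operatorConfiguration.getFlinkClientTimeout()`, and the future would be `clusterClient.cancel(jobID)`): a configured `Duration` replaces the hard-coded one minute.

```java
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

public class ClientTimeoutSketch {
    public static void main(String[] args) throws Exception {
        // Stand-in for operatorConfiguration.getFlinkClientTimeout().
        Duration flinkClientTimeout = Duration.ofMinutes(1);
        // Stand-in for clusterClient.cancel(jobID).
        CompletableFuture<String> cancelFuture = CompletableFuture.completedFuture("CANCELED");
        // Use the configured timeout instead of a hard-coded 1 minute.
        String result = cancelFuture.get(flinkClientTimeout.toMillis(), TimeUnit.MILLISECONDS);
        System.out.println(result);
    }
}
```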
##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/FlinkSessionJobReconciler.java
##########
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.reconciler.sessionjob;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
+import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.crd.FlinkSessionJob;
+import org.apache.flink.kubernetes.operator.crd.spec.FlinkSessionJobSpec;
+import org.apache.flink.kubernetes.operator.crd.status.JobManagerDeploymentStatus;
+import org.apache.flink.kubernetes.operator.reconciler.Reconciler;
+import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
+import org.apache.flink.kubernetes.operator.reconciler.deployment.ApplicationReconciler;
+import org.apache.flink.kubernetes.operator.service.FlinkService;
+import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
+
+import io.fabric8.kubernetes.client.KubernetesClient;
+import io.javaoperatorsdk.operator.api.reconciler.Context;
+import io.javaoperatorsdk.operator.api.reconciler.DeleteControl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Optional;
+
+/** The reconciler for the {@link FlinkSessionJob}. */
+public class FlinkSessionJobReconciler implements Reconciler<FlinkSessionJob> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(FlinkSessionJobReconciler.class);
+
+    private final FlinkOperatorConfiguration operatorConfiguration;
+    private final KubernetesClient kubernetesClient;
+    private final FlinkService flinkService;
+
+    public FlinkSessionJobReconciler(
+            KubernetesClient kubernetesClient,
+            FlinkService flinkService,
+            FlinkOperatorConfiguration operatorConfiguration) {
+        this.kubernetesClient = kubernetesClient;
+        this.flinkService = flinkService;
+        this.operatorConfiguration = operatorConfiguration;
+    }
+
+    @Override
+    public void reconcile(
+            FlinkSessionJob flinkSessionJob, Context context, Configuration defaultConfig)
+            throws Exception {
+
+        FlinkSessionJobSpec lastReconciledSpec =
+                flinkSessionJob.getStatus().getReconciliationStatus().getLastReconciledSpec();
+
+        if (lastReconciledSpec == null) {
+            deployFlinkJob(flinkSessionJob, defaultConfig);
+            return;
+        }
+
+        boolean specChanged = !flinkSessionJob.getSpec().equals(lastReconciledSpec);
+
+        if (specChanged) {
+            // TODO reconcile other spec changes.
+            LOG.info("Other spec changes are not supported yet");
+        }
+    }
+
+    @Override
+    public DeleteControl cleanup(FlinkSessionJob sessionJob, Configuration defaultConfig) {
+        Optional<FlinkDeployment> flinkDepOptional = getFlinkDeployment(sessionJob);
+
+        if (flinkDepOptional.isPresent()) {
+            Configuration effectiveConfig =
+                    FlinkUtils.getEffectiveConfig(flinkDepOptional.get(), defaultConfig);
+            String jobID = sessionJob.getStatus().getJobStatus().getJobId();
+            if (jobID != null) {
+                try {
+                    flinkService.cancelSessionJob(JobID.fromHexString(jobID), effectiveConfig);
+                } catch (Exception e) {
+                    LOG.error("Failed to cancel job.", e);
+                }
+            }
+        } else {
+            LOG.info("Session cluster deployment not available");
+        }
+        return DeleteControl.defaultDelete();
+    }
+
+    private void deployFlinkJob(FlinkSessionJob sessionJob, Configuration defaultConfig)
+            throws Exception {
+        Optional<FlinkDeployment> flinkDepOptional = getFlinkDeployment(sessionJob);
+        if (flinkDepOptional.isPresent()
+                && (flinkDepOptional.get().getStatus().getJobManagerDeploymentStatus()
+                        == JobManagerDeploymentStatus.READY)) {
+            Configuration effectiveConfig =
+                    FlinkUtils.getEffectiveConfig(flinkDepOptional.get(), defaultConfig);
+            flinkService.submitJobToSessionCluster(sessionJob, effectiveConfig);
+            ReconciliationUtils.updateForSpecReconciliationSuccess(sessionJob);
+        } else {
+            LOG.info("Session cluster deployment is not in READY state");
+        }
+    }
+
+    private Optional<FlinkDeployment> getFlinkDeployment(FlinkSessionJob sessionJob) {
Review comment:
Could we register a new event source in `FlinkSessionJobController` and
use `context.getSecondaryResource()` to get the FlinkDeployment?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]