This is an automated email from the ASF dual-hosted git repository.
gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
The following commit(s) were added to refs/heads/main by this push:
new e4a6faa [FLINK-26787] Initial implementation of
FlinkSessionJobController and reconciler
e4a6faa is described below
commit e4a6faae6102779e895113cd1584c9f4fef8cad4
Author: Aitozi <[email protected]>
AuthorDate: Sun Apr 3 22:28:17 2022 +0800
[FLINK-26787] Initial implementation of FlinkSessionJobController and
reconciler
---
docs/content/docs/custom-resource/reference.md | 17 +-
docs/content/docs/development/guide.md | 8 +
examples/basic-session-job.yaml | 2 +-
flink-kubernetes-operator/pom.xml | 6 +
.../flink/kubernetes/operator/FlinkOperator.java | 82 ++++---
.../JarResolver.java} | 26 ++-
.../config/FlinkOperatorConfiguration.java | 7 +-
.../operator/config/OperatorConfigOptions.java | 6 +
.../operator/controller/FlinkControllerConfig.java | 9 +-
.../controller/FlinkDeploymentController.java | 24 +-
.../controller/FlinkSessionJobController.java | 245 +++++++++++++++++++++
.../operator/crd/status/FlinkDeploymentStatus.java | 1 -
.../FlinkSessionJobReconciliationStatus.java | 2 +-
.../status}/JobManagerDeploymentStatus.java | 2 +-
.../kubernetes/operator/observer/Observer.java | 10 +-
.../operator/observer/SavepointFetchResult.java | 6 +-
.../AbstractDeploymentObserver.java} | 21 +-
.../ApplicationObserver.java} | 20 +-
.../observer/{ => deployment}/ObserverFactory.java | 23 +-
.../observer/{ => deployment}/SessionObserver.java | 17 +-
.../SessionJobObserver.java} | 19 +-
.../kubernetes/operator/reconciler/Reconciler.java | 29 +--
.../operator/reconciler/ReconciliationUtils.java | 40 +++-
.../AbstractDeploymentReconciler.java} | 13 +-
.../ApplicationReconciler.java} | 13 +-
.../{ => deployment}/ReconcilerFactory.java | 22 +-
.../{ => deployment}/SessionReconciler.java | 7 +-
.../sessionjob/FlinkSessionJobReconciler.java | 126 +++++++++++
.../kubernetes/operator/service/FlinkService.java | 111 +++++++++-
.../kubernetes/operator/utils/OperatorUtils.java | 16 ++
...loymentValidator.java => DefaultValidator.java} | 47 +++-
...tValidator.java => FlinkResourceValidator.java} | 17 +-
.../flink/kubernetes/operator/TestUtils.java | 62 ++++++
.../kubernetes/operator/TestingFlinkService.java | 20 ++
.../operator/artifact/JarResolverTest.java} | 25 ++-
.../controller/FlinkDeploymentControllerTest.java | 15 +-
...erverTest.java => ApplicationObserverTest.java} | 22 +-
.../operator/observer/SessionObserverTest.java | 2 +
.../ApplicationReconcilerTest.java} | 28 ++-
.../{ => deployment}/SessionReconcilerTest.java | 6 +-
.../sessionjob/FlinkSessionJobReconcilerTest.java | 61 +++++
.../operator/utils/ReconciliationUtilsTest.java | 2 +-
...alidatorTest.java => DefaultValidatorTest.java} | 79 ++++++-
.../operator/admission/FlinkOperatorWebhook.java | 4 +-
.../operator/admission/FlinkValidator.java | 8 +-
.../operator/admission/AdmissionHandlerTest.java | 4 +-
.../conf/flink-operator-config/log4j2.properties | 2 +-
.../crds/flinksessionjobs.flink.apache.org-v1.yml | 2 +-
.../templates/flink-operator.yaml | 6 +
helm/flink-kubernetes-operator/templates/rbac.yaml | 2 +
helm/flink-kubernetes-operator/values.yaml | 17 ++
51 files changed, 1121 insertions(+), 240 deletions(-)
diff --git a/docs/content/docs/custom-resource/reference.md
b/docs/content/docs/custom-resource/reference.md
index 9880eec..3a7cf29 100644
--- a/docs/content/docs/custom-resource/reference.md
+++ b/docs/content/docs/custom-resource/reference.md
@@ -168,7 +168,7 @@ This page serves as a full reference for FlinkDeployment
custom resource definit
| Parameter | Type | Docs |
| ----------| ---- | ---- |
| jobStatus | org.apache.flink.kubernetes.operator.crd.status.JobStatus | Last
observed status of the Flink job on Application deployments. |
-| jobManagerDeploymentStatus |
org.apache.flink.kubernetes.operator.observer.JobManagerDeploymentStatus | Last
observed status of the JobManager deployment. |
+| jobManagerDeploymentStatus |
org.apache.flink.kubernetes.operator.crd.status.JobManagerDeploymentStatus |
Last observed status of the JobManager deployment. |
| reconciliationStatus |
org.apache.flink.kubernetes.operator.crd.status.ReconciliationStatus | Status
of the last reconcile operation. |
### FlinkSessionJobReconciliationStatus
@@ -180,7 +180,7 @@ This page serves as a full reference for FlinkDeployment
custom resource definit
| ----------| ---- | ---- |
| success | boolean | True if last reconciliation step was successful. |
| error | java.lang.String | If success == false, error information about the
reconciliation failure. |
-| flinkSessionJobSpec |
org.apache.flink.kubernetes.operator.crd.spec.FlinkSessionJobSpec | Last
reconciled job spec. Used to decide whether further reconciliation steps are
necessary. |
+| lastReconciledSpec |
org.apache.flink.kubernetes.operator.crd.spec.FlinkSessionJobSpec | Last
reconciled job spec. Used to decide whether further reconciliation steps are
necessary. |
### FlinkSessionJobStatus
**Class**:
org.apache.flink.kubernetes.operator.crd.status.FlinkSessionJobStatus
@@ -192,6 +192,19 @@ This page serves as a full reference for FlinkDeployment
custom resource definit
| jobStatus | org.apache.flink.kubernetes.operator.crd.status.JobStatus | Last
observed status of the job. |
| reconciliationStatus |
org.apache.flink.kubernetes.operator.crd.status.FlinkSessionJobReconciliationStatus
| Status of the last reconcile operation. |
+### JobManagerDeploymentStatus
+**Class**:
org.apache.flink.kubernetes.operator.crd.status.JobManagerDeploymentStatus
+
+**Description**: Status of the Flink JobManager Kubernetes deployment.
+
+| Value | Docs |
+| ----- | ---- |
+| READY | JobManager is running and ready to receive REST API calls. |
+| DEPLOYED_NOT_READY | JobManager is running but not ready yet to receive REST
API calls. |
+| DEPLOYING | JobManager process is starting up. |
+| MISSING | JobManager deployment not found, probably not started or killed by
user. |
+| ERROR | Deployment in terminal error, requires spec change for
reconciliation to continue. |
+
### JobStatus
**Class**: org.apache.flink.kubernetes.operator.crd.status.JobStatus
diff --git a/docs/content/docs/development/guide.md
b/docs/content/docs/development/guide.md
index 643aa29..d819425 100644
--- a/docs/content/docs/development/guide.md
+++ b/docs/content/docs/development/guide.md
@@ -108,6 +108,14 @@ So if the CRD is changed, you have to delete the CRD
resource manually, and re-i
kubectl delete crd flinkdeployments.flink.apache.org
```
+### Mounts
+
+The operator supports to specify the volume mounts. The default mounts to
hostPath can be activated by the following command. You can change the default
mounts in the `helm/flink-operator/values.yaml`
+
+```bash
+helm install flink-operator helm/flink-operator --set
operatorVolumeMounts.create=true --set operatorVolumes.create=true
+```
+
## CI/CD
We use [GitHub
Actions](https://help.github.com/en/actions/getting-started-with-github-actions/about-github-actions)
to help you automate your software development workflows in the same place you
store code and collaborate on pull requests and issues.
diff --git a/examples/basic-session-job.yaml b/examples/basic-session-job.yaml
index e2b2401..cfd5e4f 100644
--- a/examples/basic-session-job.yaml
+++ b/examples/basic-session-job.yaml
@@ -24,6 +24,6 @@ metadata:
spec:
clusterId: basic-session-example
job:
- jarURI: local:///opt/flink/examples/streaming/StateMachineExample.jar
+ jarURI: file:///opt/flink/artifacts/TopSpeedWindowing.jar
parallelism: 2
upgradeMode: stateless
diff --git a/flink-kubernetes-operator/pom.xml
b/flink-kubernetes-operator/pom.xml
index cde7633..1ca0e66 100644
--- a/flink-kubernetes-operator/pom.xml
+++ b/flink-kubernetes-operator/pom.xml
@@ -71,6 +71,12 @@ under the License.
<dependency>
<groupId>org.apache.flink</groupId>
+ <artifactId>flink-runtime-web_${scala.version}</artifactId>
+ <version>${flink.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
<artifactId>flink-kubernetes-shaded</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
index 02757a6..6036f9a 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
@@ -22,13 +22,20 @@ import
org.apache.flink.kubernetes.operator.config.DefaultConfig;
import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
import org.apache.flink.kubernetes.operator.controller.FlinkControllerConfig;
import
org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController;
+import
org.apache.flink.kubernetes.operator.controller.FlinkSessionJobController;
+import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.crd.FlinkSessionJob;
import org.apache.flink.kubernetes.operator.metrics.OperatorMetricUtils;
-import org.apache.flink.kubernetes.operator.observer.ObserverFactory;
-import org.apache.flink.kubernetes.operator.reconciler.ReconcilerFactory;
+import org.apache.flink.kubernetes.operator.observer.Observer;
+import
org.apache.flink.kubernetes.operator.observer.deployment.ObserverFactory;
+import
org.apache.flink.kubernetes.operator.observer.sessionjob.SessionJobObserver;
+import org.apache.flink.kubernetes.operator.reconciler.Reconciler;
+import
org.apache.flink.kubernetes.operator.reconciler.deployment.ReconcilerFactory;
+import
org.apache.flink.kubernetes.operator.reconciler.sessionjob.FlinkSessionJobReconciler;
import org.apache.flink.kubernetes.operator.service.FlinkService;
import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
-import
org.apache.flink.kubernetes.operator.validation.DefaultDeploymentValidator;
-import
org.apache.flink.kubernetes.operator.validation.FlinkDeploymentValidator;
+import org.apache.flink.kubernetes.operator.validation.DefaultValidator;
+import org.apache.flink.kubernetes.operator.validation.FlinkResourceValidator;
import io.fabric8.kubernetes.client.DefaultKubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClient;
@@ -46,54 +53,76 @@ public class FlinkOperator {
private static final Logger LOG =
LoggerFactory.getLogger(FlinkOperator.class);
private final Operator operator;
- private final FlinkDeploymentController controller;
- private final FlinkControllerConfig controllerConfig;
+
+ private final FlinkOperatorConfiguration operatorConfiguration;
+ private final KubernetesClient client;
+ private final FlinkService flinkService;
+ private final ConfigurationService configurationService;
+ private final DefaultConfig defaultConfig;
public FlinkOperator() {
this(FlinkUtils.loadDefaultConfig());
}
public FlinkOperator(DefaultConfig defaultConfig) {
-
LOG.info("Starting Flink Kubernetes Operator");
OperatorMetricUtils.initOperatorMetrics(defaultConfig.getOperatorConfig());
- KubernetesClient client = new DefaultKubernetesClient();
- String namespace = client.getNamespace();
- if (namespace == null) {
- namespace = "default";
- }
-
- FlinkOperatorConfiguration operatorConfiguration =
+ this.defaultConfig = defaultConfig;
+ this.client = new DefaultKubernetesClient();
+ this.operatorConfiguration =
FlinkOperatorConfiguration.fromConfiguration(defaultConfig.getOperatorConfig());
+ this.configurationService =
getConfigurationService(operatorConfiguration);
+ this.operator = new Operator(client, configurationService);
+ this.flinkService = new FlinkService(client, operatorConfiguration);
+ }
- ConfigurationService configurationService =
getConfigurationService(operatorConfiguration);
-
- operator = new Operator(client, configurationService);
-
- FlinkService flinkService = new FlinkService(client,
operatorConfiguration);
-
- FlinkDeploymentValidator validator = new DefaultDeploymentValidator();
+ private void registerDeploymentController() {
+ FlinkResourceValidator validator = new DefaultValidator();
ReconcilerFactory reconcilerFactory =
new ReconcilerFactory(client, flinkService,
operatorConfiguration);
ObserverFactory observerFactory =
new ObserverFactory(
flinkService, operatorConfiguration,
defaultConfig.getFlinkConfig());
- controller =
+ FlinkDeploymentController controller =
new FlinkDeploymentController(
defaultConfig,
operatorConfiguration,
client,
- namespace,
validator,
reconcilerFactory,
observerFactory);
- controllerConfig =
- new FlinkControllerConfig(controller,
operatorConfiguration.getWatchedNamespaces());
+ FlinkControllerConfig<FlinkDeployment> controllerConfig =
+ new FlinkControllerConfig<>(
+ controller,
operatorConfiguration.getWatchedNamespaces());
controller.setControllerConfig(controllerConfig);
controllerConfig.setConfigurationService(configurationService);
+ operator.register(controller, controllerConfig);
+ }
+
+ private void registerSessionJobController() {
+ FlinkResourceValidator validator = new DefaultValidator();
+ Reconciler<FlinkSessionJob> reconciler =
+ new FlinkSessionJobReconciler(client, flinkService,
operatorConfiguration);
+ Observer<FlinkSessionJob> observer = new SessionJobObserver();
+ FlinkSessionJobController controller =
+ new FlinkSessionJobController(
+ defaultConfig,
+ operatorConfiguration,
+ client,
+ validator,
+ reconciler,
+ observer);
+
+ FlinkControllerConfig<FlinkSessionJob> controllerConfig =
+ new FlinkControllerConfig<>(
+ controller,
operatorConfiguration.getWatchedNamespaces());
+
+ controllerConfig.setConfigurationService(configurationService);
+ controller.init(controllerConfig);
+ operator.register(controller, controllerConfig);
}
private ConfigurationService getConfigurationService(
@@ -115,7 +144,8 @@ public class FlinkOperator {
}
public void run() {
- operator.register(controller, controllerConfig);
+ registerDeploymentController();
+ registerSessionJobController();
operator.installShutdownHook();
operator.start();
}
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/FlinkDeploymentValidator.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/artifact/JarResolver.java
similarity index 61%
copy from
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/FlinkDeploymentValidator.java
copy to
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/artifact/JarResolver.java
index 732224d..bf60bf6 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/FlinkDeploymentValidator.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/artifact/JarResolver.java
@@ -15,20 +15,22 @@
* limitations under the License.
*/
-package org.apache.flink.kubernetes.operator.validation;
+package org.apache.flink.kubernetes.operator.artifact;
-import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import org.apache.flink.util.FlinkRuntimeException;
-import java.util.Optional;
+import java.net.URI;
+import java.nio.file.Path;
+import java.nio.file.Paths;
-/** Validator for {@link FlinkDeployment} resources. */
-public interface FlinkDeploymentValidator {
+/** Resolve the jar uri. */
+public class JarResolver {
- /**
- * Validate and return optional error.
- *
- * @param deployment
- * @return Optional error string, should be present iff validation
resulted in an error
- */
- Optional<String> validate(FlinkDeployment deployment);
+ public Path resolve(String jarURI) throws Exception {
+ URI uri = new URI(jarURI);
+ if (!"file".equals(uri.getScheme())) {
+ throw new FlinkRuntimeException("Only support local jar now.");
+ }
+ return Paths.get(uri.getPath());
+ }
}
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkOperatorConfiguration.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkOperatorConfiguration.java
index 9b27745..4cea133 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkOperatorConfiguration.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkOperatorConfiguration.java
@@ -39,6 +39,7 @@ public class FlinkOperatorConfiguration {
Duration flinkClientTimeout;
String flinkServiceHostOverride;
Set<String> watchedNamespaces;
+ Duration cancelJobTimeout;
public static FlinkOperatorConfiguration fromConfiguration(Configuration
operatorConfig) {
Set<String> watchedNamespaces = OperatorUtils.getWatchedNamespaces();
@@ -67,6 +68,9 @@ public class FlinkOperatorConfiguration {
Duration flinkClientTimeout =
operatorConfig.get(OperatorConfigOptions.OPERATOR_OBSERVER_FLINK_CLIENT_TIMEOUT);
+ Duration cancelJobTimeout =
+
operatorConfig.get(OperatorConfigOptions.OPERATOR_CANCEL_JOB_TIMEOUT);
+
String flinkServiceHostOverride = null;
if (EnvUtils.get("KUBERNETES_SERVICE_HOST") == null) {
// not running in k8s, simplify local development
@@ -81,6 +85,7 @@ public class FlinkOperatorConfiguration {
savepointTriggerGracePeriod,
flinkClientTimeout,
flinkServiceHostOverride,
- watchedNamespaces);
+ watchedNamespaces,
+ cancelJobTimeout);
}
}
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/OperatorConfigOptions.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/OperatorConfigOptions.java
index 6d7642a..40058bd 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/OperatorConfigOptions.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/OperatorConfigOptions.java
@@ -69,4 +69,10 @@ public class OperatorConfigOptions {
.defaultValue(Duration.ofSeconds(10))
.withDescription(
"The timeout for the observer to wait the flink
rest client to return.");
+
+ public static final ConfigOption<Duration> OPERATOR_CANCEL_JOB_TIMEOUT =
+ ConfigOptions.key("operator.reconciler.flink.cancel.job.timeout")
+ .durationType()
+ .defaultValue(Duration.ofMinutes(1))
+ .withDescription("The timeout for the operator to cancel
job.");
}
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkControllerConfig.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkControllerConfig.java
index 49b2b27..2a55456 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkControllerConfig.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkControllerConfig.java
@@ -17,19 +17,18 @@
package org.apache.flink.kubernetes.operator.controller;
-import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
-
+import io.fabric8.kubernetes.api.model.HasMetadata;
+import io.javaoperatorsdk.operator.api.reconciler.Reconciler;
import io.javaoperatorsdk.operator.config.runtime.AnnotationConfiguration;
import java.util.Set;
/** Custom config for {@link FlinkDeploymentController}. */
-public class FlinkControllerConfig extends
AnnotationConfiguration<FlinkDeployment> {
+public class FlinkControllerConfig<CR extends HasMetadata> extends
AnnotationConfiguration<CR> {
private final Set<String> watchedNamespaces;
- public FlinkControllerConfig(
- FlinkDeploymentController reconciler, Set<String>
watchedNamespaces) {
+ public FlinkControllerConfig(Reconciler<CR> reconciler, Set<String>
watchedNamespaces) {
super(reconciler);
this.watchedNamespaces = watchedNamespaces;
}
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
index 0067674..8ebeb50 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
@@ -21,15 +21,15 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.kubernetes.operator.config.DefaultConfig;
import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import
org.apache.flink.kubernetes.operator.crd.status.JobManagerDeploymentStatus;
import
org.apache.flink.kubernetes.operator.exception.DeploymentFailedException;
import org.apache.flink.kubernetes.operator.exception.ReconciliationException;
-import
org.apache.flink.kubernetes.operator.observer.JobManagerDeploymentStatus;
-import org.apache.flink.kubernetes.operator.observer.ObserverFactory;
-import org.apache.flink.kubernetes.operator.reconciler.ReconcilerFactory;
+import
org.apache.flink.kubernetes.operator.observer.deployment.ObserverFactory;
import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
+import
org.apache.flink.kubernetes.operator.reconciler.deployment.ReconcilerFactory;
import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
import org.apache.flink.kubernetes.operator.utils.OperatorUtils;
-import
org.apache.flink.kubernetes.operator.validation.FlinkDeploymentValidator;
+import org.apache.flink.kubernetes.operator.validation.FlinkResourceValidator;
import org.apache.flink.util.Preconditions;
import io.fabric8.kubernetes.api.model.Event;
@@ -62,28 +62,24 @@ public class FlinkDeploymentController
private final KubernetesClient kubernetesClient;
- private final String operatorNamespace;
-
- private final FlinkDeploymentValidator validator;
+ private final FlinkResourceValidator validator;
private final ReconcilerFactory reconcilerFactory;
private final ObserverFactory observerFactory;
private final DefaultConfig defaultConfig;
private final FlinkOperatorConfiguration operatorConfiguration;
- private FlinkControllerConfig controllerConfig;
+ private FlinkControllerConfig<FlinkDeployment> controllerConfig;
public FlinkDeploymentController(
DefaultConfig defaultConfig,
FlinkOperatorConfiguration operatorConfiguration,
KubernetesClient kubernetesClient,
- String operatorNamespace,
- FlinkDeploymentValidator validator,
+ FlinkResourceValidator validator,
ReconcilerFactory reconcilerFactory,
ObserverFactory observerFactory) {
this.defaultConfig = defaultConfig;
this.operatorConfiguration = operatorConfiguration;
this.kubernetesClient = kubernetesClient;
- this.operatorNamespace = operatorNamespace;
this.validator = validator;
this.reconcilerFactory = reconcilerFactory;
this.observerFactory = observerFactory;
@@ -99,7 +95,7 @@ public class FlinkDeploymentController
}
Configuration effectiveConfig =
FlinkUtils.getEffectiveConfig(flinkApp,
defaultConfig.getFlinkConfig());
- return reconcilerFactory.getOrCreate(flinkApp).cleanup(flinkApp,
effectiveConfig);
+ return reconcilerFactory.getOrCreate(flinkApp).cleanup(flinkApp,
context, effectiveConfig);
}
@Override
@@ -108,7 +104,7 @@ public class FlinkDeploymentController
FlinkDeployment originalCopy = ReconciliationUtils.clone(flinkApp);
try {
observerFactory.getOrCreate(flinkApp).observe(flinkApp, context);
- Optional<String> validationError = validator.validate(flinkApp);
+ Optional<String> validationError =
validator.validateDeployment(flinkApp);
if (validationError.isPresent()) {
LOG.error("Validation failed: " + validationError.get());
ReconciliationUtils.updateForReconciliationError(flinkApp,
validationError.get());
@@ -170,7 +166,7 @@ public class FlinkDeploymentController
return Optional.of(flinkApp);
}
- public void setControllerConfig(FlinkControllerConfig config) {
+ public void setControllerConfig(FlinkControllerConfig<FlinkDeployment>
config) {
this.controllerConfig = config;
}
}
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobController.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobController.java
new file mode 100644
index 0000000..1e563fc
--- /dev/null
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobController.java
@@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.controller;
+
+import org.apache.flink.kubernetes.operator.config.DefaultConfig;
+import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
+import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.crd.FlinkSessionJob;
+import org.apache.flink.kubernetes.operator.exception.ReconciliationException;
+import org.apache.flink.kubernetes.operator.observer.Observer;
+import org.apache.flink.kubernetes.operator.reconciler.Reconciler;
+import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
+import org.apache.flink.kubernetes.operator.utils.OperatorUtils;
+import org.apache.flink.kubernetes.operator.validation.FlinkResourceValidator;
+import org.apache.flink.util.Preconditions;
+
+import io.fabric8.kubernetes.client.KubernetesClient;
+import io.fabric8.kubernetes.client.informers.SharedIndexInformer;
+import io.javaoperatorsdk.operator.api.reconciler.Context;
+import io.javaoperatorsdk.operator.api.reconciler.ControllerConfiguration;
+import io.javaoperatorsdk.operator.api.reconciler.DeleteControl;
+import io.javaoperatorsdk.operator.api.reconciler.ErrorStatusHandler;
+import io.javaoperatorsdk.operator.api.reconciler.EventSourceContext;
+import io.javaoperatorsdk.operator.api.reconciler.EventSourceInitializer;
+import io.javaoperatorsdk.operator.api.reconciler.RetryInfo;
+import io.javaoperatorsdk.operator.api.reconciler.UpdateControl;
+import io.javaoperatorsdk.operator.processing.event.ResourceID;
+import io.javaoperatorsdk.operator.processing.event.source.EventSource;
+import
io.javaoperatorsdk.operator.processing.event.source.PrimaryResourcesRetriever;
+import
io.javaoperatorsdk.operator.processing.event.source.informer.InformerEventSource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+/** Controller that runs the main reconcile loop for {@link FlinkSessionJob}.
*/
+@ControllerConfiguration
+public class FlinkSessionJobController
+ implements
io.javaoperatorsdk.operator.api.reconciler.Reconciler<FlinkSessionJob>,
+ ErrorStatusHandler<FlinkSessionJob>,
+ EventSourceInitializer<FlinkSessionJob> {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(FlinkSessionJobController.class);
+ private static final String CLUSTER_ID_INDEX = "clusterId_index";
+ private static final String ALL_NAMESPACE = "allNamespace";
+
+ private final KubernetesClient kubernetesClient;
+
+ private final FlinkResourceValidator validator;
+ private final Reconciler<FlinkSessionJob> reconciler;
+ private final Observer<FlinkSessionJob> observer;
+ private final DefaultConfig defaultConfig;
+ private final FlinkOperatorConfiguration operatorConfiguration;
+ private Map<String, SharedIndexInformer<FlinkSessionJob>> informers;
+ private FlinkControllerConfig<FlinkSessionJob> controllerConfig;
+
+ public FlinkSessionJobController(
+ DefaultConfig defaultConfig,
+ FlinkOperatorConfiguration operatorConfiguration,
+ KubernetesClient kubernetesClient,
+ FlinkResourceValidator validator,
+ Reconciler<FlinkSessionJob> reconciler,
+ Observer<FlinkSessionJob> observer) {
+ this.defaultConfig = defaultConfig;
+ this.operatorConfiguration = operatorConfiguration;
+ this.kubernetesClient = kubernetesClient;
+ this.validator = validator;
+ this.reconciler = reconciler;
+ this.observer = observer;
+ }
+
+ public void init(FlinkControllerConfig<FlinkSessionJob> config) {
+ this.controllerConfig = config;
+ this.informers = createInformers();
+ }
+
+ @Override
+ public UpdateControl<FlinkSessionJob> reconcile(
+ FlinkSessionJob flinkSessionJob, Context context) {
+ LOG.info("Starting reconciliation");
+ FlinkSessionJob originalCopy =
ReconciliationUtils.clone(flinkSessionJob);
+ observer.observe(flinkSessionJob, context);
+ Optional<String> validationError =
+ validator.validateSessionJob(
+ flinkSessionJob,
+ OperatorUtils.getSecondaryResource(
+ flinkSessionJob, context,
operatorConfiguration));
+ if (validationError.isPresent()) {
+ LOG.error("Validation failed: " + validationError.get());
+ ReconciliationUtils.updateForReconciliationError(
+ flinkSessionJob, validationError.get());
+ return ReconciliationUtils.toUpdateControl(originalCopy,
flinkSessionJob);
+ }
+
+ try {
+ // TODO refactor the reconciler interface to return UpdateControl
directly
+ reconciler.reconcile(flinkSessionJob, context,
defaultConfig.getFlinkConfig());
+ } catch (Exception e) {
+ throw new ReconciliationException(e);
+ }
+
+ return ReconciliationUtils.toUpdateControl(originalCopy,
flinkSessionJob)
+
.rescheduleAfter(operatorConfiguration.getReconcileInterval().toMillis());
+ }
+
+ @Override
+ public DeleteControl cleanup(FlinkSessionJob sessionJob, Context context) {
+ LOG.info("Deleting FlinkSessionJob");
+
+ return reconciler.cleanup(sessionJob, context,
defaultConfig.getFlinkConfig());
+ }
+
+ @Override
+ public Optional<FlinkSessionJob> updateErrorStatus(
+ FlinkSessionJob flinkSessionJob, RetryInfo retryInfo,
RuntimeException e) {
+ LOG.warn(
+ "Attempt count: {}, last attempt: {}",
+ retryInfo.getAttemptCount(),
+ retryInfo.isLastAttempt());
+
+ ReconciliationUtils.updateForReconciliationError(
+ flinkSessionJob,
+ (e instanceof ReconciliationException) ?
e.getCause().toString() : e.toString());
+ return Optional.of(flinkSessionJob);
+ }
+
+ @Override
+ public List<EventSource> prepareEventSources(
+ EventSourceContext<FlinkSessionJob> eventSourceContext) {
+ Preconditions.checkNotNull(controllerConfig, "Controller config cannot
be null");
+ Set<String> effectiveNamespaces =
controllerConfig.getEffectiveNamespaces();
+ if (effectiveNamespaces.isEmpty()) {
+ return List.of(createFlinkDepInformerEventSource(ALL_NAMESPACE));
+ } else {
+ return effectiveNamespaces.stream()
+ .map(this::createFlinkDepInformerEventSource)
+ .collect(Collectors.toList());
+ }
+ }
+
+ private InformerEventSource<FlinkDeployment, FlinkSessionJob>
createFlinkDepInformerEventSource(
+ String name) {
+ return new InformerEventSource<>(
+
kubernetesClient.resources(FlinkDeployment.class).runnableInformer(0),
+ primaryResourceRetriever(),
+ sessionJob ->
+ new ResourceID(
+ sessionJob.getSpec().getClusterId(),
+ sessionJob.getMetadata().getNamespace()),
+ false) {
+ @Override
+ public String name() {
+ return name;
+ }
+ };
+ }
+
+ /**
+ * Mapping the {@link FlinkDeployment} session cluster to {@link
FlinkSessionJob}. It leverages
+ * the informer indexer.
+ *
+ * @return The {@link PrimaryResourcesRetriever}.
+ */
+ private PrimaryResourcesRetriever<FlinkDeployment>
primaryResourceRetriever() {
+ return flinkDeployment -> {
+ var namespace = flinkDeployment.getMetadata().getNamespace();
+ var informer =
+ controllerConfig.getEffectiveNamespaces().isEmpty()
+ ? informers.get(ALL_NAMESPACE)
+ : informers.get(namespace);
+
+ var sessionJobs =
+ informer.getIndexer()
+ .byIndex(CLUSTER_ID_INDEX,
flinkDeployment.getMetadata().getName());
+ var resourceIDs = new HashSet<ResourceID>();
+ for (FlinkSessionJob sessionJob : sessionJobs) {
+ resourceIDs.add(
+ new ResourceID(
+ sessionJob.getMetadata().getName(),
+ sessionJob.getMetadata().getNamespace()));
+ }
+ LOG.debug(
+ "Find the target resource {} for {} ",
+ resourceIDs,
+ flinkDeployment.getMetadata().getNamespace());
+ return resourceIDs;
+ };
+ }
+
+ /**
+ * Create informers for session job to build indexer for cluster to
session job relations.
+ *
+ * @return The different namespace's index informer.
+ */
+ private Map<String, SharedIndexInformer<FlinkSessionJob>>
createInformers() {
+ Set<String> effectiveNamespaces =
controllerConfig.getEffectiveNamespaces();
+ if (effectiveNamespaces.isEmpty()) {
+ return Map.of(
+ ALL_NAMESPACE,
+ kubernetesClient
+ .resources(FlinkSessionJob.class)
+ .inAnyNamespace()
+ .withIndexers(clusterToSessionJobIndexer())
+ .inform());
+ } else {
+ var informers = new HashMap<String,
SharedIndexInformer<FlinkSessionJob>>();
+ for (String effectiveNamespace : effectiveNamespaces) {
+ informers.put(
+ effectiveNamespace,
+ kubernetesClient
+ .resources(FlinkSessionJob.class)
+ .inNamespace(effectiveNamespace)
+ .withIndexers(clusterToSessionJobIndexer())
+ .inform());
+ }
+ return informers;
+ }
+ }
+
+ private Map<String, Function<FlinkSessionJob, List<String>>>
clusterToSessionJobIndexer() {
+ return Map.of(CLUSTER_ID_INDEX, sessionJob ->
List.of(sessionJob.getSpec().getClusterId()));
+ }
+}
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/status/FlinkDeploymentStatus.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/status/FlinkDeploymentStatus.java
index a8b7794..4024249 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/status/FlinkDeploymentStatus.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/status/FlinkDeploymentStatus.java
@@ -18,7 +18,6 @@
package org.apache.flink.kubernetes.operator.crd.status;
import org.apache.flink.annotation.Experimental;
-import
org.apache.flink.kubernetes.operator.observer.JobManagerDeploymentStatus;
import lombok.AllArgsConstructor;
import lombok.Data;
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/status/FlinkSessionJobReconciliationStatus.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/status/FlinkSessionJobReconciliationStatus.java
index b0b18c0..c310087 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/status/FlinkSessionJobReconciliationStatus.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/status/FlinkSessionJobReconciliationStatus.java
@@ -41,5 +41,5 @@ public class FlinkSessionJobReconciliationStatus {
/**
* Last reconciled job spec. Used to decide whether further reconciliation
steps are necessary.
*/
- private FlinkSessionJobSpec flinkSessionJobSpec;
+ private FlinkSessionJobSpec lastReconciledSpec;
}
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobManagerDeploymentStatus.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/status/JobManagerDeploymentStatus.java
similarity index 97%
rename from
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobManagerDeploymentStatus.java
rename to
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/status/JobManagerDeploymentStatus.java
index 6d50154..249fb51 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobManagerDeploymentStatus.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/status/JobManagerDeploymentStatus.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.flink.kubernetes.operator.observer;
+package org.apache.flink.kubernetes.operator.crd.status;
import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/Observer.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/Observer.java
index 1f7e16e..29621ea 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/Observer.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/Observer.java
@@ -17,18 +17,16 @@
package org.apache.flink.kubernetes.operator.observer;
-import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
-
import io.javaoperatorsdk.operator.api.reconciler.Context;
-/** The Observer of {@link FlinkDeployment}. */
-public interface Observer {
+/** The Observer of custom resource. */
+public interface Observer<CR> {
/**
* Observe the flinkApp status, It will reflect the changed status on the
flinkApp resource.
*
- * @param flinkApp the target flinkDeployment resource
+ * @param cr the target custom resource
* @param context the context with which the operation is executed
*/
- void observe(FlinkDeployment flinkApp, Context context);
+ void observe(CR cr, Context context);
}
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/SavepointFetchResult.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/SavepointFetchResult.java
index 6d4ff34..74524c0 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/SavepointFetchResult.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/SavepointFetchResult.java
@@ -24,9 +24,9 @@ import lombok.Value;
/** Result of a fetch savepoint operation. */
@Value
public class SavepointFetchResult {
- private final Savepoint savepoint;
- private final boolean isTriggered;
- private final String error;
+ Savepoint savepoint;
+ boolean isTriggered;
+ String error;
public static SavepointFetchResult notTriggered() {
return new SavepointFetchResult(null, false, null);
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/BaseObserver.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/AbstractDeploymentObserver.java
similarity index 92%
rename from
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/BaseObserver.java
rename to
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/AbstractDeploymentObserver.java
index 4afc227..e5d1bb9 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/BaseObserver.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/AbstractDeploymentObserver.java
@@ -1,11 +1,10 @@
/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
@@ -16,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.flink.kubernetes.operator.observer;
+package org.apache.flink.kubernetes.operator.observer.deployment;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
@@ -25,8 +24,10 @@ import
org.apache.flink.kubernetes.operator.crd.spec.FlinkDeploymentSpec;
import org.apache.flink.kubernetes.operator.crd.spec.JobSpec;
import org.apache.flink.kubernetes.operator.crd.spec.JobState;
import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
+import
org.apache.flink.kubernetes.operator.crd.status.JobManagerDeploymentStatus;
import org.apache.flink.kubernetes.operator.crd.status.ReconciliationStatus;
import
org.apache.flink.kubernetes.operator.exception.DeploymentFailedException;
+import org.apache.flink.kubernetes.operator.observer.Observer;
import org.apache.flink.kubernetes.operator.service.FlinkService;
import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
@@ -46,7 +47,7 @@ import java.util.List;
import java.util.Optional;
/** The base observer. */
-public abstract class BaseObserver implements Observer {
+public abstract class AbstractDeploymentObserver implements
Observer<FlinkDeployment> {
protected final Logger logger = LoggerFactory.getLogger(this.getClass());
@@ -56,7 +57,7 @@ public abstract class BaseObserver implements Observer {
protected final FlinkOperatorConfiguration operatorConfiguration;
protected final Configuration flinkConfig;
- public BaseObserver(
+ public AbstractDeploymentObserver(
FlinkService flinkService,
FlinkOperatorConfiguration operatorConfiguration,
Configuration flinkConfig) {
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobObserver.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserver.java
similarity index 89%
rename from
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobObserver.java
rename to
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserver.java
index 1c60a87..1dd54ea 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobObserver.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserver.java
@@ -1,11 +1,10 @@
/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
@@ -16,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.flink.kubernetes.operator.observer;
+package org.apache.flink.kubernetes.operator.observer.deployment;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
@@ -24,6 +23,7 @@ import
org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
import org.apache.flink.kubernetes.operator.crd.status.JobStatus;
import org.apache.flink.kubernetes.operator.crd.status.SavepointInfo;
+import org.apache.flink.kubernetes.operator.observer.SavepointFetchResult;
import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
import org.apache.flink.kubernetes.operator.service.FlinkService;
import org.apache.flink.kubernetes.operator.utils.SavepointUtils;
@@ -38,9 +38,9 @@ import java.util.List;
import java.util.concurrent.TimeoutException;
/** The observer of {@link
org.apache.flink.kubernetes.operator.config.Mode#APPLICATION} cluster. */
-public class JobObserver extends BaseObserver {
+public class ApplicationObserver extends AbstractDeploymentObserver {
- public JobObserver(
+ public ApplicationObserver(
FlinkService flinkService,
FlinkOperatorConfiguration operatorConfiguration,
Configuration flinkConfig) {
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/ObserverFactory.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/ObserverFactory.java
similarity index 70%
rename from
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/ObserverFactory.java
rename to
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/ObserverFactory.java
index a0a3566..be2d0f8 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/ObserverFactory.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/ObserverFactory.java
@@ -1,11 +1,10 @@
/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
@@ -16,12 +15,13 @@
* limitations under the License.
*/
-package org.apache.flink.kubernetes.operator.observer;
+package org.apache.flink.kubernetes.operator.observer.deployment;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
import org.apache.flink.kubernetes.operator.config.Mode;
import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.observer.Observer;
import org.apache.flink.kubernetes.operator.service.FlinkService;
import java.util.Map;
@@ -33,7 +33,7 @@ public class ObserverFactory {
private final FlinkService flinkService;
private final FlinkOperatorConfiguration operatorConfig;
private final Configuration flinkConfig;
- private final Map<Mode, Observer> observerMap;
+ private final Map<Mode, Observer<FlinkDeployment>> observerMap;
public ObserverFactory(
FlinkService flinkService,
@@ -45,7 +45,7 @@ public class ObserverFactory {
this.observerMap = new ConcurrentHashMap<>();
}
- public Observer getOrCreate(FlinkDeployment flinkApp) {
+ public Observer<FlinkDeployment> getOrCreate(FlinkDeployment flinkApp) {
return observerMap.computeIfAbsent(
Mode.getMode(flinkApp),
mode -> {
@@ -53,7 +53,8 @@ public class ObserverFactory {
case SESSION:
return new SessionObserver(flinkService,
operatorConfig, flinkConfig);
case APPLICATION:
- return new JobObserver(flinkService,
operatorConfig, flinkConfig);
+ return new ApplicationObserver(
+ flinkService, operatorConfig, flinkConfig);
default:
throw new UnsupportedOperationException(
String.format("Unsupported running mode:
%s", mode));
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/SessionObserver.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/SessionObserver.java
similarity index 76%
rename from
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/SessionObserver.java
rename to
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/SessionObserver.java
index f74af3b..7625c9c 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/SessionObserver.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/SessionObserver.java
@@ -1,11 +1,10 @@
/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
@@ -16,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.flink.kubernetes.operator.observer;
+package org.apache.flink.kubernetes.operator.observer.deployment;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
@@ -28,7 +27,7 @@ import io.javaoperatorsdk.operator.api.reconciler.Context;
import java.util.concurrent.TimeoutException;
/** The observer of the {@link
org.apache.flink.kubernetes.operator.config.Mode#SESSION} cluster. */
-public class SessionObserver extends BaseObserver {
+public class SessionObserver extends AbstractDeploymentObserver {
public SessionObserver(
FlinkService flinkService,
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/Observer.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/sessionjob/SessionJobObserver.java
similarity index 63%
copy from
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/Observer.java
copy to
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/sessionjob/SessionJobObserver.java
index 1f7e16e..991323c 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/Observer.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/sessionjob/SessionJobObserver.java
@@ -15,20 +15,15 @@
* limitations under the License.
*/
-package org.apache.flink.kubernetes.operator.observer;
+package org.apache.flink.kubernetes.operator.observer.sessionjob;
-import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.crd.FlinkSessionJob;
+import org.apache.flink.kubernetes.operator.observer.Observer;
import io.javaoperatorsdk.operator.api.reconciler.Context;
-/** The Observer of {@link FlinkDeployment}. */
-public interface Observer {
-
- /**
- * Observe the flinkApp status, It will reflect the changed status on the
flinkApp resource.
- *
- * @param flinkApp the target flinkDeployment resource
- * @param context the context with which the operation is executed
- */
- void observe(FlinkDeployment flinkApp, Context context);
+/** The observer of {@link FlinkSessionJob}. */
+public class SessionJobObserver implements Observer<FlinkSessionJob> {
+ @Override
+ public void observe(FlinkSessionJob flinkSessionJob, Context context) {}
}
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/Reconciler.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/Reconciler.java
index 2364caf..d1e2f10 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/Reconciler.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/Reconciler.java
@@ -19,31 +19,34 @@
package org.apache.flink.kubernetes.operator.reconciler;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
import io.javaoperatorsdk.operator.api.reconciler.Context;
import io.javaoperatorsdk.operator.api.reconciler.DeleteControl;
-/** The interface of reconciler. */
-public interface Reconciler {
+/**
+ * The interface of reconciler.
+ *
+ * @param <CR> The custom resource to be reconciled.
+ */
+public interface Reconciler<CR> {
/**
- * This is called when receiving the create or update event of the
FlinkDeployment resource.
+ * This is called when receiving the create or update event of the custom
resource.
*
- * @param flinkApp the FlinkDeployment resource that has been created or
updated
+ * @param cr the custom resource that has been created or updated
* @param context the context with which the operation is executed
- * @param effectiveConfig the effective config of the flinkApp
+ * @param effectiveConfig the effective config of the target resource
*/
- void reconcile(FlinkDeployment flinkApp, Context context, Configuration
effectiveConfig)
- throws Exception;
+ void reconcile(CR cr, Context context, Configuration effectiveConfig)
throws Exception;
/**
- * This is called when receiving the delete event of FlinkDeployment
resource. This method is
- * meant to cleanup the associated components like the Flink job
components.
+ * This is called when receiving the delete event of custom resource. This
method is meant to
+ * cleanup the associated components like the Flink job components.
*
- * @param flinkApp the FlinkDeployment resource that has been deleted
+ * @param cr the custom resource that has been deleted
+ * @param context the context with which the operation is executed
* @param effectiveConfig the effective config of the flinkApp
- * @return DeleteControl to manage the delete behavior
+ * @return DeleteControl to manage the deletion behavior
*/
- DeleteControl cleanup(FlinkDeployment flinkApp, Configuration
effectiveConfig);
+ DeleteControl cleanup(CR cr, Context context, Configuration
effectiveConfig);
}
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java
index 8fd27f2..7682471 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java
@@ -20,15 +20,19 @@ package org.apache.flink.kubernetes.operator.reconciler;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.crd.FlinkSessionJob;
import org.apache.flink.kubernetes.operator.crd.spec.FlinkDeploymentSpec;
+import org.apache.flink.kubernetes.operator.crd.spec.FlinkSessionJobSpec;
import org.apache.flink.kubernetes.operator.crd.spec.JobState;
import org.apache.flink.kubernetes.operator.crd.spec.UpgradeMode;
+import
org.apache.flink.kubernetes.operator.crd.status.FlinkSessionJobReconciliationStatus;
import org.apache.flink.kubernetes.operator.crd.status.ReconciliationStatus;
import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
import org.apache.flink.util.Preconditions;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
+import io.fabric8.kubernetes.client.CustomResource;
import io.javaoperatorsdk.operator.api.reconciler.UpdateControl;
import java.time.Duration;
@@ -74,6 +78,22 @@ public class ReconciliationUtils {
reconciliationStatus.setError(err);
}
+ public static void updateForSpecReconciliationSuccess(FlinkSessionJob
sessionJob) {
+ FlinkSessionJobReconciliationStatus reconciliationStatus =
+ sessionJob.getStatus().getReconciliationStatus();
+ reconciliationStatus.setSuccess(true);
+ reconciliationStatus.setError(null);
+ FlinkSessionJobSpec clonedSpec = clone(sessionJob.getSpec());
+ reconciliationStatus.setLastReconciledSpec(clonedSpec);
+ }
+
+ public static void updateForReconciliationError(FlinkSessionJob
flinkSessionJob, String err) {
+ FlinkSessionJobReconciliationStatus reconciliationStatus =
+ flinkSessionJob.getStatus().getReconciliationStatus();
+ reconciliationStatus.setSuccess(false);
+ reconciliationStatus.setError(err);
+ }
+
public static <T> T clone(T object) {
if (object == null) {
return null;
@@ -87,12 +107,10 @@ public class ReconciliationUtils {
}
}
- public static UpdateControl<FlinkDeployment> toUpdateControl(
- FlinkOperatorConfiguration operatorConfiguration,
- FlinkDeployment originalCopy,
- FlinkDeployment current,
- boolean reschedule) {
- UpdateControl<FlinkDeployment> updateControl;
+ public static <CR extends CustomResource> UpdateControl<CR>
toUpdateControl(
+ CR originalCopy, CR current) {
+
+ UpdateControl<CR> updateControl;
if (!Objects.equals(originalCopy.getSpec(), current.getSpec())) {
throw new UnsupportedOperationException(
"Detected spec change after reconcile, this probably
indicates a bug.");
@@ -106,6 +124,16 @@ public class ReconciliationUtils {
updateControl = UpdateControl.noUpdate();
}
+ return updateControl;
+ }
+
+ public static UpdateControl<FlinkDeployment> toUpdateControl(
+ FlinkOperatorConfiguration operatorConfiguration,
+ FlinkDeployment originalCopy,
+ FlinkDeployment current,
+ boolean reschedule) {
+ UpdateControl<FlinkDeployment> updateControl =
toUpdateControl(originalCopy, current);
+
if (!reschedule) {
return updateControl;
}
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/BaseReconciler.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractDeploymentReconciler.java
similarity index 82%
rename from
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/BaseReconciler.java
rename to
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractDeploymentReconciler.java
index 21d6cf1..c0aced3 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/BaseReconciler.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractDeploymentReconciler.java
@@ -15,26 +15,28 @@
* limitations under the License.
*/
-package org.apache.flink.kubernetes.operator.reconciler;
+package org.apache.flink.kubernetes.operator.reconciler.deployment;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
-import
org.apache.flink.kubernetes.operator.observer.JobManagerDeploymentStatus;
+import
org.apache.flink.kubernetes.operator.crd.status.JobManagerDeploymentStatus;
+import org.apache.flink.kubernetes.operator.reconciler.Reconciler;
import org.apache.flink.kubernetes.operator.service.FlinkService;
import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
import io.fabric8.kubernetes.client.KubernetesClient;
+import io.javaoperatorsdk.operator.api.reconciler.Context;
import io.javaoperatorsdk.operator.api.reconciler.DeleteControl;
/** BaseReconciler with functionality that is common to job and session modes.
*/
-public abstract class BaseReconciler implements Reconciler {
+public abstract class AbstractDeploymentReconciler implements
Reconciler<FlinkDeployment> {
protected final FlinkOperatorConfiguration operatorConfiguration;
protected final KubernetesClient kubernetesClient;
protected final FlinkService flinkService;
- public BaseReconciler(
+ public AbstractDeploymentReconciler(
KubernetesClient kubernetesClient,
FlinkService flinkService,
FlinkOperatorConfiguration operatorConfiguration) {
@@ -44,7 +46,8 @@ public abstract class BaseReconciler implements Reconciler {
}
@Override
- public DeleteControl cleanup(FlinkDeployment flinkApp, Configuration
effectiveConfig) {
+ public DeleteControl cleanup(
+ FlinkDeployment flinkApp, Context context, Configuration
effectiveConfig) {
return shutdownAndDelete(flinkApp, effectiveConfig);
}
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/JobReconciler.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java
similarity index 95%
rename from
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/JobReconciler.java
rename to
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java
index 55ed5ee..b49e18f 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/JobReconciler.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.flink.kubernetes.operator.reconciler;
+package org.apache.flink.kubernetes.operator.reconciler.deployment;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;
@@ -26,9 +26,10 @@ import org.apache.flink.kubernetes.operator.crd.spec.JobSpec;
import org.apache.flink.kubernetes.operator.crd.spec.JobState;
import org.apache.flink.kubernetes.operator.crd.spec.UpgradeMode;
import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
+import
org.apache.flink.kubernetes.operator.crd.status.JobManagerDeploymentStatus;
import org.apache.flink.kubernetes.operator.crd.status.JobStatus;
import org.apache.flink.kubernetes.operator.crd.status.Savepoint;
-import
org.apache.flink.kubernetes.operator.observer.JobManagerDeploymentStatus;
+import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
import org.apache.flink.kubernetes.operator.service.FlinkService;
import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
import org.apache.flink.kubernetes.operator.utils.IngressUtils;
@@ -43,17 +44,17 @@ import org.slf4j.LoggerFactory;
import java.util.Optional;
-import static
org.apache.flink.kubernetes.operator.observer.BaseObserver.JOB_STATE_UNKNOWN;
+import static
org.apache.flink.kubernetes.operator.observer.deployment.AbstractDeploymentObserver.JOB_STATE_UNKNOWN;
/**
* Reconciler responsible for handling the job lifecycle according to the
desired and current
* states.
*/
-public class JobReconciler extends BaseReconciler {
+public class ApplicationReconciler extends AbstractDeploymentReconciler {
- private static final Logger LOG =
LoggerFactory.getLogger(JobReconciler.class);
+ private static final Logger LOG =
LoggerFactory.getLogger(ApplicationReconciler.class);
- public JobReconciler(
+ public ApplicationReconciler(
KubernetesClient kubernetesClient,
FlinkService flinkService,
FlinkOperatorConfiguration operatorConfiguration) {
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconcilerFactory.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ReconcilerFactory.java
similarity index 73%
rename from
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconcilerFactory.java
rename to
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ReconcilerFactory.java
index bdcd8e4..d2f55af 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconcilerFactory.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ReconcilerFactory.java
@@ -1,11 +1,10 @@
/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
@@ -16,11 +15,12 @@
* limitations under the License.
*/
-package org.apache.flink.kubernetes.operator.reconciler;
+package org.apache.flink.kubernetes.operator.reconciler.deployment;
import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
import org.apache.flink.kubernetes.operator.config.Mode;
import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.reconciler.Reconciler;
import org.apache.flink.kubernetes.operator.service.FlinkService;
import io.fabric8.kubernetes.client.KubernetesClient;
@@ -34,7 +34,7 @@ public class ReconcilerFactory {
private final KubernetesClient kubernetesClient;
private final FlinkService flinkService;
private final FlinkOperatorConfiguration operatorConfiguration;
- private final Map<Mode, Reconciler> reconcilerMap;
+ private final Map<Mode, Reconciler<FlinkDeployment>> reconcilerMap;
public ReconcilerFactory(
KubernetesClient kubernetesClient,
@@ -46,7 +46,7 @@ public class ReconcilerFactory {
this.reconcilerMap = new ConcurrentHashMap<>();
}
- public Reconciler getOrCreate(FlinkDeployment flinkApp) {
+ public Reconciler<FlinkDeployment> getOrCreate(FlinkDeployment flinkApp) {
return reconcilerMap.computeIfAbsent(
Mode.getMode(flinkApp),
mode -> {
@@ -55,7 +55,7 @@ public class ReconcilerFactory {
return new SessionReconciler(
kubernetesClient, flinkService,
operatorConfiguration);
case APPLICATION:
- return new JobReconciler(
+ return new ApplicationReconciler(
kubernetesClient, flinkService,
operatorConfiguration);
default:
throw new UnsupportedOperationException(
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/SessionReconciler.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java
similarity index 92%
rename from
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/SessionReconciler.java
rename to
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java
index 525efd6..98f2683 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/SessionReconciler.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java
@@ -15,13 +15,14 @@
* limitations under the License.
*/
-package org.apache.flink.kubernetes.operator.reconciler;
+package org.apache.flink.kubernetes.operator.reconciler.deployment;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
import org.apache.flink.kubernetes.operator.crd.spec.FlinkDeploymentSpec;
-import
org.apache.flink.kubernetes.operator.observer.JobManagerDeploymentStatus;
+import
org.apache.flink.kubernetes.operator.crd.status.JobManagerDeploymentStatus;
+import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
import org.apache.flink.kubernetes.operator.service.FlinkService;
import org.apache.flink.kubernetes.operator.utils.IngressUtils;
@@ -34,7 +35,7 @@ import org.slf4j.LoggerFactory;
* Reconciler responsible for handling the session cluster lifecycle according
to the desired and
* current states.
*/
-public class SessionReconciler extends BaseReconciler {
+public class SessionReconciler extends AbstractDeploymentReconciler {
private static final Logger LOG =
LoggerFactory.getLogger(SessionReconciler.class);
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/FlinkSessionJobReconciler.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/FlinkSessionJobReconciler.java
new file mode 100644
index 0000000..9bb5e4a
--- /dev/null
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/FlinkSessionJobReconciler.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.reconciler.sessionjob;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
+import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.crd.FlinkSessionJob;
+import org.apache.flink.kubernetes.operator.crd.spec.FlinkSessionJobSpec;
+import
org.apache.flink.kubernetes.operator.crd.status.JobManagerDeploymentStatus;
+import org.apache.flink.kubernetes.operator.reconciler.Reconciler;
+import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
+import
org.apache.flink.kubernetes.operator.reconciler.deployment.ApplicationReconciler;
+import org.apache.flink.kubernetes.operator.service.FlinkService;
+import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
+import org.apache.flink.kubernetes.operator.utils.OperatorUtils;
+
+import io.fabric8.kubernetes.client.KubernetesClient;
+import io.javaoperatorsdk.operator.api.reconciler.Context;
+import io.javaoperatorsdk.operator.api.reconciler.DeleteControl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Optional;
+
+/** The reconciler for the {@link FlinkSessionJob}. */
+public class FlinkSessionJobReconciler implements Reconciler<FlinkSessionJob> {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(ApplicationReconciler.class);
+
+ private final FlinkOperatorConfiguration operatorConfiguration;
+ private final KubernetesClient kubernetesClient;
+ private final FlinkService flinkService;
+
+ public FlinkSessionJobReconciler(
+ KubernetesClient kubernetesClient,
+ FlinkService flinkService,
+ FlinkOperatorConfiguration operatorConfiguration) {
+ this.kubernetesClient = kubernetesClient;
+ this.flinkService = flinkService;
+ this.operatorConfiguration = operatorConfiguration;
+ }
+
+ @Override
+ public void reconcile(
+ FlinkSessionJob flinkSessionJob, Context context, Configuration
defaultConfig)
+ throws Exception {
+
+ FlinkSessionJobSpec lastReconciledSpec =
+
flinkSessionJob.getStatus().getReconciliationStatus().getLastReconciledSpec();
+
+ if (lastReconciledSpec == null) {
+ submitFlinkJob(flinkSessionJob, context, defaultConfig);
+ return;
+ }
+
+ boolean specChanged =
!flinkSessionJob.getSpec().equals(lastReconciledSpec);
+
+ if (specChanged) {
+ // TODO reconcile other spec change.
+ LOG.info("Other spec change have not supported");
+ }
+ }
+
+ @Override
+ public DeleteControl cleanup(
+ FlinkSessionJob sessionJob, Context context, Configuration
defaultConfig) {
+ Optional<FlinkDeployment> flinkDepOptional =
+ OperatorUtils.getSecondaryResource(sessionJob, context,
operatorConfiguration);
+
+ if (flinkDepOptional.isPresent()) {
+ Configuration effectiveConfig =
+ FlinkUtils.getEffectiveConfig(flinkDepOptional.get(),
defaultConfig);
+ String jobID = sessionJob.getStatus().getJobStatus().getJobId();
+ if (jobID != null) {
+ try {
+ flinkService.cancelSessionJob(JobID.fromHexString(jobID),
effectiveConfig);
+ } catch (Exception e) {
+ LOG.error("Failed to cancel job.", e);
+ }
+ }
+ } else {
+ LOG.info("Session cluster deployment not available");
+ }
+ return DeleteControl.defaultDelete();
+ }
+
+ private void submitFlinkJob(
+ FlinkSessionJob sessionJob, Context context, Configuration
defaultConfig)
+ throws Exception {
+ Optional<FlinkDeployment> flinkDepOptional =
+ OperatorUtils.getSecondaryResource(sessionJob, context,
operatorConfiguration);
+ if (flinkDepOptional.isPresent()) {
+ var flinkdep = flinkDepOptional.get();
+ var jobDeploymentStatus =
flinkdep.getStatus().getJobManagerDeploymentStatus();
+ if (jobDeploymentStatus == JobManagerDeploymentStatus.READY) {
+ Configuration effectiveConfig =
+ FlinkUtils.getEffectiveConfig(flinkdep, defaultConfig);
+ flinkService.submitJobToSessionCluster(sessionJob,
effectiveConfig);
+
ReconciliationUtils.updateForSpecReconciliationSuccess(sessionJob);
+ } else {
+ LOG.info(
+ "Session cluster deployment is in {} status, not ready
for serve",
+ jobDeploymentStatus);
+ }
+ } else {
+ LOG.info("Session cluster deployment is not found");
+ }
+ }
+}
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java
index 2740c53..7cf22e0 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java
@@ -33,17 +33,23 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
import
org.apache.flink.kubernetes.kubeclient.decorators.ExternalServiceDecorator;
+import org.apache.flink.kubernetes.operator.artifact.JarResolver;
import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.crd.FlinkSessionJob;
import org.apache.flink.kubernetes.operator.crd.spec.JobSpec;
import org.apache.flink.kubernetes.operator.crd.spec.UpgradeMode;
+import org.apache.flink.kubernetes.operator.crd.status.JobStatus;
import org.apache.flink.kubernetes.operator.crd.status.Savepoint;
import org.apache.flink.kubernetes.operator.observer.SavepointFetchResult;
import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
import org.apache.flink.runtime.client.JobStatusMessage;
import
org.apache.flink.runtime.highavailability.nonha.standalone.StandaloneClientHAServices;
+import org.apache.flink.runtime.rest.FileUpload;
+import org.apache.flink.runtime.rest.RestClient;
import org.apache.flink.runtime.rest.handler.async.AsynchronousOperationResult;
import org.apache.flink.runtime.rest.handler.async.TriggerResponse;
+import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
import org.apache.flink.runtime.rest.messages.TriggerId;
import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointInfo;
@@ -52,9 +58,18 @@ import
org.apache.flink.runtime.rest.messages.job.savepoints.SavepointStatusMess
import
org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerHeaders;
import
org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerMessageParameters;
import
org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerRequestBody;
+import org.apache.flink.runtime.rest.util.RestConstants;
+import org.apache.flink.runtime.webmonitor.handlers.JarRunHeaders;
+import org.apache.flink.runtime.webmonitor.handlers.JarRunMessageParameters;
+import org.apache.flink.runtime.webmonitor.handlers.JarRunRequestBody;
+import org.apache.flink.runtime.webmonitor.handlers.JarRunResponseBody;
+import org.apache.flink.runtime.webmonitor.handlers.JarUploadHeaders;
+import org.apache.flink.runtime.webmonitor.handlers.JarUploadResponseBody;
import
org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.concurrent.ExecutorThreadFactory;
import io.fabric8.kubernetes.api.model.PodList;
import io.fabric8.kubernetes.client.KubernetesClient;
@@ -70,9 +85,14 @@ import java.net.Socket;
import java.net.SocketAddress;
import java.net.SocketTimeoutException;
import java.net.URI;
+import java.nio.file.Path;
+import java.util.Arrays;
import java.util.Collection;
+import java.util.Collections;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@@ -83,11 +103,17 @@ public class FlinkService {
private final KubernetesClient kubernetesClient;
private final FlinkOperatorConfiguration operatorConfiguration;
+ private final JarResolver jarResolver;
+ private final ExecutorService executorService;
public FlinkService(
KubernetesClient kubernetesClient, FlinkOperatorConfiguration
operatorConfiguration) {
this.kubernetesClient = kubernetesClient;
this.operatorConfiguration = operatorConfiguration;
+ this.jarResolver = new JarResolver();
+ this.executorService =
+ Executors.newFixedThreadPool(
+ 4, new
ExecutorThreadFactory("Flink-RestClusterClient-IO"));
}
public void submitApplicationCluster(FlinkDeployment deployment,
Configuration conf)
@@ -132,6 +158,77 @@ public class FlinkService {
LOG.info("Session cluster successfully deployed");
}
+ public void submitJobToSessionCluster(FlinkSessionJob sessionJob,
Configuration conf)
+ throws Exception {
+ var jarRunResponseBody = jarRun(sessionJob, jarUpload(sessionJob,
conf), conf);
+ String jobID = jarRunResponseBody.getJobId().toHexString();
+ LOG.info("Submitted job: {} to session cluster.", jobID);
+
sessionJob.getStatus().setJobStatus(JobStatus.builder().jobId(jobID).build());
+ }
+
+ private JarRunResponseBody jarRun(
+ FlinkSessionJob sessionJob, JarUploadResponseBody response,
Configuration conf) {
+ String jarId =
+
response.getFilename().substring(response.getFilename().lastIndexOf("/") + 1);
+ // we generate jobID in advance to help deduplicate job submission.
+ JobID jobID = new JobID();
+ try (RestClusterClient<String> clusterClient =
+ (RestClusterClient<String>) getClusterClient(conf)) {
+ JarRunHeaders headers = JarRunHeaders.getInstance();
+ JarRunMessageParameters parameters =
headers.getUnresolvedMessageParameters();
+ parameters.jarIdPathParameter.resolve(jarId);
+ JobSpec job = sessionJob.getSpec().getJob();
+ JarRunRequestBody runRequestBody =
+ new JarRunRequestBody(
+ job.getEntryClass(),
+ null,
+ job.getArgs() == null ? null :
Arrays.asList(job.getArgs()),
+ job.getParallelism() > 0 ? job.getParallelism() :
null,
+ jobID,
+ null,
+
sessionJob.getSpec().getJob().getInitialSavepointPath());
+ LOG.info("Submitting job: {} to session cluster.",
jobID.toHexString());
+ return clusterClient
+ .sendRequest(headers, parameters, runRequestBody)
+ .get(
+
operatorConfiguration.getFlinkClientTimeout().toSeconds(),
+ TimeUnit.SECONDS);
+ } catch (Exception e) {
+ LOG.error("Failed to submit job to session cluster.", e);
+ throw new FlinkRuntimeException(e);
+ }
+ }
+
+ private JarUploadResponseBody jarUpload(FlinkSessionJob sessionJob,
Configuration conf)
+ throws Exception {
+ Path path =
jarResolver.resolve(sessionJob.getSpec().getJob().getJarURI());
+ JarUploadHeaders headers = JarUploadHeaders.getInstance();
+ String clusterId = sessionJob.getSpec().getClusterId();
+ String namespace = sessionJob.getMetadata().getNamespace();
+ int port = conf.getInteger(RestOptions.PORT);
+ String host =
+ ObjectUtils.firstNonNull(
+ operatorConfiguration.getFlinkServiceHostOverride(),
+
ExternalServiceDecorator.getNamespacedExternalServiceName(
+ clusterId, namespace));
+
+ try (RestClient restClient = new RestClient(conf, executorService)) {
+ // TODO add method in flink#RestClusterClient to support upload
jar.
+ return restClient
+ .sendRequest(
+ host,
+ port,
+ headers,
+ EmptyMessageParameters.getInstance(),
+ EmptyRequestBody.getInstance(),
+ Collections.singletonList(
+ new FileUpload(path,
RestConstants.CONTENT_TYPE_JAR)))
+ .get(
+
operatorConfiguration.getFlinkClientTimeout().toSeconds(),
+ TimeUnit.SECONDS);
+ }
+ }
+
public boolean isJobManagerPortReady(Configuration config) {
final URI uri;
try (ClusterClient<String> clusterClient = getClusterClient(config)) {
@@ -184,7 +281,11 @@ public class FlinkService {
final String clusterId = clusterClient.getClusterId();
switch (upgradeMode) {
case STATELESS:
- clusterClient.cancel(jobID).get(1, TimeUnit.MINUTES);
+ clusterClient
+ .cancel(jobID)
+ .get(
+
operatorConfiguration.getCancelJobTimeout().toSeconds(),
+ TimeUnit.SECONDS);
break;
case SAVEPOINT:
final String savepointDirectory =
@@ -222,6 +323,14 @@ public class FlinkService {
return savepointOpt;
}
+ public void cancelSessionJob(JobID jobID, Configuration conf) throws
Exception {
+ try (ClusterClient<String> clusterClient = getClusterClient(conf)) {
+ clusterClient
+ .cancel(jobID)
+
.get(operatorConfiguration.getCancelJobTimeout().toSeconds(), TimeUnit.SECONDS);
+ }
+ }
+
public void stopSessionCluster(
FlinkDeployment deployment, Configuration conf, boolean
deleteHaData) {
FlinkUtils.deleteCluster(deployment, kubernetesClient, deleteHaData);
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/OperatorUtils.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/OperatorUtils.java
index 28dc8fc..4b7cb60 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/OperatorUtils.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/OperatorUtils.java
@@ -18,6 +18,9 @@
package org.apache.flink.kubernetes.operator.utils;
+import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
+import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.crd.FlinkSessionJob;
import org.apache.flink.kubernetes.utils.Constants;
import io.fabric8.kubernetes.api.model.HasMetadata;
@@ -26,6 +29,7 @@ import io.fabric8.kubernetes.api.model.apps.DeploymentList;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.dsl.FilterWatchListDeletable;
import io.fabric8.kubernetes.client.informers.SharedIndexInformer;
+import io.javaoperatorsdk.operator.api.reconciler.Context;
import
io.javaoperatorsdk.operator.processing.event.source.informer.InformerEventSource;
import io.javaoperatorsdk.operator.processing.event.source.informer.Mappers;
import org.apache.commons.lang3.StringUtils;
@@ -33,6 +37,7 @@ import org.apache.commons.lang3.StringUtils;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
+import java.util.Optional;
import java.util.Set;
/** Operator SDK related utility functions. */
@@ -79,4 +84,15 @@ public class OperatorUtils {
return new
HashSet<>(Arrays.asList(watchedNamespaces.split(NAMESPACES_SPLITTER_KEY)));
}
}
+
+ public static Optional<FlinkDeployment> getSecondaryResource(
+ FlinkSessionJob sessionJob,
+ Context context,
+ FlinkOperatorConfiguration operatorConfiguration) {
+ var identifier =
+ operatorConfiguration.getWatchedNamespaces().size() >= 1
+ ? sessionJob.getMetadata().getNamespace()
+ : null;
+ return context.getSecondaryResource(FlinkDeployment.class, identifier);
+ }
}
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultDeploymentValidator.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultValidator.java
similarity index 86%
rename from
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultDeploymentValidator.java
rename to
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultValidator.java
index 828e539..3efb504 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultDeploymentValidator.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultValidator.java
@@ -22,6 +22,7 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.crd.FlinkSessionJob;
import org.apache.flink.kubernetes.operator.crd.spec.FlinkDeploymentSpec;
import org.apache.flink.kubernetes.operator.crd.spec.FlinkVersion;
import org.apache.flink.kubernetes.operator.crd.spec.IngressSpec;
@@ -31,8 +32,8 @@ import org.apache.flink.kubernetes.operator.crd.spec.JobState;
import org.apache.flink.kubernetes.operator.crd.spec.Resource;
import org.apache.flink.kubernetes.operator.crd.spec.TaskManagerSpec;
import org.apache.flink.kubernetes.operator.crd.spec.UpgradeMode;
+import
org.apache.flink.kubernetes.operator.crd.status.JobManagerDeploymentStatus;
import org.apache.flink.kubernetes.operator.exception.ReconciliationException;
-import
org.apache.flink.kubernetes.operator.observer.JobManagerDeploymentStatus;
import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
import org.apache.flink.kubernetes.operator.utils.IngressUtils;
@@ -43,8 +44,8 @@ import java.util.Map;
import java.util.Optional;
import java.util.Set;
-/** Default validator implementation. */
-public class DefaultDeploymentValidator implements FlinkDeploymentValidator {
+/** Default validator implementation for {@link FlinkDeployment}. */
+public class DefaultValidator implements FlinkResourceValidator {
private static final String[] FORBIDDEN_CONF_KEYS =
new String[] {
@@ -55,7 +56,7 @@ public class DefaultDeploymentValidator implements
FlinkDeploymentValidator {
Set.of(Constants.CONFIG_FILE_LOG4J_NAME,
Constants.CONFIG_FILE_LOGBACK_NAME);
@Override
- public Optional<String> validate(FlinkDeployment deployment) {
+ public Optional<String> validateDeployment(FlinkDeployment deployment) {
FlinkDeploymentSpec spec = deployment.getSpec();
return firstPresent(
validateFlinkVersion(spec.getFlinkVersion()),
@@ -270,4 +271,42 @@ public class DefaultDeploymentValidator implements
FlinkDeploymentValidator {
return Optional.empty();
}
+
+ // validate session job
+
+ @Override
+ public Optional<String> validateSessionJob(
+ FlinkSessionJob sessionJob, Optional<FlinkDeployment> session) {
+
+ return firstPresent(
+ validateNotApplicationCluster(session),
+ validateSessionClusterId(sessionJob, session));
+ }
+
+ private Optional<String> validateSessionClusterId(
+ FlinkSessionJob sessionJob, Optional<FlinkDeployment> session) {
+ return session.flatMap(
+ deployment -> {
+ if (!deployment
+ .getMetadata()
+ .getName()
+ .equals(sessionJob.getSpec().getClusterId())) {
+ return Optional.of(
+ "The session job's cluster id is not match
with the session cluster");
+ } else {
+ return Optional.empty();
+ }
+ });
+ }
+
+ private Optional<String>
validateNotApplicationCluster(Optional<FlinkDeployment> session) {
+ return session.flatMap(
+ deployment -> {
+ if (deployment.getSpec().getJob() != null) {
+ return Optional.of("Can not submit to application
cluster");
+ } else {
+ return Optional.empty();
+ }
+ });
+ }
}
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/FlinkDeploymentValidator.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/FlinkResourceValidator.java
similarity index 64%
copy from
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/FlinkDeploymentValidator.java
copy to
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/FlinkResourceValidator.java
index 732224d..c5c13c1 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/FlinkDeploymentValidator.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/FlinkResourceValidator.java
@@ -18,11 +18,12 @@
package org.apache.flink.kubernetes.operator.validation;
import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.crd.FlinkSessionJob;
import java.util.Optional;
-/** Validator for {@link FlinkDeployment} resources. */
-public interface FlinkDeploymentValidator {
+/** Validator for different resources. */
+public interface FlinkResourceValidator {
/**
* Validate and return optional error.
@@ -30,5 +31,15 @@ public interface FlinkDeploymentValidator {
* @param deployment
* @return Optional error string, should be present iff validation
resulted in an error
*/
- Optional<String> validate(FlinkDeployment deployment);
+ Optional<String> validateDeployment(FlinkDeployment deployment);
+
+ /**
+ * Validate and return optional error.
+ *
+ * @param sessionJob the session job to be validated.
+ * @param session the target session cluster of the session job to be
validated.
+ * @return Optional error string, should be present iff validation
resulted in an error
+ */
+ Optional<String> validateSessionJob(
+ FlinkSessionJob sessionJob, Optional<FlinkDeployment> session);
}
diff --git
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java
index fde3bfd..929afb1 100644
---
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java
+++
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java
@@ -21,7 +21,9 @@ import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import
org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory;
import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.crd.FlinkSessionJob;
import org.apache.flink.kubernetes.operator.crd.spec.FlinkDeploymentSpec;
+import org.apache.flink.kubernetes.operator.crd.spec.FlinkSessionJobSpec;
import org.apache.flink.kubernetes.operator.crd.spec.FlinkVersion;
import org.apache.flink.kubernetes.operator.crd.spec.JobManagerSpec;
import org.apache.flink.kubernetes.operator.crd.spec.JobSpec;
@@ -30,6 +32,8 @@ import org.apache.flink.kubernetes.operator.crd.spec.Resource;
import org.apache.flink.kubernetes.operator.crd.spec.TaskManagerSpec;
import org.apache.flink.kubernetes.operator.crd.spec.UpgradeMode;
import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
+import org.apache.flink.kubernetes.operator.crd.status.FlinkSessionJobStatus;
+import
org.apache.flink.kubernetes.operator.crd.status.JobManagerDeploymentStatus;
import
org.apache.flink.kubernetes.operator.exception.DeploymentFailedException;
import io.fabric8.kubernetes.api.model.Container;
@@ -59,6 +63,7 @@ public class TestUtils {
public static final String TEST_NAMESPACE = "flink-operator-test";
public static final String TEST_DEPLOYMENT_NAME = "test-cluster";
+ public static final String TEST_SESSION_JOB_NAME = "test-session-job";
public static final String SERVICE_ACCOUNT = "flink-operator";
public static final String FLINK_VERSION = "latest";
public static final String IMAGE = String.format("flink:%s",
FLINK_VERSION);
@@ -91,6 +96,28 @@ public class TestUtils {
return deployment;
}
+ public static FlinkSessionJob buildSessionJob() {
+ FlinkSessionJob sessionJob = new FlinkSessionJob();
+ sessionJob.setStatus(new FlinkSessionJobStatus());
+ sessionJob.setMetadata(
+ new ObjectMetaBuilder()
+ .withName(TEST_SESSION_JOB_NAME)
+ .withNamespace(TEST_NAMESPACE)
+ .build());
+ sessionJob.setSpec(
+ FlinkSessionJobSpec.builder()
+ .clusterId(TEST_DEPLOYMENT_NAME)
+ .job(
+ JobSpec.builder()
+ .jarURI(SAMPLE_JAR)
+ .parallelism(1)
+ .upgradeMode(UpgradeMode.STATELESS)
+ .state(JobState.RUNNING)
+ .build())
+ .build());
+ return sessionJob;
+ }
+
public static FlinkDeploymentSpec getTestFlinkDeploymentSpec() {
Map<String, String> conf = new HashMap<>();
conf.put(TaskManagerOptions.NUM_TASK_SLOTS.key(), "2");
@@ -195,6 +222,41 @@ public class TestUtils {
};
}
+ public static Context createContextWithReadyFlinkDeployment() {
+ return new Context() {
+ @Override
+ public Optional<RetryInfo> getRetryInfo() {
+ return Optional.empty();
+ }
+
+ @Override
+ public <T> Optional<T> getSecondaryResource(
+ Class<T> expectedType, String eventSourceName) {
+ var session = buildSessionCluster();
+
session.getStatus().setJobManagerDeploymentStatus(JobManagerDeploymentStatus.READY);
+ return Optional.of((T) session);
+ }
+ };
+ }
+
+ public static Context createContextWithNotReadyFlinkDeployment() {
+ return new Context() {
+ @Override
+ public Optional<RetryInfo> getRetryInfo() {
+ return Optional.empty();
+ }
+
+ @Override
+ public <T> Optional<T> getSecondaryResource(
+ Class<T> expectedType, String eventSourceName) {
+ var session = buildSessionCluster();
+ session.getStatus()
+
.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.MISSING);
+ return Optional.of((T) session);
+ }
+ };
+ }
+
public static final String DEPLOYMENT_ERROR = "test deployment error
message";
public static Context createContextWithFailedJobManagerDeployment() {
diff --git
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java
index 214bd03..b8721e0 100644
---
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java
+++
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.crd.FlinkSessionJob;
import org.apache.flink.kubernetes.operator.crd.spec.UpgradeMode;
import org.apache.flink.kubernetes.operator.crd.status.Savepoint;
import org.apache.flink.kubernetes.operator.crd.status.SavepointInfo;
@@ -81,6 +82,18 @@ public class TestingFlinkService extends FlinkService {
}
@Override
+ public void submitJobToSessionCluster(FlinkSessionJob sessionJob,
Configuration conf) {
+ JobID jobID = new JobID();
+ JobStatusMessage jobStatusMessage =
+ new JobStatusMessage(
+ jobID,
+ sessionJob.getMetadata().getName(),
+ JobStatus.RUNNING,
+ System.currentTimeMillis());
+ jobs.add(Tuple2.of(conf.get(SavepointConfigOptions.SAVEPOINT_PATH),
jobStatusMessage));
+ }
+
+ @Override
public List<JobStatusMessage> listJobs(Configuration conf) throws
Exception {
listJobConsumer.accept(conf);
if (jobs.isEmpty() && !sessions.isEmpty()) {
@@ -121,6 +134,13 @@ public class TestingFlinkService extends FlinkService {
}
@Override
+ public void cancelSessionJob(JobID jobID, Configuration conf) throws
Exception {
+ if (!jobs.removeIf(js -> js.f1.getJobId().equals(jobID))) {
+ throw new Exception("Job not found");
+ }
+ }
+
+ @Override
public void stopSessionCluster(
FlinkDeployment deployment, Configuration conf, boolean deleteHa) {
sessions.remove(deployment.getMetadata().getName());
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/FlinkDeploymentValidator.java
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/artifact/JarResolverTest.java
similarity index 60%
rename from
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/FlinkDeploymentValidator.java
rename to
flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/artifact/JarResolverTest.java
index 732224d..694cf84 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/FlinkDeploymentValidator.java
+++
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/artifact/JarResolverTest.java
@@ -15,20 +15,21 @@
* limitations under the License.
*/
-package org.apache.flink.kubernetes.operator.validation;
+package org.apache.flink.kubernetes.operator.artifact;
-import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
-import java.util.Optional;
+import java.nio.file.Path;
-/** Validator for {@link FlinkDeployment} resources. */
-public interface FlinkDeploymentValidator {
+/** Test for {@link JarResolver}. */
+public class JarResolverTest {
- /**
- * Validate and return optional error.
- *
- * @param deployment
- * @return Optional error string, should be present iff validation
resulted in an error
- */
- Optional<String> validate(FlinkDeployment deployment);
+ @Test
+ public void testResolve() throws Exception {
+ String jarUri = "file:///opt/flink/test.jar";
+ JarResolver resolver = new JarResolver();
+ Path path = resolver.resolve(jarUri);
+ Assertions.assertEquals("/opt/flink/test.jar",
path.toAbsolutePath().toString());
+ }
}
diff --git
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
index 3b93713..3b358ef 100644
---
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
+++
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
@@ -28,16 +28,16 @@ import
org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
import org.apache.flink.kubernetes.operator.crd.spec.JobState;
import org.apache.flink.kubernetes.operator.crd.spec.UpgradeMode;
+import
org.apache.flink.kubernetes.operator.crd.status.JobManagerDeploymentStatus;
import org.apache.flink.kubernetes.operator.crd.status.JobStatus;
import org.apache.flink.kubernetes.operator.crd.status.ReconciliationStatus;
import
org.apache.flink.kubernetes.operator.exception.DeploymentFailedException;
-import org.apache.flink.kubernetes.operator.observer.BaseObserver;
-import
org.apache.flink.kubernetes.operator.observer.JobManagerDeploymentStatus;
-import org.apache.flink.kubernetes.operator.observer.ObserverFactory;
-import org.apache.flink.kubernetes.operator.reconciler.ReconcilerFactory;
+import
org.apache.flink.kubernetes.operator.observer.deployment.AbstractDeploymentObserver;
+import
org.apache.flink.kubernetes.operator.observer.deployment.ObserverFactory;
import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
+import
org.apache.flink.kubernetes.operator.reconciler.deployment.ReconcilerFactory;
import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
-import
org.apache.flink.kubernetes.operator.validation.DefaultDeploymentValidator;
+import org.apache.flink.kubernetes.operator.validation.DefaultValidator;
import org.apache.flink.runtime.client.JobStatusMessage;
import io.fabric8.kubernetes.api.model.EventBuilder;
@@ -461,7 +461,7 @@ public class FlinkDeploymentControllerTest {
JobManagerDeploymentStatus.DEPLOYED_NOT_READY,
appCluster.getStatus().getJobManagerDeploymentStatus());
// jobStatus has not been set at this time
- assertEquals(BaseObserver.JOB_STATE_UNKNOWN, jobStatus.getState());
+ assertEquals(AbstractDeploymentObserver.JOB_STATE_UNKNOWN,
jobStatus.getState());
// Switches operator mode to SESSION
appCluster.getSpec().setJob(null);
@@ -607,8 +607,7 @@ public class FlinkDeploymentControllerTest {
defaultConfig,
operatorConfiguration,
kubernetesClient,
- "test",
- new DefaultDeploymentValidator(),
+ new DefaultValidator(),
new ReconcilerFactory(
kubernetesClient, flinkService,
operatorConfiguration),
new ObserverFactory(
diff --git
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/JobObserverTest.java
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/ApplicationObserverTest.java
similarity index 93%
rename from
flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/JobObserverTest.java
rename to
flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/ApplicationObserverTest.java
index 8ea1063..4564e3d 100644
---
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/JobObserverTest.java
+++
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/ApplicationObserverTest.java
@@ -24,8 +24,11 @@ import
org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
import org.apache.flink.kubernetes.operator.crd.spec.JobState;
import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
+import
org.apache.flink.kubernetes.operator.crd.status.JobManagerDeploymentStatus;
import org.apache.flink.kubernetes.operator.crd.status.JobStatus;
import
org.apache.flink.kubernetes.operator.exception.DeploymentFailedException;
+import
org.apache.flink.kubernetes.operator.observer.deployment.AbstractDeploymentObserver;
+import
org.apache.flink.kubernetes.operator.observer.deployment.ApplicationObserver;
import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
@@ -37,8 +40,8 @@ import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
-/** {@link JobObserver} unit tests. */
-public class JobObserverTest {
+/** {@link ApplicationObserver} unit tests. */
+public class ApplicationObserverTest {
private final Context readyContext =
TestUtils.createContextWithReadyJobManagerDeployment();
@@ -46,8 +49,8 @@ public class JobObserverTest {
public void observeApplicationCluster() {
Configuration flinkConf = new Configuration();
TestingFlinkService flinkService = new TestingFlinkService();
- JobObserver observer =
- new JobObserver(
+ ApplicationObserver observer =
+ new ApplicationObserver(
flinkService,
FlinkOperatorConfiguration.fromConfiguration(new
Configuration()),
flinkConf);
@@ -134,15 +137,16 @@ public class JobObserverTest {
JobManagerDeploymentStatus.READY,
deployment.getStatus().getJobManagerDeploymentStatus());
assertEquals(
- BaseObserver.JOB_STATE_UNKNOWN,
deployment.getStatus().getJobStatus().getState());
+ AbstractDeploymentObserver.JOB_STATE_UNKNOWN,
+ deployment.getStatus().getJobStatus().getState());
}
@Test
public void observeSavepoint() throws Exception {
Configuration flinkConf = new Configuration();
TestingFlinkService flinkService = new TestingFlinkService();
- JobObserver observer =
- new JobObserver(
+ ApplicationObserver observer =
+ new ApplicationObserver(
flinkService,
FlinkOperatorConfiguration.fromConfiguration(new
Configuration()),
flinkConf);
@@ -205,8 +209,8 @@ public class JobObserverTest {
public void observeListJobsError() {
Configuration flinkConf = new Configuration();
TestingFlinkService flinkService = new TestingFlinkService();
- JobObserver observer =
- new JobObserver(
+ ApplicationObserver observer =
+ new ApplicationObserver(
flinkService,
FlinkOperatorConfiguration.fromConfiguration(new
Configuration()),
flinkConf);
diff --git
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/SessionObserverTest.java
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/SessionObserverTest.java
index 003195e..5524f7c 100644
---
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/SessionObserverTest.java
+++
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/SessionObserverTest.java
@@ -23,6 +23,8 @@ import org.apache.flink.kubernetes.operator.TestUtils;
import org.apache.flink.kubernetes.operator.TestingFlinkService;
import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import
org.apache.flink.kubernetes.operator.crd.status.JobManagerDeploymentStatus;
+import
org.apache.flink.kubernetes.operator.observer.deployment.SessionObserver;
import org.apache.flink.kubernetes.operator.service.FlinkService;
import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
diff --git
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/JobReconcilerTest.java
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java
similarity index 93%
rename from
flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/JobReconcilerTest.java
rename to
flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java
index dc33b29..82f7f33 100644
---
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/JobReconcilerTest.java
+++
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.flink.kubernetes.operator.reconciler;
+package org.apache.flink.kubernetes.operator.reconciler.deployment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
@@ -26,8 +26,9 @@ import
org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
import org.apache.flink.kubernetes.operator.crd.spec.JobState;
import org.apache.flink.kubernetes.operator.crd.spec.UpgradeMode;
+import
org.apache.flink.kubernetes.operator.crd.status.JobManagerDeploymentStatus;
import org.apache.flink.kubernetes.operator.crd.status.JobStatus;
-import
org.apache.flink.kubernetes.operator.observer.JobManagerDeploymentStatus;
+import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
import org.apache.flink.runtime.client.JobStatusMessage;
import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
@@ -44,7 +45,7 @@ import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
/** @link JobStatusObserver unit tests */
-public class JobReconcilerTest {
+public class ApplicationReconcilerTest {
private final FlinkOperatorConfiguration operatorConfiguration =
FlinkOperatorConfiguration.fromConfiguration(new Configuration());
@@ -54,7 +55,8 @@ public class JobReconcilerTest {
Context context =
TestUtils.createContextWithReadyJobManagerDeployment();
TestingFlinkService flinkService = new TestingFlinkService();
- JobReconciler reconciler = new JobReconciler(null, flinkService,
operatorConfiguration);
+ ApplicationReconciler reconciler =
+ new ApplicationReconciler(null, flinkService,
operatorConfiguration);
FlinkDeployment deployment = TestUtils.buildApplicationCluster();
Configuration config = FlinkUtils.getEffectiveConfig(deployment, new
Configuration());
@@ -105,8 +107,8 @@ public class JobReconcilerTest {
final Context context =
TestUtils.createContextWithReadyJobManagerDeployment();
final TestingFlinkService flinkService = new TestingFlinkService();
- final JobReconciler reconciler =
- new JobReconciler(null, flinkService, operatorConfiguration);
+ final ApplicationReconciler reconciler =
+ new ApplicationReconciler(null, flinkService,
operatorConfiguration);
final FlinkDeployment deployment = TestUtils.buildApplicationCluster();
final Configuration config = FlinkUtils.getEffectiveConfig(deployment,
new Configuration());
@@ -156,7 +158,8 @@ public class JobReconcilerTest {
public void triggerSavepoint() throws Exception {
Context context =
TestUtils.createContextWithReadyJobManagerDeployment();
TestingFlinkService flinkService = new TestingFlinkService();
- JobReconciler reconciler = new JobReconciler(null, flinkService,
operatorConfiguration);
+ ApplicationReconciler reconciler =
+ new ApplicationReconciler(null, flinkService,
operatorConfiguration);
FlinkDeployment deployment = TestUtils.buildApplicationCluster();
Configuration config = FlinkUtils.getEffectiveConfig(deployment, new
Configuration());
@@ -217,8 +220,8 @@ public class JobReconcilerTest {
final Context context =
TestUtils.createContextWithReadyJobManagerDeployment();
final TestingFlinkService flinkService = new TestingFlinkService();
- final JobReconciler reconciler =
- new JobReconciler(null, flinkService, operatorConfiguration);
+ final ApplicationReconciler reconciler =
+ new ApplicationReconciler(null, flinkService,
operatorConfiguration);
final FlinkDeployment deployment = TestUtils.buildApplicationCluster();
final Configuration config = FlinkUtils.getEffectiveConfig(deployment,
new Configuration());
deployment.getSpec().getFlinkConfiguration().remove(HighAvailabilityOptions.HA_MODE.key());
@@ -274,8 +277,8 @@ public class JobReconcilerTest {
final Context context =
TestUtils.createContextWithReadyJobManagerDeployment();
final TestingFlinkService flinkService = new TestingFlinkService();
- final JobReconciler reconciler =
- new JobReconciler(null, flinkService, operatorConfiguration);
+ final ApplicationReconciler reconciler =
+ new ApplicationReconciler(null, flinkService,
operatorConfiguration);
final FlinkDeployment deployment = TestUtils.buildApplicationCluster();
final Configuration config = FlinkUtils.getEffectiveConfig(deployment,
new Configuration());
@@ -317,7 +320,8 @@ public class JobReconcilerTest {
Context context =
TestUtils.createContextWithReadyJobManagerDeployment();
TestingFlinkService flinkService = new TestingFlinkService();
- JobReconciler reconciler = new JobReconciler(null, flinkService,
operatorConfiguration);
+ ApplicationReconciler reconciler =
+ new ApplicationReconciler(null, flinkService,
operatorConfiguration);
FlinkDeployment deployment = TestUtils.buildApplicationCluster();
Configuration config = FlinkUtils.getEffectiveConfig(deployment, new
Configuration());
diff --git
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/SessionReconcilerTest.java
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconcilerTest.java
similarity index 93%
rename from
flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/SessionReconcilerTest.java
rename to
flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconcilerTest.java
index 61b1f1c..9a975e1 100644
---
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/SessionReconcilerTest.java
+++
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconcilerTest.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.flink.kubernetes.operator.reconciler;
+package org.apache.flink.kubernetes.operator.reconciler.deployment;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.kubernetes.operator.TestUtils;
@@ -30,7 +30,9 @@ import java.util.concurrent.atomic.AtomicInteger;
import static org.junit.jupiter.api.Assertions.assertEquals;
-/** Tests for {@link SessionReconciler}. */
+/**
+ * Tests for {@link
org.apache.flink.kubernetes.operator.reconciler.deployment.SessionReconciler}.
+ */
public class SessionReconcilerTest {
private final FlinkOperatorConfiguration operatorConfiguration =
diff --git
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/FlinkSessionJobReconcilerTest.java
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/FlinkSessionJobReconcilerTest.java
new file mode 100644
index 0000000..969c5fd
--- /dev/null
+++
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/FlinkSessionJobReconcilerTest.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.reconciler.sessionjob;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.operator.TestUtils;
+import org.apache.flink.kubernetes.operator.TestingFlinkService;
+import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
+import org.apache.flink.kubernetes.operator.crd.FlinkSessionJob;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+/** Tests for {@link FlinkSessionJobReconciler}. */
+public class FlinkSessionJobReconcilerTest {
+
+ private final FlinkOperatorConfiguration operatorConfiguration =
+ FlinkOperatorConfiguration.fromConfiguration(new Configuration());
+
+ @Test
+ public void testSubmitAndCleanUp() throws Exception {
+ TestingFlinkService flinkService = new TestingFlinkService();
+ FlinkSessionJob sessionJob = TestUtils.buildSessionJob();
+
+ FlinkSessionJobReconciler reconciler =
+ new FlinkSessionJobReconciler(null, flinkService,
operatorConfiguration);
+ reconciler.reconcile(sessionJob, TestUtils.createEmptyContext(), new
Configuration());
+ Assertions.assertEquals(0, flinkService.listJobs().size());
+ reconciler.reconcile(
+ sessionJob,
+ TestUtils.createContextWithNotReadyFlinkDeployment(),
+ new Configuration());
+ Assertions.assertEquals(0, flinkService.listJobs().size());
+ reconciler.reconcile(
+ sessionJob, TestUtils.createContextWithReadyFlinkDeployment(),
new Configuration());
+ Assertions.assertEquals(1, flinkService.listJobs().size());
+ // clean up
+ sessionJob
+ .getStatus()
+ .getJobStatus()
+
.setJobId(flinkService.listJobs().get(0).f1.getJobId().toHexString());
+ reconciler.cleanup(
+ sessionJob, TestUtils.createContextWithReadyFlinkDeployment(),
new Configuration());
+ Assertions.assertEquals(0, flinkService.listJobs().size());
+ }
+}
diff --git
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/ReconciliationUtilsTest.java
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/ReconciliationUtilsTest.java
index 76bd26a..cf4507f 100644
---
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/ReconciliationUtilsTest.java
+++
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/ReconciliationUtilsTest.java
@@ -22,7 +22,7 @@ import org.apache.flink.kubernetes.operator.TestUtils;
import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
import org.apache.flink.kubernetes.operator.crd.spec.JobState;
-import
org.apache.flink.kubernetes.operator.observer.JobManagerDeploymentStatus;
+import
org.apache.flink.kubernetes.operator.crd.status.JobManagerDeploymentStatus;
import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
import io.javaoperatorsdk.operator.api.reconciler.UpdateControl;
diff --git
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DeploymentValidatorTest.java
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DefaultValidatorTest.java
similarity index 80%
rename from
flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DeploymentValidatorTest.java
rename to
flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DefaultValidatorTest.java
index 5f6a896..9c54763 100644
---
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DeploymentValidatorTest.java
+++
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DefaultValidatorTest.java
@@ -22,19 +22,21 @@ import
org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
import org.apache.flink.kubernetes.operator.TestUtils;
import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.crd.FlinkSessionJob;
import org.apache.flink.kubernetes.operator.crd.spec.FlinkVersion;
import org.apache.flink.kubernetes.operator.crd.spec.IngressSpec;
+import org.apache.flink.kubernetes.operator.crd.spec.JobSpec;
import org.apache.flink.kubernetes.operator.crd.spec.JobState;
import org.apache.flink.kubernetes.operator.crd.spec.UpgradeMode;
import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
+import
org.apache.flink.kubernetes.operator.crd.status.JobManagerDeploymentStatus;
import org.apache.flink.kubernetes.operator.crd.status.JobStatus;
import org.apache.flink.kubernetes.operator.crd.status.ReconciliationStatus;
import org.apache.flink.kubernetes.operator.crd.status.Savepoint;
-import
org.apache.flink.kubernetes.operator.observer.JobManagerDeploymentStatus;
import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
import org.apache.flink.kubernetes.utils.Constants;
-import org.junit.Assert;
+import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import java.util.Collections;
@@ -43,14 +45,15 @@ import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.Consumer;
+import java.util.function.Supplier;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
/** Test deployment validation logic. */
-public class DeploymentValidatorTest {
+public class DefaultValidatorTest {
- private final DefaultDeploymentValidator validator = new
DefaultDeploymentValidator();
+ private final DefaultValidator validator = new DefaultValidator();
@Test
public void testValidation() {
@@ -279,18 +282,76 @@ public class DeploymentValidatorTest {
testSuccess(dep -> dep.getSpec().setFlinkVersion(FlinkVersion.v1_15));
}
+ @Test
+ public void testSessionJobWithSession() {
+ testSessionJobValidateSuccess(job -> {}, session -> {});
+
+ testSessionJobValidateError(
+ sessionJob -> sessionJob.getSpec().setClusterId("not-match"),
+ deployment -> {},
+ "The session job's cluster id is not match with the session
cluster");
+
+ testSessionJobValidateError(
+ job -> {},
+ deployment -> deployment.getSpec().setJob(new JobSpec()),
+ "Can not submit to application cluster");
+ }
+
private void testSuccess(Consumer<FlinkDeployment> deploymentModifier) {
FlinkDeployment deployment = TestUtils.buildApplicationCluster();
deploymentModifier.accept(deployment);
- validator.validate(deployment).ifPresent(Assert::fail);
+ validator.validateDeployment(deployment).ifPresent(Assertions::fail);
}
private void testError(Consumer<FlinkDeployment> deploymentModifier,
String expectedErr) {
FlinkDeployment deployment = TestUtils.buildApplicationCluster();
deploymentModifier.accept(deployment);
- Optional<String> error = validator.validate(deployment);
+ Optional<String> error = validator.validateDeployment(deployment);
+ if (error.isPresent()) {
+ assertTrue(error.get().startsWith(expectedErr), error.get());
+ } else {
+ fail("Did not get expected error: " + expectedErr);
+ }
+ }
+
+ private void testSessionJobValidateSuccess(
+ Consumer<FlinkSessionJob> sessionJobModifier,
+ Consumer<FlinkDeployment> sessionModifier) {
+ FlinkDeployment session = TestUtils.buildSessionCluster();
+ FlinkSessionJob sessionJob = TestUtils.buildSessionJob();
+
+ sessionModifier.accept(session);
+ sessionJobModifier.accept(sessionJob);
+ validator.validateSessionJob(sessionJob,
Optional.of(session)).ifPresent(Assertions::fail);
+ }
+
+ private void testSessionJobValidateError(
+ Consumer<FlinkSessionJob> sessionJobModifier,
+ Consumer<FlinkDeployment> sessionModifier,
+ String expectedErr) {
+ FlinkDeployment session = TestUtils.buildSessionCluster();
+ FlinkSessionJob sessionJob = TestUtils.buildSessionJob();
+
+ sessionModifier.accept(session);
+ sessionJobModifier.accept(sessionJob);
+ testSessionJobValidateError(sessionJob, Optional.of(session),
expectedErr);
+ }
+
+ private void testSessionJobValidateError(
+ Consumer<FlinkSessionJob> sessionJobModifier,
+ Supplier<Optional<FlinkDeployment>> sessionSupplier,
+ String expectedErr) {
+ FlinkSessionJob sessionJob = TestUtils.buildSessionJob();
+
+ sessionJobModifier.accept(sessionJob);
+ testSessionJobValidateError(sessionJob, sessionSupplier.get(),
expectedErr);
+ }
+
+ private void testSessionJobValidateError(
+ FlinkSessionJob sessionJob, Optional<FlinkDeployment> session,
String expectedErr) {
+ Optional<String> error = validator.validateSessionJob(sessionJob,
session);
if (error.isPresent()) {
- assertTrue(error.get(), error.get().startsWith(expectedErr));
+ assertTrue(error.get().startsWith(expectedErr), error.get());
} else {
fail("Did not get expected error: " + expectedErr);
}
diff --git
a/flink-kubernetes-webhook/src/main/java/org/apache/flink/kubernetes/operator/admission/FlinkOperatorWebhook.java
b/flink-kubernetes-webhook/src/main/java/org/apache/flink/kubernetes/operator/admission/FlinkOperatorWebhook.java
index c534273..ff1db98 100644
---
a/flink-kubernetes-webhook/src/main/java/org/apache/flink/kubernetes/operator/admission/FlinkOperatorWebhook.java
+++
b/flink-kubernetes-webhook/src/main/java/org/apache/flink/kubernetes/operator/admission/FlinkOperatorWebhook.java
@@ -18,7 +18,7 @@
package org.apache.flink.kubernetes.operator.admission;
import org.apache.flink.kubernetes.operator.utils.EnvUtils;
-import
org.apache.flink.kubernetes.operator.validation.DefaultDeploymentValidator;
+import org.apache.flink.kubernetes.operator.validation.DefaultValidator;
import org.apache.flink.shaded.netty4.io.netty.bootstrap.ServerBootstrap;
import org.apache.flink.shaded.netty4.io.netty.channel.Channel;
@@ -56,7 +56,7 @@ public class FlinkOperatorWebhook {
public static void main(String[] args) throws Exception {
LOG.info("Starting Flink Kubernetes Webhook");
AdmissionHandler endpoint =
- new AdmissionHandler(new FlinkValidator(new
DefaultDeploymentValidator()));
+ new AdmissionHandler(new FlinkValidator(new
DefaultValidator()));
ChannelInitializer<SocketChannel> initializer =
createChannelInitializer(endpoint);
NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
NioEventLoopGroup workerGroup = new NioEventLoopGroup();
diff --git
a/flink-kubernetes-webhook/src/main/java/org/apache/flink/kubernetes/operator/admission/FlinkValidator.java
b/flink-kubernetes-webhook/src/main/java/org/apache/flink/kubernetes/operator/admission/FlinkValidator.java
index 838e618..e2dec7a 100644
---
a/flink-kubernetes-webhook/src/main/java/org/apache/flink/kubernetes/operator/admission/FlinkValidator.java
+++
b/flink-kubernetes-webhook/src/main/java/org/apache/flink/kubernetes/operator/admission/FlinkValidator.java
@@ -21,7 +21,7 @@ import
org.apache.flink.kubernetes.operator.admission.admissioncontroller.NotAll
import
org.apache.flink.kubernetes.operator.admission.admissioncontroller.Operation;
import
org.apache.flink.kubernetes.operator.admission.admissioncontroller.validation.Validator;
import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
-import
org.apache.flink.kubernetes.operator.validation.FlinkDeploymentValidator;
+import org.apache.flink.kubernetes.operator.validation.FlinkResourceValidator;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.fabric8.kubernetes.api.model.GenericKubernetesResource;
@@ -35,9 +35,9 @@ public class FlinkValidator implements
Validator<GenericKubernetesResource> {
private static final Logger LOG =
LoggerFactory.getLogger(FlinkValidator.class);
private static final ObjectMapper objectMapper = new ObjectMapper();
- private final FlinkDeploymentValidator deploymentValidator;
+ private final FlinkResourceValidator deploymentValidator;
- public FlinkValidator(FlinkDeploymentValidator deploymentValidator) {
+ public FlinkValidator(FlinkResourceValidator deploymentValidator) {
this.deploymentValidator = deploymentValidator;
}
@@ -49,7 +49,7 @@ public class FlinkValidator implements
Validator<GenericKubernetesResource> {
FlinkDeployment flinkDeployment =
objectMapper.convertValue(resource, FlinkDeployment.class);
- Optional<String> validationError =
deploymentValidator.validate(flinkDeployment);
+ Optional<String> validationError =
deploymentValidator.validateDeployment(flinkDeployment);
if (validationError.isPresent()) {
throw new NotAllowedException(validationError.get());
}
diff --git
a/flink-kubernetes-webhook/src/test/java/org/apache/flink/kubernetes/operator/admission/AdmissionHandlerTest.java
b/flink-kubernetes-webhook/src/test/java/org/apache/flink/kubernetes/operator/admission/AdmissionHandlerTest.java
index fcf0d21..e41a620 100644
---
a/flink-kubernetes-webhook/src/test/java/org/apache/flink/kubernetes/operator/admission/AdmissionHandlerTest.java
+++
b/flink-kubernetes-webhook/src/test/java/org/apache/flink/kubernetes/operator/admission/AdmissionHandlerTest.java
@@ -19,7 +19,7 @@ package org.apache.flink.kubernetes.operator.admission;
import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
import org.apache.flink.kubernetes.operator.crd.spec.FlinkDeploymentSpec;
-import
org.apache.flink.kubernetes.operator.validation.DefaultDeploymentValidator;
+import org.apache.flink.kubernetes.operator.validation.DefaultValidator;
import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled;
import
org.apache.flink.shaded.netty4.io.netty.channel.embedded.EmbeddedChannel;
@@ -48,7 +48,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
public class AdmissionHandlerTest {
private final AdmissionHandler admissionHandler =
- new AdmissionHandler(new FlinkValidator(new
DefaultDeploymentValidator()));
+ new AdmissionHandler(new FlinkValidator(new DefaultValidator()));
@Test
public void testHandleIllegalRequest() {
diff --git
a/helm/flink-kubernetes-operator/conf/flink-operator-config/log4j2.properties
b/helm/flink-kubernetes-operator/conf/flink-operator-config/log4j2.properties
index 47b6664..8e3e456 100644
---
a/helm/flink-kubernetes-operator/conf/flink-operator-config/log4j2.properties
+++
b/helm/flink-kubernetes-operator/conf/flink-operator-config/log4j2.properties
@@ -23,4 +23,4 @@ rootLogger.appenderRef.console.ref = ConsoleAppender
appender.console.name = ConsoleAppender
appender.console.type = CONSOLE
appender.console.layout.type = PatternLayout
-appender.console.layout.pattern = %style{%d}{yellow} %style{%-30c{1.}}{cyan}
%highlight{[%-5level]%notEmpty{[%X{resource.namespace}/}%notEmpty{%X{resource.name}]}
%msg%n%throwable}
+appender.console.layout.pattern = %style{%d}{yellow} %style{%-30c{1.}}{cyan}
%highlight{[%-5level]%notEmpty{[%X{resource.namespace}/}%notEmpty{%X{resource.name}]}
%msg%n%throwable}
\ No newline at end of file
diff --git
a/helm/flink-kubernetes-operator/crds/flinksessionjobs.flink.apache.org-v1.yml
b/helm/flink-kubernetes-operator/crds/flinksessionjobs.flink.apache.org-v1.yml
index 6ea6bee..475449d 100644
---
a/helm/flink-kubernetes-operator/crds/flinksessionjobs.flink.apache.org-v1.yml
+++
b/helm/flink-kubernetes-operator/crds/flinksessionjobs.flink.apache.org-v1.yml
@@ -85,7 +85,7 @@ spec:
type: boolean
error:
type: string
- flinkSessionJobSpec:
+ lastReconciledSpec:
properties:
clusterId:
type: string
diff --git a/helm/flink-kubernetes-operator/templates/flink-operator.yaml
b/helm/flink-kubernetes-operator/templates/flink-operator.yaml
index 305ea4b..64bc5c8 100644
--- a/helm/flink-kubernetes-operator/templates/flink-operator.yaml
+++ b/helm/flink-kubernetes-operator/templates/flink-operator.yaml
@@ -74,6 +74,9 @@ spec:
mountPath: /opt/flink-operator/conf
- name: flink-default-config-volume
mountPath: /opt/flink/conf
+ {{- if .Values.operatorVolumeMounts.create }}
+ {{- toYaml .Values.operatorVolumeMounts.data | nindent 12 }}
+ {{- end }}
{{- if .Values.webhook.create }}
- name: flink-webhook
image: "{{ .Values.image.repository }}:{{ .Values.image.tag |
default .Chart.AppVersion }}"
@@ -123,6 +126,9 @@ spec:
path: flink-conf.yaml
- key: log4j-console.properties
path: log4j-console.properties
+ {{- if .Values.operatorVolumes.create }}
+ {{- toYaml .Values.operatorVolumes.data | nindent 8 }}
+ {{- end }}
{{- if .Values.webhook.create }}
- name: keystore
secret:
diff --git a/helm/flink-kubernetes-operator/templates/rbac.yaml
b/helm/flink-kubernetes-operator/templates/rbac.yaml
index 8fa96aa..57ee3f4 100644
--- a/helm/flink-kubernetes-operator/templates/rbac.yaml
+++ b/helm/flink-kubernetes-operator/templates/rbac.yaml
@@ -59,6 +59,8 @@ rules:
resources:
- flinkdeployments
- flinkdeployments/status
+ - flinksessionjobs
+ - flinksessionjobs/status
verbs:
- "*"
- apiGroups:
diff --git a/helm/flink-kubernetes-operator/values.yaml
b/helm/flink-kubernetes-operator/values.yaml
index b634f83..976d61f 100644
--- a/helm/flink-kubernetes-operator/values.yaml
+++ b/helm/flink-kubernetes-operator/values.yaml
@@ -41,6 +41,23 @@ jobServiceAccount:
"helm.sh/resource-policy": keep
name: "flink"
+operatorVolumeMounts:
+ create: false
+ data:
+ - name: flink-artifacts
+ mountPath: /opt/flink/artifacts
+
+operatorVolumes:
+ create: false
+ data:
+ - name: flink-artifacts
+ hostPath:
+ path: /tmp/flink/artifacts
+ type: DirectoryOrCreate
+# - name: flink-artifacts
+# persistentVolumeClaim:
+# claimName: flink-artifacts
+
webhook:
create: true
keystore: