This is an automated email from the ASF dual-hosted git repository. gyfora pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
commit 79b169964cd3aa1ea3383b3180a601939fbd3a04 Author: Usamah Jassat <[email protected]> AuthorDate: Tue Jun 21 17:35:08 2022 +0100 [FLINK-27446] Add standalone mode validation and config building --- .github/workflows/ci.yml | 2 + docs/content/docs/custom-resource/reference.md | 11 +++ e2e-tests/data/flinkdep-cr.yaml | 1 + e2e-tests/data/multi-sessionjob.yaml | 2 + e2e-tests/data/sessionjob-cr.yaml | 1 + .../operator/config/FlinkConfigBuilder.java | 72 +++++++++++++++++- .../observer/deployment/ObserverFactory.java | 15 ++-- .../deployment/ApplicationReconciler.java | 3 +- .../reconciler/deployment/ReconcilerFactory.java | 15 ++-- .../operator/service/AbstractFlinkService.java | 43 ++++++++++- .../operator/service/FlinkServiceFactory.java | 7 ++ .../operator/service/NativeFlinkService.java | 39 +++------- .../operator/service/StandaloneFlinkService.java | 36 +++++++-- .../operator/utils/EventSourceUtils.java | 6 +- .../kubernetes/operator/utils/FlinkUtils.java | 4 + .../operator/validation/DefaultValidator.java | 36 ++++++++- .../kubernetes/operator/TestingFlinkService.java | 12 +++ .../operator/config/FlinkConfigBuilderTest.java | 85 ++++++++++++++++++++++ .../TestingFlinkDeploymentController.java | 1 + .../sessionjob/SessionJobObserverTest.java | 3 +- .../sessionjob/SessionJobReconcilerTest.java | 2 +- .../operator/validation/DefaultValidatorTest.java | 69 ++++++++++++++++++ 22 files changed, 400 insertions(+), 65 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 1dde3421..b34a0d28 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -97,6 +97,7 @@ jobs: matrix: version: ["v1_15","v1_14","v1_13"] namespace: ["default","flink"] + mode: ["native", "standalone"] test: - test_application_kubernetes_ha.sh - test_application_operations.sh @@ -155,6 +156,7 @@ jobs: run: | sed -i "s/image: flink:.*/image: ${{ matrix.image }}/" e2e-tests/data/*.yaml sed -i "s/flinkVersion: .*/flinkVersion: ${{ matrix.version }}/" 
e2e-tests/data/*.yaml + sed -i "s/mode: .*/mode: ${{ matrix.mode }}/" e2e-tests/data/*.yaml git diff HEAD echo "Running e2e-tests/$test" bash e2e-tests/${{ matrix.test }} || exit 1 diff --git a/docs/content/docs/custom-resource/reference.md b/docs/content/docs/custom-resource/reference.md index 100db9d4..b2399505 100644 --- a/docs/content/docs/custom-resource/reference.md +++ b/docs/content/docs/custom-resource/reference.md @@ -57,6 +57,7 @@ This page serves as a full reference for FlinkDeployment custom resource definit | jobManager | org.apache.flink.kubernetes.operator.crd.spec.JobManagerSpec | JobManager specs. | | taskManager | org.apache.flink.kubernetes.operator.crd.spec.TaskManagerSpec | TaskManager specs. | | logConfiguration | java.util.Map<java.lang.String,java.lang.String> | Log configuration overrides for the Flink deployment. Format logConfigFileName -> configContent. | +| mode | org.apache.flink.kubernetes.operator.crd.spec.KubernetesDeploymentMode | Deployment mode of the Flink cluster, native or standalone. | ### FlinkSessionJobSpec **Class**: org.apache.flink.kubernetes.operator.crd.spec.FlinkSessionJobSpec @@ -131,6 +132,16 @@ This page serves as a full reference for FlinkDeployment custom resource definit | RUNNING | Job is expected to be processing data. | | SUSPENDED | Processing is suspended with the intention of continuing later. | +### KubernetesDeploymentMode +**Class**: org.apache.flink.kubernetes.operator.crd.spec.KubernetesDeploymentMode + +**Description**: Enum to control Flink deployment mode on Kubernetes. + +| Value | Docs | +| ----- | ---- | +| NATIVE | Deploys Flink using Flink's native Kubernetes support. Only supported for newer versions of Flink. | +| STANDALONE | Deploys Flink on top of Kubernetes in standalone mode. 
| + ### Resource **Class**: org.apache.flink.kubernetes.operator.crd.spec.Resource diff --git a/e2e-tests/data/flinkdep-cr.yaml b/e2e-tests/data/flinkdep-cr.yaml index d48af2be..18e73b27 100644 --- a/e2e-tests/data/flinkdep-cr.yaml +++ b/e2e-tests/data/flinkdep-cr.yaml @@ -83,6 +83,7 @@ spec: entryClass: org.apache.flink.streaming.examples.statemachine.StateMachineExample parallelism: 2 upgradeMode: last-state + mode: native --- apiVersion: v1 diff --git a/e2e-tests/data/multi-sessionjob.yaml b/e2e-tests/data/multi-sessionjob.yaml index ed5c892f..77991953 100644 --- a/e2e-tests/data/multi-sessionjob.yaml +++ b/e2e-tests/data/multi-sessionjob.yaml @@ -65,6 +65,7 @@ spec: resource: memory: "1024m" cpu: 0.25 + mode: native --- apiVersion: flink.apache.org/v1beta1 @@ -116,6 +117,7 @@ spec: resource: memory: "1024m" cpu: 0.25 + mode: native --- apiVersion: flink.apache.org/v1beta1 diff --git a/e2e-tests/data/sessionjob-cr.yaml b/e2e-tests/data/sessionjob-cr.yaml index 53b49f0a..1953cdec 100644 --- a/e2e-tests/data/sessionjob-cr.yaml +++ b/e2e-tests/data/sessionjob-cr.yaml @@ -65,6 +65,7 @@ spec: resource: memory: "1024m" cpu: 0.5 + mode: native --- apiVersion: flink.apache.org/v1beta1 diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilder.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilder.java index 8941dd51..706d1108 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilder.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilder.java @@ -17,6 +17,7 @@ package org.apache.flink.kubernetes.operator.config; +import org.apache.flink.client.deployment.application.ApplicationConfiguration; import org.apache.flink.configuration.ConfigOption; import org.apache.flink.configuration.ConfigOptions; import org.apache.flink.configuration.Configuration; 
@@ -32,8 +33,12 @@ import org.apache.flink.kubernetes.configuration.KubernetesDeploymentTarget; import org.apache.flink.kubernetes.operator.crd.FlinkDeployment; import org.apache.flink.kubernetes.operator.crd.spec.FlinkDeploymentSpec; import org.apache.flink.kubernetes.operator.crd.spec.FlinkVersion; +import org.apache.flink.kubernetes.operator.crd.spec.JobSpec; +import org.apache.flink.kubernetes.operator.crd.spec.KubernetesDeploymentMode; import org.apache.flink.kubernetes.operator.crd.spec.Resource; import org.apache.flink.kubernetes.operator.crd.spec.UpgradeMode; +import org.apache.flink.kubernetes.operator.standalone.StandaloneKubernetesConfigOptionsInternal; +import org.apache.flink.kubernetes.operator.utils.FlinkUtils; import org.apache.flink.runtime.jobgraph.SavepointConfigOptions; import org.apache.flink.runtime.jobmanager.HighAvailabilityMode; import org.apache.flink.streaming.api.environment.CheckpointConfig; @@ -59,6 +64,8 @@ import static org.apache.flink.configuration.DeploymentOptions.SUBMIT_FAILED_JOB import static org.apache.flink.configuration.DeploymentOptionsInternal.CONF_DIR; import static org.apache.flink.configuration.WebOptions.CANCEL_ENABLE; import static org.apache.flink.kubernetes.configuration.KubernetesConfigOptions.REST_SERVICE_EXPOSED_TYPE; +import static org.apache.flink.kubernetes.operator.standalone.StandaloneKubernetesConfigOptionsInternal.ClusterMode.APPLICATION; +import static org.apache.flink.kubernetes.operator.standalone.StandaloneKubernetesConfigOptionsInternal.ClusterMode.SESSION; import static org.apache.flink.kubernetes.operator.utils.FlinkUtils.mergePodTemplates; import static org.apache.flink.kubernetes.utils.Constants.CONFIG_FILE_LOG4J_NAME; import static org.apache.flink.kubernetes.utils.Constants.CONFIG_FILE_LOGBACK_NAME; @@ -216,30 +223,89 @@ public class FlinkConfigBuilder { spec.getTaskManager().getPodTemplate(), effectiveConfig, false); + + if (spec.getTaskManager().getReplicas() != null + && 
spec.getTaskManager().getReplicas() > 0) { + effectiveConfig.set( + StandaloneKubernetesConfigOptionsInternal.KUBERNETES_TASKMANAGER_REPLICAS, + spec.getTaskManager().getReplicas()); + } + } + + if (spec.getJob() != null + && KubernetesDeploymentMode.getDeploymentMode(spec) + == KubernetesDeploymentMode.STANDALONE) { + if (!effectiveConfig.contains( + StandaloneKubernetesConfigOptionsInternal.KUBERNETES_TASKMANAGER_REPLICAS)) { + effectiveConfig.set( + StandaloneKubernetesConfigOptionsInternal.KUBERNETES_TASKMANAGER_REPLICAS, + FlinkUtils.getNumTaskManagers(effectiveConfig, getParallelism())); + } } return this; } protected FlinkConfigBuilder applyJobOrSessionSpec() throws URISyntaxException { + KubernetesDeploymentMode deploymentMode = KubernetesDeploymentMode.getDeploymentMode(spec); + if (spec.getJob() != null) { + JobSpec jobSpec = spec.getJob(); effectiveConfig.set( DeploymentOptions.TARGET, KubernetesDeploymentTarget.APPLICATION.getName()); - final URI uri = new URI(spec.getJob().getJarURI()); + final URI uri = new URI(jobSpec.getJarURI()); effectiveConfig.set(PipelineOptions.JARS, Collections.singletonList(uri.toString())); effectiveConfig.set(CoreOptions.DEFAULT_PARALLELISM, getParallelism()); - if (spec.getJob().getAllowNonRestoredState() != null) { + if (jobSpec.getAllowNonRestoredState() != null) { effectiveConfig.set( SavepointConfigOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE, - spec.getJob().getAllowNonRestoredState()); + jobSpec.getAllowNonRestoredState()); + } + + if (jobSpec.getEntryClass() != null) { + effectiveConfig.set( + ApplicationConfiguration.APPLICATION_MAIN_CLASS, jobSpec.getEntryClass()); } } else { effectiveConfig.set( DeploymentOptions.TARGET, KubernetesDeploymentTarget.SESSION.getName()); } + + if (deploymentMode == KubernetesDeploymentMode.STANDALONE) { + effectiveConfig.set(DeploymentOptions.TARGET, "remote"); + effectiveConfig.set( + StandaloneKubernetesConfigOptionsInternal.CLUSTER_MODE, + spec.getJob() == null ? 
SESSION : APPLICATION); + + if (spec.getJob() != null) { + effectiveConfig.set( + PipelineOptions.CLASSPATHS, + Collections.singletonList(getStandaloneJarURI(spec.getJob()))); + } + } return this; } + private String getStandaloneJarURI(JobSpec jobSpec) throws URISyntaxException { + URI uri = new URI(jobSpec.getJarURI()); + + // Running an application job through standalone mode doesn't require file uri scheme and + // doesn't accept + // local scheme which is used for native so convert here to improve compatibility at the + // operator layer + if (uri.getScheme().equals("local")) { + uri = + new URI( + "file", + uri.getAuthority() == null ? "" : uri.getAuthority(), + uri.getPath(), + uri.getQuery(), + uri.getFragment()); + } + + return uri.toASCIIString(); + } + private int getParallelism() { if (spec.getTaskManager() != null && spec.getTaskManager().getReplicas() != null) { if (spec.getJob().getParallelism() > 0) { diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/ObserverFactory.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/ObserverFactory.java index 6c2c203d..b41d34b8 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/ObserverFactory.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/ObserverFactory.java @@ -17,9 +17,11 @@ package org.apache.flink.kubernetes.operator.observer.deployment; +import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.kubernetes.operator.config.FlinkConfigManager; import org.apache.flink.kubernetes.operator.config.Mode; import org.apache.flink.kubernetes.operator.crd.FlinkDeployment; +import org.apache.flink.kubernetes.operator.crd.spec.KubernetesDeploymentMode; import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus; import 
org.apache.flink.kubernetes.operator.observer.Observer; import org.apache.flink.kubernetes.operator.service.FlinkServiceFactory; @@ -36,7 +38,8 @@ public class ObserverFactory { private final FlinkConfigManager configManager; private final StatusRecorder<FlinkDeploymentStatus> statusRecorder; private final EventRecorder eventRecorder; - private final Map<Mode, Observer<FlinkDeployment>> observerMap; + private final Map<Tuple2<Mode, KubernetesDeploymentMode>, Observer<FlinkDeployment>> + observerMap; public ObserverFactory( FlinkServiceFactory flinkServiceFactory, @@ -52,9 +55,11 @@ public class ObserverFactory { public Observer<FlinkDeployment> getOrCreate(FlinkDeployment flinkApp) { return observerMap.computeIfAbsent( - Mode.getMode(flinkApp), - mode -> { - switch (mode) { + Tuple2.of( + Mode.getMode(flinkApp), + KubernetesDeploymentMode.getDeploymentMode(flinkApp)), + modes -> { + switch (modes.f0) { case SESSION: return new SessionObserver( flinkServiceFactory.getOrCreate(flinkApp), @@ -68,7 +73,7 @@ public class ObserverFactory { eventRecorder); default: throw new UnsupportedOperationException( - String.format("Unsupported running mode: %s", mode)); + String.format("Unsupported running mode: %s", modes.f0)); } }); } diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java index c03ddc98..ec0389e2 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java @@ -33,7 +33,6 @@ import org.apache.flink.kubernetes.operator.service.FlinkService; import org.apache.flink.kubernetes.operator.utils.EventRecorder; import 
org.apache.flink.kubernetes.operator.utils.FlinkUtils; import org.apache.flink.kubernetes.operator.utils.IngressUtils; -import org.apache.flink.kubernetes.operator.utils.SavepointUtils; import org.apache.flink.kubernetes.operator.utils.StatusRecorder; import org.apache.flink.runtime.highavailability.JobResultStoreOptions; import org.apache.flink.runtime.jobgraph.SavepointConfigOptions; @@ -194,7 +193,7 @@ public class ApplicationReconciler @Override public boolean reconcileOtherChanges( FlinkDeployment deployment, Context ctx, Configuration observeConfig) throws Exception { - if (SavepointUtils.triggerSavepointIfNeeded(flinkService, deployment, observeConfig)) { + if (super.reconcileOtherChanges(deployment, ctx, observeConfig)) { return true; } diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ReconcilerFactory.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ReconcilerFactory.java index 419e3e41..2d11f495 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ReconcilerFactory.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ReconcilerFactory.java @@ -17,9 +17,11 @@ package org.apache.flink.kubernetes.operator.reconciler.deployment; +import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.kubernetes.operator.config.FlinkConfigManager; import org.apache.flink.kubernetes.operator.config.Mode; import org.apache.flink.kubernetes.operator.crd.FlinkDeployment; +import org.apache.flink.kubernetes.operator.crd.spec.KubernetesDeploymentMode; import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus; import org.apache.flink.kubernetes.operator.reconciler.Reconciler; import org.apache.flink.kubernetes.operator.service.FlinkServiceFactory; @@ -39,7 +41,8 @@ public class ReconcilerFactory { private final 
FlinkConfigManager configManager; private final EventRecorder eventRecorder; private final StatusRecorder<FlinkDeploymentStatus> deploymentStatusRecorder; - private final Map<Mode, Reconciler<FlinkDeployment>> reconcilerMap; + private final Map<Tuple2<Mode, KubernetesDeploymentMode>, Reconciler<FlinkDeployment>> + reconcilerMap; public ReconcilerFactory( KubernetesClient kubernetesClient, @@ -57,9 +60,11 @@ public class ReconcilerFactory { public Reconciler<FlinkDeployment> getOrCreate(FlinkDeployment flinkApp) { return reconcilerMap.computeIfAbsent( - Mode.getMode(flinkApp), - mode -> { - switch (mode) { + Tuple2.of( + Mode.getMode(flinkApp), + KubernetesDeploymentMode.getDeploymentMode(flinkApp)), + modes -> { + switch (modes.f0) { case SESSION: return new SessionReconciler( kubernetesClient, @@ -76,7 +81,7 @@ public class ReconcilerFactory { deploymentStatusRecorder); default: throw new UnsupportedOperationException( - String.format("Unsupported running mode: %s", mode)); + String.format("Unsupported running mode: %s", modes.f0)); } }); } diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java index afb7bf27..c4b32529 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java @@ -142,11 +142,33 @@ public abstract class AbstractFlinkService implements FlinkService { protected abstract PodList getJmPodList(String namespace, String clusterId); + protected abstract void deployApplicationCluster(JobSpec jobSpec, Configuration conf) + throws Exception; + @Override public KubernetesClient getKubernetesClient() { return kubernetesClient; } + @Override + public void submitApplicationCluster( + JobSpec jobSpec, 
Configuration conf, boolean requireHaMetadata) throws Exception { + LOG.info( + "Deploying application cluster{}", + requireHaMetadata ? " requiring last-state from HA metadata" : ""); + if (FlinkUtils.isKubernetesHAActivated(conf)) { + final String clusterId = conf.get(KubernetesConfigOptions.CLUSTER_ID); + final String namespace = conf.get(KubernetesConfigOptions.NAMESPACE); + // Delete the job graph in the HA ConfigMaps so that the newly changed job config(e.g. + // parallelism) could take effect + FlinkUtils.deleteJobGraphInKubernetesHA(clusterId, namespace, kubernetesClient); + } + if (requireHaMetadata) { + validateHaMetadataExists(conf); + } + deployApplicationCluster(jobSpec, conf); + } + @Override public boolean isHaMetadataAvailable(Configuration conf) { return FlinkUtils.isHaMetadataAvailable(conf, kubernetesClient); @@ -219,8 +241,11 @@ public abstract class AbstractFlinkService implements FlinkService { } } - @Override - public void cancelJob(FlinkDeployment deployment, UpgradeMode upgradeMode, Configuration conf) + protected void cancelJob( + FlinkDeployment deployment, + UpgradeMode upgradeMode, + Configuration conf, + boolean deleteClusterAfterSavepoint) throws Exception { var deploymentStatus = deployment.getStatus(); var jobIdString = deploymentStatus.getJobStatus().getJobId(); @@ -290,6 +315,9 @@ public abstract class AbstractFlinkService implements FlinkService { ExecutionCheckpointingOptions.CHECKPOINTING_TIMEOUT.key()), exception); } + if (deleteClusterAfterSavepoint) { + deleteClusterDeployment(deployment.getMetadata(), deploymentStatus, true); + } break; case LAST_STATE: deleteClusterDeployment(deployment.getMetadata(), deploymentStatus, false); @@ -697,7 +725,6 @@ public abstract class AbstractFlinkService implements FlinkService { /** Wait until the FLink cluster has completely shut down. 
*/ @VisibleForTesting void waitForClusterShutdown(String namespace, String clusterId, long shutdownTimeout) { - boolean jobManagerRunning = true; boolean serviceRunning = true; @@ -765,4 +792,14 @@ public abstract class AbstractFlinkService implements FlinkService { } return effectiveStatus; } + + private void validateHaMetadataExists(Configuration conf) { + if (!isHaMetadataAvailable(conf)) { + throw new DeploymentFailedException( + "HA metadata not available to restore from last state. " + + "It is possible that the job has finished or terminally failed, or the configmaps have been deleted. " + + "Manual restore required.", + "RestoreFailed"); + } + } } diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkServiceFactory.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkServiceFactory.java index 761f1e68..9fce6eec 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkServiceFactory.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkServiceFactory.java @@ -22,6 +22,8 @@ import org.apache.flink.kubernetes.operator.crd.FlinkDeployment; import org.apache.flink.kubernetes.operator.crd.spec.KubernetesDeploymentMode; import io.fabric8.kubernetes.client.KubernetesClient; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -33,6 +35,8 @@ public class FlinkServiceFactory { private final FlinkConfigManager configManager; private final Map<KubernetesDeploymentMode, FlinkService> serviceMap; + private static final Logger LOG = LoggerFactory.getLogger(FlinkServiceFactory.class); + public FlinkServiceFactory( KubernetesClient kubernetesClient, FlinkConfigManager configManager) { this.kubernetesClient = kubernetesClient; @@ -46,8 +50,10 @@ public class FlinkServiceFactory { mode -> { switch (mode) { case 
NATIVE: + LOG.info("Using NativeFlinkService"); return new NativeFlinkService(kubernetesClient, configManager); case STANDALONE: + LOG.info("Using StandaloneFlinkService"); return new StandaloneFlinkService(kubernetesClient, configManager); default: throw new UnsupportedOperationException( @@ -57,6 +63,7 @@ public class FlinkServiceFactory { } public FlinkService getOrCreate(FlinkDeployment deployment) { + LOG.info("Getting service for {}", deployment.getMetadata().getName()); return getOrCreate(getDeploymentMode(deployment)); } diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/NativeFlinkService.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/NativeFlinkService.java index 60b5ee3b..47d3fc01 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/NativeFlinkService.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/NativeFlinkService.java @@ -26,13 +26,12 @@ import org.apache.flink.client.deployment.DefaultClusterClientServiceLoader; import org.apache.flink.client.deployment.application.ApplicationConfiguration; import org.apache.flink.client.deployment.application.cli.ApplicationClusterDeployer; import org.apache.flink.configuration.Configuration; -import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions; import org.apache.flink.kubernetes.operator.config.FlinkConfigManager; +import org.apache.flink.kubernetes.operator.crd.FlinkDeployment; import org.apache.flink.kubernetes.operator.crd.spec.JobSpec; +import org.apache.flink.kubernetes.operator.crd.spec.UpgradeMode; import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus; import org.apache.flink.kubernetes.operator.crd.status.JobManagerDeploymentStatus; -import org.apache.flink.kubernetes.operator.exception.DeploymentFailedException; -import org.apache.flink.kubernetes.operator.utils.FlinkUtils; import 
org.apache.flink.kubernetes.utils.KubernetesUtils; import io.fabric8.kubernetes.api.model.ObjectMeta; @@ -56,21 +55,8 @@ public class NativeFlinkService extends AbstractFlinkService { } @Override - public void submitApplicationCluster( - JobSpec jobSpec, Configuration conf, boolean requireHaMetadata) throws Exception { - LOG.info( - "Deploying application cluster{}", - requireHaMetadata ? " requiring last-state from HA metadata" : ""); - if (FlinkUtils.isKubernetesHAActivated(conf)) { - final String clusterId = conf.get(KubernetesConfigOptions.CLUSTER_ID); - final String namespace = conf.get(KubernetesConfigOptions.NAMESPACE); - // Delete the job graph in the HA ConfigMaps so that the newly changed job config(e.g. - // parallelism) could take effect - FlinkUtils.deleteJobGraphInKubernetesHA(clusterId, namespace, kubernetesClient); - } - if (requireHaMetadata) { - validateHaMetadataExists(conf); - } + protected void deployApplicationCluster(JobSpec jobSpec, Configuration conf) throws Exception { + LOG.info("Deploying application cluster"); final ClusterClientServiceLoader clusterClientServiceLoader = new DefaultClusterClientServiceLoader(); final ApplicationDeployer deployer = @@ -85,16 +71,6 @@ public class NativeFlinkService extends AbstractFlinkService { LOG.info("Application cluster successfully deployed"); } - private void validateHaMetadataExists(Configuration conf) { - if (!isHaMetadataAvailable(conf)) { - throw new DeploymentFailedException( - "HA metadata not available to restore from last state. " - + "It is possible that the job has finished or terminally failed, or the configmaps have been deleted. 
" - + "Manual restore required.", - "RestoreFailed"); - } - } - @Override public void submitSessionCluster(Configuration conf) throws Exception { LOG.info("Deploying session cluster"); @@ -110,6 +86,13 @@ public class NativeFlinkService extends AbstractFlinkService { LOG.info("Session cluster successfully deployed"); } + @Override + public void cancelJob( + FlinkDeployment deployment, UpgradeMode upgradeMode, Configuration configuration) + throws Exception { + cancelJob(deployment, upgradeMode, configuration, false); + } + @Override public void deleteClusterDeployment( ObjectMeta meta, FlinkDeploymentStatus status, boolean deleteHaData) { diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/StandaloneFlinkService.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/StandaloneFlinkService.java index 5a0b85eb..08c2ef4e 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/StandaloneFlinkService.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/StandaloneFlinkService.java @@ -20,11 +20,15 @@ package org.apache.flink.kubernetes.operator.service; import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.client.deployment.ClusterDeploymentException; import org.apache.flink.client.deployment.ClusterSpecification; +import org.apache.flink.client.deployment.application.ApplicationConfiguration; import org.apache.flink.configuration.Configuration; import org.apache.flink.kubernetes.KubernetesClusterClientFactory; import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions; import org.apache.flink.kubernetes.operator.config.FlinkConfigManager; +import org.apache.flink.kubernetes.operator.config.Mode; +import org.apache.flink.kubernetes.operator.crd.FlinkDeployment; import org.apache.flink.kubernetes.operator.crd.spec.JobSpec; +import 
org.apache.flink.kubernetes.operator.crd.spec.UpgradeMode; import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus; import org.apache.flink.kubernetes.operator.kubeclient.Fabric8FlinkStandaloneKubeClient; import org.apache.flink.kubernetes.operator.kubeclient.FlinkStandaloneKubeClient; @@ -58,22 +62,25 @@ public class StandaloneFlinkService extends AbstractFlinkService { } @Override - public void submitApplicationCluster( - JobSpec jobSpec, Configuration conf, boolean requireHaMetadata) throws Exception { + protected void deployApplicationCluster(JobSpec jobSpec, Configuration conf) throws Exception { LOG.info("Deploying application cluster"); - // TODO some HA stuff? - submitClusterInternal(conf); + submitClusterInternal(conf, Mode.APPLICATION); LOG.info("Application cluster successfully deployed"); } @Override public void submitSessionCluster(Configuration conf) throws Exception { LOG.info("Deploying session cluster"); - // TODO some HA stuff? - submitClusterInternal(conf); + submitClusterInternal(conf, Mode.SESSION); LOG.info("Session cluster successfully deployed"); } + @Override + public void cancelJob(FlinkDeployment deployment, UpgradeMode upgradeMode, Configuration conf) + throws Exception { + cancelJob(deployment, upgradeMode, conf, true); + } + @Override public void deleteClusterDeployment( ObjectMeta meta, FlinkDeploymentStatus status, boolean deleteHaData) { @@ -106,13 +113,26 @@ public class StandaloneFlinkService extends AbstractFlinkService { executorService); } - private void submitClusterInternal(Configuration conf) throws ClusterDeploymentException { + private void submitClusterInternal(Configuration conf, Mode mode) + throws ClusterDeploymentException { final String namespace = conf.get(KubernetesConfigOptions.NAMESPACE); FlinkStandaloneKubeClient client = createNamespacedKubeClient(conf, namespace); try (final KubernetesStandaloneClusterDescriptor kubernetesClusterDescriptor = new KubernetesStandaloneClusterDescriptor(conf, 
client)) { - kubernetesClusterDescriptor.deploySessionCluster(getClusterSpecification(conf)); + switch (mode) { + case APPLICATION: + kubernetesClusterDescriptor.deployApplicationCluster( + getClusterSpecification(conf), + ApplicationConfiguration.fromConfiguration(conf)); + break; + case SESSION: + kubernetesClusterDescriptor.deploySessionCluster(getClusterSpecification(conf)); + break; + default: + throw new UnsupportedOperationException( + String.format("Unsupported running mode: %s", mode)); + } } } diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventSourceUtils.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventSourceUtils.java index 62bea6dd..db238f28 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventSourceUtils.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventSourceUtils.java @@ -45,11 +45,7 @@ public class EventSourceUtils { public static InformerEventSource<Deployment, FlinkDeployment> getDeploymentInformerEventSource( EventSourceContext<FlinkDeployment> context) { final String labelSelector = - Map.of( - Constants.LABEL_TYPE_KEY, - Constants.LABEL_TYPE_NATIVE_TYPE, - Constants.LABEL_COMPONENT_KEY, - Constants.LABEL_COMPONENT_JOB_MANAGER) + Map.of(Constants.LABEL_COMPONENT_KEY, Constants.LABEL_COMPONENT_JOB_MANAGER) .entrySet().stream() .map(Object::toString) .collect(Collectors.joining(",")); diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java index 05cb7e03..d2f4c359 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java @@ -167,6 +167,10 @@ 
public class FlinkUtils { public static int getNumTaskManagers(Configuration conf) { int parallelism = conf.get(CoreOptions.DEFAULT_PARALLELISM); + return getNumTaskManagers(conf, parallelism); + } + + public static int getNumTaskManagers(Configuration conf, int parallelism) { int taskSlots = conf.get(TaskManagerOptions.NUM_TASK_SLOTS); return (parallelism + taskSlots - 1) / taskSlots; } diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultValidator.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultValidator.java index de3c7b68..9e92dc93 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultValidator.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultValidator.java @@ -32,6 +32,7 @@ import org.apache.flink.kubernetes.operator.crd.spec.IngressSpec; import org.apache.flink.kubernetes.operator.crd.spec.JobManagerSpec; import org.apache.flink.kubernetes.operator.crd.spec.JobSpec; import org.apache.flink.kubernetes.operator.crd.spec.JobState; +import org.apache.flink.kubernetes.operator.crd.spec.KubernetesDeploymentMode; import org.apache.flink.kubernetes.operator.crd.spec.Resource; import org.apache.flink.kubernetes.operator.crd.spec.TaskManagerSpec; import org.apache.flink.kubernetes.operator.crd.spec.UpgradeMode; @@ -88,7 +89,11 @@ public class DefaultValidator implements FlinkResourceValidator { deployment.getMetadata().getName(), deployment.getMetadata().getNamespace()), validateLogConfig(spec.getLogConfiguration()), - validateJobSpec(spec.getJob(), spec.getTaskManager(), effectiveConfig), + validateJobSpec( + spec.getJob(), + spec.getTaskManager(), + effectiveConfig, + KubernetesDeploymentMode.getDeploymentMode(deployment)), validateJmSpec(spec.getJobManager(), effectiveConfig), validateTmSpec(spec.getTaskManager()), validateSpecChange(deployment, 
effectiveConfig), @@ -173,7 +178,10 @@ public class DefaultValidator implements FlinkResourceValidator { } private Optional<String> validateJobSpec( - JobSpec job, @Nullable TaskManagerSpec tm, Map<String, String> confMap) { + JobSpec job, + @Nullable TaskManagerSpec tm, + Map<String, String> confMap, + KubernetesDeploymentMode mode) { if (job == null) { return Optional.empty(); } @@ -306,6 +314,24 @@ public class DefaultValidator implements FlinkResourceValidator { return Optional.of("Cannot switch from job to session cluster"); } + KubernetesDeploymentMode oldDeploymentMode = + oldSpec.getMode() == null ? KubernetesDeploymentMode.NATIVE : oldSpec.getMode(); + + KubernetesDeploymentMode newDeploymentMode = + newSpec.getMode() == null ? KubernetesDeploymentMode.NATIVE : newSpec.getMode(); + + if (oldDeploymentMode == KubernetesDeploymentMode.NATIVE + && newDeploymentMode != KubernetesDeploymentMode.NATIVE) { + return Optional.of( + "Cannot switch from native kubernetes to standalone kubernetes cluster"); + } + + if (oldDeploymentMode == KubernetesDeploymentMode.STANDALONE + && newDeploymentMode != KubernetesDeploymentMode.STANDALONE) { + return Optional.of( + "Cannot switch from standalone kubernetes to native kubernetes cluster"); + } + JobSpec oldJob = oldSpec.getJob(); JobSpec newJob = newSpec.getJob(); if (oldJob != null && newJob != null) { @@ -358,7 +384,11 @@ public class DefaultValidator implements FlinkResourceValidator { return firstPresent( validateNotApplicationCluster(sessionCluster), validateSessionClusterId(sessionJob, sessionCluster), - validateJobSpec(sessionJob.getSpec().getJob(), null, effectiveConfig)); + validateJobSpec( + sessionJob.getSpec().getJob(), + null, + effectiveConfig, + KubernetesDeploymentMode.getDeploymentMode(sessionCluster))); } private Optional<String> validateJobNotEmpty(FlinkSessionJob sessionJob) { diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java index 2e99c53f..4ff0a33f 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java @@ -31,6 +31,7 @@ import org.apache.flink.kubernetes.operator.crd.FlinkDeployment; import org.apache.flink.kubernetes.operator.crd.spec.FlinkSessionJobSpec; import org.apache.flink.kubernetes.operator.crd.spec.FlinkVersion; import org.apache.flink.kubernetes.operator.crd.spec.JobSpec; +import org.apache.flink.kubernetes.operator.crd.spec.UpgradeMode; import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus; import org.apache.flink.kubernetes.operator.crd.status.JobManagerDeploymentStatus; import org.apache.flink.kubernetes.operator.crd.status.Savepoint; @@ -156,6 +157,10 @@ public class TestingFlinkService extends AbstractFlinkService { if (requireHaMetadata) { validateHaMetadataExists(conf); } + deployApplicationCluster(jobSpec, conf); + } + + protected void deployApplicationCluster(JobSpec jobSpec, Configuration conf) throws Exception { if (deployFailure) { throw new Exception("Deployment failure"); } @@ -358,6 +363,13 @@ public class TestingFlinkService extends AbstractFlinkService { 0); } + @Override + public void cancelJob( + FlinkDeployment deployment, UpgradeMode upgradeMode, Configuration configuration) + throws Exception { + cancelJob(deployment, upgradeMode, configuration, false); + } + private String cancelJob(FlinkVersion flinkVersion, JobID jobID, boolean savepoint) throws Exception { Optional<Tuple2<String, JobStatusMessage>> jobOpt = diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilderTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilderTest.java index 
df5bf415..f33a9986 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilderTest.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilderTest.java @@ -17,6 +17,8 @@ package org.apache.flink.kubernetes.operator.config; +import org.apache.flink.client.deployment.application.ApplicationConfiguration; +import org.apache.flink.configuration.ConfigUtils; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.CoreOptions; import org.apache.flink.configuration.DeploymentOptions; @@ -33,9 +35,11 @@ import org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory; import org.apache.flink.kubernetes.operator.TestUtils; import org.apache.flink.kubernetes.operator.crd.FlinkDeployment; import org.apache.flink.kubernetes.operator.crd.spec.IngressSpec; +import org.apache.flink.kubernetes.operator.crd.spec.KubernetesDeploymentMode; import org.apache.flink.kubernetes.operator.crd.spec.TaskManagerSpec; import org.apache.flink.kubernetes.operator.crd.spec.UpgradeMode; import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils; +import org.apache.flink.kubernetes.operator.standalone.StandaloneKubernetesConfigOptionsInternal; import org.apache.flink.kubernetes.utils.Constants; import org.apache.flink.runtime.jobgraph.SavepointConfigOptions; import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions; @@ -54,6 +58,7 @@ import java.net.URISyntaxException; import java.nio.file.Files; import java.util.ArrayList; import java.util.Arrays; +import java.util.List; import java.util.Map; import static org.apache.flink.kubernetes.operator.TestUtils.IMAGE; @@ -62,6 +67,8 @@ import static org.apache.flink.kubernetes.operator.TestUtils.SAMPLE_JAR; import static org.apache.flink.kubernetes.operator.TestUtils.SERVICE_ACCOUNT; import static 
org.apache.flink.kubernetes.operator.config.FlinkConfigBuilder.DEFAULT_CHECKPOINTING_INTERVAL; import static org.apache.flink.kubernetes.utils.Constants.CONFIG_FILE_LOG4J_NAME; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.containsInAnyOrder; /** FlinkConfigBuilderTest. */ public class FlinkConfigBuilderTest { @@ -347,6 +354,84 @@ public class FlinkConfigBuilderTest { configuration.getBoolean(SavepointConfigOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE)); } + @Test + public void testApplyStandaloneApplicationSpec() throws URISyntaxException, IOException { + FlinkDeployment dep = ReconciliationUtils.clone(flinkDeployment); + final String entryClass = "entry.class"; + final String jarUri = "local:///flink/opt/StateMachine.jar"; + final String correctedJarUri = "file:///flink/opt/StateMachine.jar"; + dep.getSpec().setMode(KubernetesDeploymentMode.STANDALONE); + dep.getSpec().getJob().setEntryClass(entryClass); + dep.getSpec().getJob().setJarURI(jarUri); + dep.getSpec().setTaskManager(new TaskManagerSpec()); + dep.getSpec().getTaskManager().setReplicas(3); + dep.getSpec().getFlinkConfiguration().put(TaskManagerOptions.NUM_TASK_SLOTS.key(), "2"); + + Configuration configuration = + new FlinkConfigBuilder(dep, new Configuration()) + .applyFlinkConfiguration() + .applyTaskManagerSpec() + .applyJobOrSessionSpec() + .build(); + + Assertions.assertEquals("remote", configuration.getString(DeploymentOptions.TARGET)); + Assertions.assertEquals( + StandaloneKubernetesConfigOptionsInternal.ClusterMode.APPLICATION, + configuration.get(StandaloneKubernetesConfigOptionsInternal.CLUSTER_MODE)); + Assertions.assertEquals(6, configuration.getInteger(CoreOptions.DEFAULT_PARALLELISM)); + Assertions.assertEquals( + entryClass, + configuration.getString(ApplicationConfiguration.APPLICATION_MAIN_CLASS)); + Assertions.assertEquals( + 3, + configuration.get( + StandaloneKubernetesConfigOptionsInternal.KUBERNETES_TASKMANAGER_REPLICAS)); + List<String> 
classpaths = + ConfigUtils.decodeListFromConfig( + configuration, PipelineOptions.CLASSPATHS, String::toString); + assertThat(classpaths, containsInAnyOrder(correctedJarUri)); + + dep.getSpec().getTaskManager().setReplicas(null); + dep.getSpec().getJob().setParallelism(10); + + configuration = + new FlinkConfigBuilder(dep, new Configuration()) + .applyFlinkConfiguration() + .applyTaskManagerSpec() + .applyJobOrSessionSpec() + .build(); + Assertions.assertEquals( + 5, + configuration.get( + StandaloneKubernetesConfigOptionsInternal.KUBERNETES_TASKMANAGER_REPLICAS)); + } + + @Test + public void testApplyStandaloneSessionSpec() throws URISyntaxException, IOException { + FlinkDeployment dep = ReconciliationUtils.clone(flinkDeployment); + dep.getSpec().setMode(KubernetesDeploymentMode.STANDALONE); + dep.getSpec().setJob(null); + dep.getSpec().setTaskManager(new TaskManagerSpec()); + dep.getSpec().getTaskManager().setReplicas(5); + dep.getSpec().getFlinkConfiguration().put(TaskManagerOptions.NUM_TASK_SLOTS.key(), "2"); + + Configuration configuration = + new FlinkConfigBuilder(dep, new Configuration()) + .applyFlinkConfiguration() + .applyTaskManagerSpec() + .applyJobOrSessionSpec() + .build(); + + Assertions.assertEquals("remote", configuration.getString(DeploymentOptions.TARGET)); + Assertions.assertEquals( + StandaloneKubernetesConfigOptionsInternal.ClusterMode.SESSION, + configuration.get(StandaloneKubernetesConfigOptionsInternal.CLUSTER_MODE)); + Assertions.assertEquals( + 5, + configuration.get( + StandaloneKubernetesConfigOptionsInternal.KUBERNETES_TASKMANAGER_REPLICAS)); + } + @Test public void testBuildFrom() throws Exception { final Configuration configuration = diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/TestingFlinkDeploymentController.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/TestingFlinkDeploymentController.java index c169e1d9..088d46bf 100644 --- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/TestingFlinkDeploymentController.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/TestingFlinkDeploymentController.java @@ -70,6 +70,7 @@ public class TestingFlinkDeploymentController KubernetesClient kubernetesClient, TestingFlinkService flinkService) { FlinkServiceFactory flinkServiceFactory = new TestingFlinkServiceFactory(flinkService); + eventRecorder = new EventRecorder(kubernetesClient, eventCollector); statusRecorder = new StatusRecorder<>( diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/sessionjob/SessionJobObserverTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/sessionjob/SessionJobObserverTest.java index f801bfbc..5dddb7b9 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/sessionjob/SessionJobObserverTest.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/sessionjob/SessionJobObserverTest.java @@ -54,7 +54,6 @@ public class SessionJobObserverTest { private TestingFlinkService flinkService; private SessionJobObserver observer; private SessionJobReconciler reconciler; - private FlinkServiceFactory flinkServiceFactory; @BeforeEach public void before() { @@ -62,7 +61,7 @@ public class SessionJobObserverTest { var eventRecorder = new EventRecorder(kubernetesClient, (r, e) -> {}); var statusRecorder = new TestingStatusRecorder<FlinkSessionJobStatus>(); flinkService = new TestingFlinkService(); - flinkServiceFactory = flinkServiceFactory = new TestingFlinkServiceFactory(flinkService); + FlinkServiceFactory flinkServiceFactory = new TestingFlinkServiceFactory(flinkService); observer = new SessionJobObserver( flinkServiceFactory, configManager, statusRecorder, eventRecorder); diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconcilerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconcilerTest.java index 998fa34d..d6f342c9 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconcilerTest.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconcilerTest.java @@ -72,7 +72,7 @@ public class SessionJobReconcilerTest { @BeforeEach public void before() { flinkService = new TestingFlinkService(); - flinkServiceFactory = flinkServiceFactory = new TestingFlinkServiceFactory(flinkService); + flinkServiceFactory = new TestingFlinkServiceFactory(flinkService); eventRecorder = new EventRecorder(null, (r, e) -> {}) { @Override diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DefaultValidatorTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DefaultValidatorTest.java index 5189ab47..23e75bc1 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DefaultValidatorTest.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DefaultValidatorTest.java @@ -32,6 +32,7 @@ import org.apache.flink.kubernetes.operator.crd.spec.FlinkVersion; import org.apache.flink.kubernetes.operator.crd.spec.IngressSpec; import org.apache.flink.kubernetes.operator.crd.spec.JobSpec; import org.apache.flink.kubernetes.operator.crd.spec.JobState; +import org.apache.flink.kubernetes.operator.crd.spec.KubernetesDeploymentMode; import org.apache.flink.kubernetes.operator.crd.spec.TaskManagerSpec; import org.apache.flink.kubernetes.operator.crd.spec.UpgradeMode; import 
org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentReconciliationStatus; @@ -301,6 +302,74 @@ public class DefaultValidatorTest { }, "Cannot switch from session to job cluster"); + testError( + dep -> { + dep.setStatus(new FlinkDeploymentStatus()); + dep.getStatus().setJobStatus(new JobStatus()); + + dep.getStatus() + .setReconciliationStatus(new FlinkDeploymentReconciliationStatus()); + dep.getSpec().setMode(KubernetesDeploymentMode.STANDALONE); + FlinkDeploymentSpec spec = ReconciliationUtils.clone(dep.getSpec()); + + spec.setMode(KubernetesDeploymentMode.NATIVE); + dep.getStatus() + .getReconciliationStatus() + .serializeAndSetLastReconciledSpec(spec, dep); + }, + "Cannot switch from native kubernetes to standalone kubernetes cluster"); + + testError( + dep -> { + dep.setStatus(new FlinkDeploymentStatus()); + dep.getStatus().setJobStatus(new JobStatus()); + + dep.getStatus() + .setReconciliationStatus(new FlinkDeploymentReconciliationStatus()); + dep.getSpec().setMode(KubernetesDeploymentMode.STANDALONE); + FlinkDeploymentSpec spec = ReconciliationUtils.clone(dep.getSpec()); + + spec.setMode(null); + dep.getStatus() + .getReconciliationStatus() + .serializeAndSetLastReconciledSpec(spec, dep); + }, + "Cannot switch from native kubernetes to standalone kubernetes cluster"); + + testError( + dep -> { + dep.setStatus(new FlinkDeploymentStatus()); + dep.getStatus().setJobStatus(new JobStatus()); + + dep.getStatus() + .setReconciliationStatus(new FlinkDeploymentReconciliationStatus()); + dep.getSpec().setMode(null); + FlinkDeploymentSpec spec = ReconciliationUtils.clone(dep.getSpec()); + + spec.setMode(KubernetesDeploymentMode.STANDALONE); + dep.getStatus() + .getReconciliationStatus() + .serializeAndSetLastReconciledSpec(spec, dep); + }, + "Cannot switch from standalone kubernetes to native kubernetes cluster"); + + testError( + dep -> { + dep.setStatus(new FlinkDeploymentStatus()); + dep.getStatus().setJobStatus(new JobStatus()); + + dep.getStatus() + 
.setReconciliationStatus(new FlinkDeploymentReconciliationStatus()); + dep.getSpec().setMode(KubernetesDeploymentMode.NATIVE); + FlinkDeploymentSpec spec = ReconciliationUtils.clone(dep.getSpec()); + + spec.setMode(KubernetesDeploymentMode.STANDALONE); + dep.getStatus() + .getReconciliationStatus() + .serializeAndSetLastReconciledSpec(spec, dep); + }, + "Cannot switch from standalone kubernetes to native kubernetes cluster"); + // Test upgrade mode change validation testError( dep -> {
