This is an automated email from the ASF dual-hosted git repository.
gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
The following commit(s) were added to refs/heads/main by this push:
new ae2f75a [FLINK-26892] Observe current status before validating CR changes
ae2f75a is described below
commit ae2f75aea2be379190969e990640dfbb049fc346
Author: Biao Geng <[email protected]>
AuthorDate: Sat Apr 2 01:35:33 2022 +0800
[FLINK-26892] Observe current status before validating CR changes
---
.../flink/kubernetes/operator/FlinkOperator.java | 4 +-
.../flink/kubernetes/operator/config/Mode.java | 22 +++++-
.../controller/FlinkDeploymentController.java | 31 ++++----
.../kubernetes/operator/observer/BaseObserver.java | 38 ++++++++-
.../kubernetes/operator/observer/JobObserver.java | 32 ++++----
.../kubernetes/operator/observer/Observer.java | 4 +-
.../operator/observer/ObserverFactory.java | 15 ++--
.../operator/observer/SessionObserver.java | 31 ++++----
.../operator/utils/FlinkConfigBuilder.java | 24 ++++--
.../kubernetes/operator/utils/FlinkUtils.java | 9 ++-
.../kubernetes/operator/TestingFlinkService.java | 7 ++
.../controller/FlinkDeploymentControllerTest.java | 91 +++++++++++++++++++++-
.../operator/observer/JobObserverTest.java | 50 ++++++------
.../operator/observer/SessionObserverTest.java | 38 ++++-----
14 files changed, 283 insertions(+), 113 deletions(-)
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
index 2ceeafb..02757a6 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
@@ -76,7 +76,9 @@ public class FlinkOperator {
FlinkDeploymentValidator validator = new DefaultDeploymentValidator();
ReconcilerFactory reconcilerFactory =
new ReconcilerFactory(client, flinkService,
operatorConfiguration);
- ObserverFactory observerFactory = new ObserverFactory(flinkService,
operatorConfiguration);
+ ObserverFactory observerFactory =
+ new ObserverFactory(
+ flinkService, operatorConfiguration,
defaultConfig.getFlinkConfig());
controller =
new FlinkDeploymentController(
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/Mode.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/Mode.java
index e616432..1e62b34 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/Mode.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/Mode.java
@@ -19,13 +19,33 @@
package org.apache.flink.kubernetes.operator.config;
import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.crd.spec.FlinkDeploymentSpec;
/** The mode of {@link FlinkDeployment}. */
public enum Mode {
APPLICATION,
SESSION;
+ /**
+ * Return the mode of the given FlinkDeployment for Observer and Reconciler. Note, switching
+ * mode for an existing deployment is not allowed.
+ *
+ * @param flinkApp given FlinkDeployment
+ * @return Mode
+ */
public static Mode getMode(FlinkDeployment flinkApp) {
- return flinkApp.getSpec().getJob() != null ? APPLICATION : SESSION;
+ // Try to use lastReconciledSpec if it exists.
+ // The mode derived from last-reconciled spec or current spec should be same.
+ // If they are different, observation phase will use last-reconciled spec and validation
+ // phase will fail.
+ FlinkDeploymentSpec lastReconciledSpec =
+
flinkApp.getStatus().getReconciliationStatus().getLastReconciledSpec();
+ return lastReconciledSpec == null
+ ? getMode(flinkApp.getSpec())
+ : getMode(lastReconciledSpec);
+ }
+
+ private static Mode getMode(FlinkDeploymentSpec spec) {
+ return spec.getJob() != null ? APPLICATION : SESSION;
}
}
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
index f82b30f..0067674 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
@@ -92,34 +92,31 @@ public class FlinkDeploymentController
@Override
public DeleteControl cleanup(FlinkDeployment flinkApp, Context context) {
LOG.info("Deleting FlinkDeployment");
- Configuration effectiveConfig =
- FlinkUtils.getEffectiveConfig(flinkApp,
defaultConfig.getFlinkConfig());
try {
- observerFactory.getOrCreate(flinkApp).observe(flinkApp, context,
effectiveConfig);
+ observerFactory.getOrCreate(flinkApp).observe(flinkApp, context);
} catch (DeploymentFailedException dfe) {
// ignore during cleanup
}
+ Configuration effectiveConfig =
+ FlinkUtils.getEffectiveConfig(flinkApp,
defaultConfig.getFlinkConfig());
return reconcilerFactory.getOrCreate(flinkApp).cleanup(flinkApp,
effectiveConfig);
}
@Override
public UpdateControl<FlinkDeployment> reconcile(FlinkDeployment flinkApp,
Context context) {
- FlinkDeployment originalCopy = ReconciliationUtils.clone(flinkApp);
LOG.info("Starting reconciliation");
-
- Optional<String> validationError = validator.validate(flinkApp);
- if (validationError.isPresent()) {
- LOG.error("Validation failed: " + validationError.get());
- ReconciliationUtils.updateForReconciliationError(flinkApp,
validationError.get());
- return ReconciliationUtils.toUpdateControl(
- operatorConfiguration, originalCopy, flinkApp, false);
- }
-
- Configuration effectiveConfig =
- FlinkUtils.getEffectiveConfig(flinkApp,
defaultConfig.getFlinkConfig());
-
+ FlinkDeployment originalCopy = ReconciliationUtils.clone(flinkApp);
try {
- observerFactory.getOrCreate(flinkApp).observe(flinkApp, context,
effectiveConfig);
+ observerFactory.getOrCreate(flinkApp).observe(flinkApp, context);
+ Optional<String> validationError = validator.validate(flinkApp);
+ if (validationError.isPresent()) {
+ LOG.error("Validation failed: " + validationError.get());
+ ReconciliationUtils.updateForReconciliationError(flinkApp,
validationError.get());
+ return ReconciliationUtils.toUpdateControl(
+ operatorConfiguration, originalCopy, flinkApp, false);
+ }
+ Configuration effectiveConfig =
+ FlinkUtils.getEffectiveConfig(flinkApp,
defaultConfig.getFlinkConfig());
reconcilerFactory.getOrCreate(flinkApp).reconcile(flinkApp,
context, effectiveConfig);
} catch (DeploymentFailedException dfe) {
handleDeploymentFailed(flinkApp, dfe);
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/BaseObserver.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/BaseObserver.java
index 676074b..4afc227 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/BaseObserver.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/BaseObserver.java
@@ -28,6 +28,7 @@ import
org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
import org.apache.flink.kubernetes.operator.crd.status.ReconciliationStatus;
import
org.apache.flink.kubernetes.operator.exception.DeploymentFailedException;
import org.apache.flink.kubernetes.operator.service.FlinkService;
+import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
import io.fabric8.kubernetes.api.model.ContainerStateWaiting;
import io.fabric8.kubernetes.api.model.ContainerStatus;
@@ -53,11 +54,15 @@ public abstract class BaseObserver implements Observer {
protected final FlinkService flinkService;
protected final FlinkOperatorConfiguration operatorConfiguration;
+ protected final Configuration flinkConfig;
public BaseObserver(
- FlinkService flinkService, FlinkOperatorConfiguration
operatorConfiguration) {
+ FlinkService flinkService,
+ FlinkOperatorConfiguration operatorConfiguration,
+ Configuration flinkConfig) {
this.flinkService = flinkService;
this.operatorConfiguration = operatorConfiguration;
+ this.flinkConfig = flinkConfig;
}
protected void observeJmDeployment(
@@ -182,4 +187,35 @@ public abstract class BaseObserver implements Observer {
&& lastReconciledSpec != null
&& lastReconciledSpec.getJob().getState() ==
JobState.SUSPENDED;
}
+
+ public void observe(FlinkDeployment flinkApp, Context context) {
+ FlinkDeploymentSpec lastReconciledSpec =
+
flinkApp.getStatus().getReconciliationStatus().getLastReconciledSpec();
+ // Nothing has been launched so skip observing
+ if (lastReconciledSpec == null) {
+ return;
+ }
+
+ Configuration lastValidatedConfig =
+ FlinkUtils.getEffectiveConfig(
+ flinkApp.getMetadata(), lastReconciledSpec,
this.flinkConfig);
+ if (!isClusterReady(flinkApp)) {
+ observeJmDeployment(flinkApp, context, lastValidatedConfig);
+ }
+ if (isClusterReady(flinkApp)) {
+ observeIfClusterReady(flinkApp, context, lastValidatedConfig);
+ }
+ clearErrorsIfJobManagerDeploymentNotInErrorStatus(flinkApp);
+ }
+
+ /**
+ * Observe the flinkApp status when the cluster is ready. It will be implemented by child class
+ * to reflect the changed status on the flinkApp resource.
+ *
+ * @param flinkApp the target flinkDeployment resource
+ * @param context the context with which the operation is executed
+ * @param lastValidatedConfig the last validated config
+ */
+ public abstract void observeIfClusterReady(
+ FlinkDeployment flinkApp, Context context, Configuration
lastValidatedConfig);
}
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobObserver.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobObserver.java
index 9ecab3e..1c60a87 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobObserver.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobObserver.java
@@ -41,38 +41,35 @@ import java.util.concurrent.TimeoutException;
public class JobObserver extends BaseObserver {
public JobObserver(
- FlinkService flinkService, FlinkOperatorConfiguration
operatorConfiguration) {
- super(flinkService, operatorConfiguration);
+ FlinkService flinkService,
+ FlinkOperatorConfiguration operatorConfiguration,
+ Configuration flinkConfig) {
+ super(flinkService, operatorConfiguration, flinkConfig);
}
@Override
- public void observe(FlinkDeployment flinkApp, Context context,
Configuration effectiveConfig) {
- if (!isClusterReady(flinkApp)) {
- observeJmDeployment(flinkApp, context, effectiveConfig);
+ public void observeIfClusterReady(
+ FlinkDeployment flinkApp, Context context, Configuration
lastValidatedConfig) {
+ boolean jobFound = observeFlinkJobStatus(flinkApp, context,
lastValidatedConfig);
+ if (jobFound) {
+ observeSavepointStatus(flinkApp, lastValidatedConfig);
}
- if (isClusterReady(flinkApp)) {
- boolean jobFound = observeFlinkJobStatus(flinkApp, context,
effectiveConfig);
- if (jobFound) {
- observeSavepointStatus(flinkApp, effectiveConfig);
- }
- }
- clearErrorsIfJobManagerDeploymentNotInErrorStatus(flinkApp);
}
private boolean observeFlinkJobStatus(
- FlinkDeployment flinkApp, Context context, Configuration
effectiveConfig) {
+ FlinkDeployment flinkApp, Context context, Configuration
lastValidatedConfig) {
logger.info("Observing job status");
FlinkDeploymentStatus flinkAppStatus = flinkApp.getStatus();
String previousJobStatus = flinkAppStatus.getJobStatus().getState();
Collection<JobStatusMessage> clusterJobStatuses;
try {
- clusterJobStatuses = flinkService.listJobs(effectiveConfig);
+ clusterJobStatuses = flinkService.listJobs(lastValidatedConfig);
} catch (Exception e) {
logger.error("Exception while listing jobs", e);
flinkAppStatus.getJobStatus().setState(JOB_STATE_UNKNOWN);
if (e instanceof TimeoutException) {
// check for problems with the underlying deployment
- observeJmDeployment(flinkApp, context, effectiveConfig);
+ observeJmDeployment(flinkApp, context, lastValidatedConfig);
}
return false;
}
@@ -111,7 +108,8 @@ public class JobObserver extends BaseObserver {
return status.getState();
}
- private void observeSavepointStatus(FlinkDeployment flinkApp,
Configuration effectiveConfig) {
+ private void observeSavepointStatus(
+ FlinkDeployment flinkApp, Configuration lastValidatedConfig) {
SavepointInfo savepointInfo =
flinkApp.getStatus().getJobStatus().getSavepointInfo();
if (!SavepointUtils.savepointInProgress(flinkApp)) {
logger.debug("Savepoint not in progress");
@@ -121,7 +119,7 @@ public class JobObserver extends BaseObserver {
SavepointFetchResult savepointFetchResult;
try {
- savepointFetchResult = flinkService.fetchSavepointInfo(flinkApp,
effectiveConfig);
+ savepointFetchResult = flinkService.fetchSavepointInfo(flinkApp,
lastValidatedConfig);
} catch (Exception e) {
logger.error("Exception while fetching savepoint info", e);
return;
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/Observer.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/Observer.java
index e354689..1f7e16e 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/Observer.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/Observer.java
@@ -17,7 +17,6 @@
package org.apache.flink.kubernetes.operator.observer;
-import org.apache.flink.configuration.Configuration;
import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
import io.javaoperatorsdk.operator.api.reconciler.Context;
@@ -30,7 +29,6 @@ public interface Observer {
*
* @param flinkApp the target flinkDeployment resource
* @param context the context with which the operation is executed
- * @param effectiveConfig the effective config of the flinkApp
*/
- void observe(FlinkDeployment flinkApp, Context context, Configuration
effectiveConfig);
+ void observe(FlinkDeployment flinkApp, Context context);
}
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/ObserverFactory.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/ObserverFactory.java
index d3ec8b9..a0a3566 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/ObserverFactory.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/ObserverFactory.java
@@ -18,6 +18,7 @@
package org.apache.flink.kubernetes.operator.observer;
+import org.apache.flink.configuration.Configuration;
import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
import org.apache.flink.kubernetes.operator.config.Mode;
import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
@@ -30,13 +31,17 @@ import java.util.concurrent.ConcurrentHashMap;
public class ObserverFactory {
private final FlinkService flinkService;
- private final FlinkOperatorConfiguration operatorConfiguration;
+ private final FlinkOperatorConfiguration operatorConfig;
+ private final Configuration flinkConfig;
private final Map<Mode, Observer> observerMap;
public ObserverFactory(
- FlinkService flinkService, FlinkOperatorConfiguration
operatorConfiguration) {
+ FlinkService flinkService,
+ FlinkOperatorConfiguration operatorConfiguration,
+ Configuration flinkConfig) {
this.flinkService = flinkService;
- this.operatorConfiguration = operatorConfiguration;
+ this.operatorConfig = operatorConfiguration;
+ this.flinkConfig = flinkConfig;
this.observerMap = new ConcurrentHashMap<>();
}
@@ -46,9 +51,9 @@ public class ObserverFactory {
mode -> {
switch (mode) {
case SESSION:
- return new SessionObserver(flinkService,
operatorConfiguration);
+ return new SessionObserver(flinkService,
operatorConfig, flinkConfig);
case APPLICATION:
- return new JobObserver(flinkService,
operatorConfiguration);
+ return new JobObserver(flinkService,
operatorConfig, flinkConfig);
default:
throw new UnsupportedOperationException(
String.format("Unsupported running mode:
%s", mode));
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/SessionObserver.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/SessionObserver.java
index 80a4fbf..f74af3b 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/SessionObserver.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/SessionObserver.java
@@ -31,27 +31,24 @@ import java.util.concurrent.TimeoutException;
public class SessionObserver extends BaseObserver {
public SessionObserver(
- FlinkService flinkService, FlinkOperatorConfiguration
operatorConfiguration) {
- super(flinkService, operatorConfiguration);
+ FlinkService flinkService,
+ FlinkOperatorConfiguration operatorConfiguration,
+ Configuration flinkConfig) {
+ super(flinkService, operatorConfiguration, flinkConfig);
}
@Override
- public void observe(FlinkDeployment flinkApp, Context context,
Configuration effectiveConfig) {
- if (!isClusterReady(flinkApp)) {
- observeJmDeployment(flinkApp, context, effectiveConfig);
- }
- if (isClusterReady(flinkApp)) {
- // Check if session cluster can serve rest calls following our
practice in JobObserver
- try {
- flinkService.listJobs(effectiveConfig);
- } catch (Exception e) {
- logger.error("REST service in session cluster is bad now", e);
- if (e instanceof TimeoutException) {
- // check for problems with the underlying deployment
- observeJmDeployment(flinkApp, context, effectiveConfig);
- }
+ public void observeIfClusterReady(
+ FlinkDeployment flinkApp, Context context, Configuration
lastValidatedConfig) {
+ // Check if session cluster can serve rest calls following our practice in JobObserver
+ try {
+ flinkService.listJobs(lastValidatedConfig);
+ } catch (Exception e) {
+ logger.error("REST service in session cluster is bad now", e);
+ if (e instanceof TimeoutException) {
+ // check for problems with the underlying deployment
+ observeJmDeployment(flinkApp, context, lastValidatedConfig);
}
}
- clearErrorsIfJobManagerDeploymentNotInErrorStatus(flinkApp);
}
}
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkConfigBuilder.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkConfigBuilder.java
index 2faa76e..cc59838 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkConfigBuilder.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkConfigBuilder.java
@@ -32,6 +32,7 @@ import
org.apache.flink.kubernetes.operator.crd.spec.FlinkDeploymentSpec;
import org.apache.flink.kubernetes.operator.crd.spec.Resource;
import org.apache.flink.util.StringUtils;
+import io.fabric8.kubernetes.api.model.ObjectMeta;
import io.fabric8.kubernetes.api.model.Pod;
import io.fabric8.kubernetes.client.internal.SerializationUtils;
@@ -50,13 +51,18 @@ import static
org.apache.flink.kubernetes.utils.Constants.CONFIG_FILE_LOGBACK_NA
/** Builder to get effective flink config from {@link FlinkDeployment}. */
public class FlinkConfigBuilder {
- private final FlinkDeployment deploy;
+ private final ObjectMeta meta;
private final FlinkDeploymentSpec spec;
private final Configuration effectiveConfig;
public FlinkConfigBuilder(FlinkDeployment deploy, Configuration
flinkConfig) {
- this.deploy = deploy;
- this.spec = this.deploy.getSpec();
+ this(deploy.getMetadata(), deploy.getSpec(), flinkConfig);
+ }
+
+ public FlinkConfigBuilder(
+ ObjectMeta metadata, FlinkDeploymentSpec spec, Configuration
flinkConfig) {
+ this.meta = metadata;
+ this.spec = spec;
this.effectiveConfig = new Configuration(flinkConfig);
}
@@ -179,8 +185,8 @@ public class FlinkConfigBuilder {
public Configuration build() {
// Set cluster config
- final String namespace = deploy.getMetadata().getNamespace();
- final String clusterId = deploy.getMetadata().getName();
+ final String namespace = meta.getNamespace();
+ final String clusterId = meta.getName();
effectiveConfig.setString(KubernetesConfigOptions.NAMESPACE,
namespace);
effectiveConfig.setString(KubernetesConfigOptions.CLUSTER_ID,
clusterId);
return effectiveConfig;
@@ -188,7 +194,13 @@ public class FlinkConfigBuilder {
public static Configuration buildFrom(FlinkDeployment dep, Configuration
flinkConfig)
throws IOException, URISyntaxException {
- return new FlinkConfigBuilder(dep, flinkConfig)
+ return buildFrom(dep.getMetadata(), dep.getSpec(), flinkConfig);
+ }
+
+ public static Configuration buildFrom(
+ ObjectMeta meta, FlinkDeploymentSpec spec, Configuration
flinkConfig)
+ throws IOException, URISyntaxException {
+ return new FlinkConfigBuilder(meta, spec, flinkConfig)
.applyFlinkConfiguration()
.applyLogConfiguration()
.applyImage()
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java
index 8068802..1d89036 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java
@@ -25,6 +25,7 @@ import
org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory;
import
org.apache.flink.kubernetes.kubeclient.decorators.ExternalServiceDecorator;
import org.apache.flink.kubernetes.operator.config.DefaultConfig;
import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.crd.spec.FlinkDeploymentSpec;
import org.apache.flink.kubernetes.utils.Constants;
import org.apache.flink.kubernetes.utils.KubernetesUtils;
@@ -33,6 +34,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import io.fabric8.kubernetes.api.model.ConfigMapList;
+import io.fabric8.kubernetes.api.model.ObjectMeta;
import io.fabric8.kubernetes.api.model.Pod;
import io.fabric8.kubernetes.api.model.PodList;
import io.fabric8.kubernetes.api.model.Service;
@@ -61,9 +63,14 @@ public class FlinkUtils {
public static Configuration getEffectiveConfig(
FlinkDeployment flinkApp, Configuration flinkConfig) {
+ return getEffectiveConfig(flinkApp.getMetadata(), flinkApp.getSpec(),
flinkConfig);
+ }
+
+ public static Configuration getEffectiveConfig(
+ ObjectMeta meta, FlinkDeploymentSpec spec, Configuration
flinkConfig) {
try {
final Configuration effectiveConfig =
- FlinkConfigBuilder.buildFrom(flinkApp, flinkConfig);
+ FlinkConfigBuilder.buildFrom(meta, spec, flinkConfig);
LOG.debug("Effective config: {}", effectiveConfig);
return effectiveConfig;
} catch (Exception e) {
diff --git
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java
index d6d6d6d..214bd03 100644
---
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java
+++
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java
@@ -38,6 +38,7 @@ import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeoutException;
+import java.util.function.Consumer;
import java.util.stream.Collectors;
/** Flink service mock for tests. */
@@ -50,6 +51,7 @@ public class TestingFlinkService extends FlinkService {
private Set<String> sessions = new HashSet<>();
private boolean isPortReady = true;
private PodList podList = new PodList();
+ private Consumer<Configuration> listJobConsumer = conf -> {};
public TestingFlinkService() {
super(null, null);
@@ -80,6 +82,7 @@ public class TestingFlinkService extends FlinkService {
@Override
public List<JobStatusMessage> listJobs(Configuration conf) throws
Exception {
+ listJobConsumer.accept(conf);
if (jobs.isEmpty() && !sessions.isEmpty()) {
throw new Exception("Trying to list a job without submitting it");
}
@@ -89,6 +92,10 @@ public class TestingFlinkService extends FlinkService {
return jobs.stream().map(t -> t.f1).collect(Collectors.toList());
}
+ public void setListJobConsumer(Consumer<Configuration> listJobConsumer) {
+ this.listJobConsumer = listJobConsumer;
+ }
+
public List<Tuple2<String, JobStatusMessage>> listJobs() {
return new ArrayList<>(jobs);
}
diff --git
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
index 71348c7..3b93713 100644
---
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
+++
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
@@ -20,8 +20,10 @@ package org.apache.flink.kubernetes.operator.controller;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.RestOptions;
import org.apache.flink.kubernetes.operator.TestUtils;
import org.apache.flink.kubernetes.operator.TestingFlinkService;
+import org.apache.flink.kubernetes.operator.config.DefaultConfig;
import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
import org.apache.flink.kubernetes.operator.crd.spec.JobState;
@@ -29,6 +31,7 @@ import
org.apache.flink.kubernetes.operator.crd.spec.UpgradeMode;
import org.apache.flink.kubernetes.operator.crd.status.JobStatus;
import org.apache.flink.kubernetes.operator.crd.status.ReconciliationStatus;
import
org.apache.flink.kubernetes.operator.exception.DeploymentFailedException;
+import org.apache.flink.kubernetes.operator.observer.BaseObserver;
import
org.apache.flink.kubernetes.operator.observer.JobManagerDeploymentStatus;
import org.apache.flink.kubernetes.operator.observer.ObserverFactory;
import org.apache.flink.kubernetes.operator.reconciler.ReconcilerFactory;
@@ -72,6 +75,7 @@ public class FlinkDeploymentControllerTest {
FlinkOperatorConfiguration.fromConfiguration(new Configuration());
private TestingFlinkService flinkService;
+ private DefaultConfig defaultConfig;
private FlinkDeploymentController testController;
private KubernetesMockServer mockServer;
@@ -80,6 +84,7 @@ public class FlinkDeploymentControllerTest {
@BeforeEach
public void setup() {
flinkService = new TestingFlinkService();
+ defaultConfig = FlinkUtils.loadDefaultConfig();
testController = createTestController(kubernetesClient, flinkService);
}
@@ -397,7 +402,84 @@ public class FlinkDeploymentControllerTest {
testUpgradeNotReadyCluster(appCluster, false);
}
- public void testUpgradeNotReadyCluster(FlinkDeployment appCluster, boolean
allowUpgrade) {
+ @Test
+ public void verifyReconcileWithBadConfig() {
+
+ FlinkDeployment appCluster = TestUtils.buildApplicationCluster();
+ UpdateControl<FlinkDeployment> updateControl;
+ // Override rest port, and it should be saved in lastReconciledSpec once a successful
+ // reconcile() finishes.
+
appCluster.getSpec().getFlinkConfiguration().put(RestOptions.PORT.key(),
"8088");
+ updateControl = testController.reconcile(appCluster, context);
+ assertTrue(updateControl.isUpdateStatus());
+ assertEquals(
+ JobManagerDeploymentStatus.DEPLOYING,
+ appCluster.getStatus().getJobManagerDeploymentStatus());
+
+ // Check when the bad config is applied, observe() will change the cluster state correctly
+ appCluster.getSpec().getJobManager().setReplicas(-1);
+ // Next reconcile will set error msg and observe with previous validated config
+ updateControl = testController.reconcile(appCluster, context);
+ assertEquals(
+ "JobManager replicas should not be configured less than one.",
+ appCluster.getStatus().getReconciliationStatus().getError());
+ assertTrue(updateControl.isUpdateStatus());
+ assertEquals(
+ JobManagerDeploymentStatus.DEPLOYED_NOT_READY,
+ appCluster.getStatus().getJobManagerDeploymentStatus());
+
+ // Make sure we do validation before getting effective config in reconcile().
+ appCluster.getSpec().getJobManager().setReplicas(1);
+ appCluster.getSpec().getJob().setJarURI(null);
+ // Verify the saved rest port in lastReconciledSpec is actually used in observe() by
+ // utilizing listJobConsumer
+
appCluster.getSpec().getFlinkConfiguration().put(RestOptions.PORT.key(),
"12345");
+ flinkService.setListJobConsumer(
+ (configuration) -> assertEquals(8088,
configuration.get(RestOptions.PORT)));
+ testController.reconcile(appCluster, context);
+ assertEquals(
+ JobManagerDeploymentStatus.READY,
+ appCluster.getStatus().getJobManagerDeploymentStatus());
+ }
+
+ @Test
+ public void verifyReconcileWithAChangedOperatorMode() {
+
+ FlinkDeployment appCluster = TestUtils.buildApplicationCluster();
+ UpdateControl<FlinkDeployment> updateControl;
+
+ updateControl = testController.reconcile(appCluster, context);
+ assertTrue(updateControl.isUpdateStatus());
+ assertEquals(
+ JobManagerDeploymentStatus.DEPLOYING,
+ appCluster.getStatus().getJobManagerDeploymentStatus());
+
+ updateControl = testController.reconcile(appCluster, context);
+ JobStatus jobStatus = appCluster.getStatus().getJobStatus();
+ assertTrue(updateControl.isUpdateStatus());
+ assertEquals(
+ JobManagerDeploymentStatus.DEPLOYED_NOT_READY,
+ appCluster.getStatus().getJobManagerDeploymentStatus());
+ // jobStatus has not been set at this time
+ assertEquals(BaseObserver.JOB_STATE_UNKNOWN, jobStatus.getState());
+
+ // Switches operator mode to SESSION
+ appCluster.getSpec().setJob(null);
+ // Validation fails and JobObserver should still be used
+ updateControl = testController.reconcile(appCluster, context);
+ assertTrue(updateControl.isUpdateStatus());
+ assertEquals(
+ JobManagerDeploymentStatus.READY,
+ appCluster.getStatus().getJobManagerDeploymentStatus());
+ // Verify jobStatus is running
+ jobStatus = appCluster.getStatus().getJobStatus();
+ JobStatusMessage expectedJobStatus = flinkService.listJobs().get(0).f1;
+ assertEquals(expectedJobStatus.getJobId().toHexString(),
jobStatus.getJobId());
+ assertEquals(expectedJobStatus.getJobName(), jobStatus.getJobName());
+ assertEquals(expectedJobStatus.getJobState().toString(),
jobStatus.getState());
+ }
+
+ private void testUpgradeNotReadyCluster(FlinkDeployment appCluster,
boolean allowUpgrade) {
testController.reconcile(appCluster, TestUtils.createEmptyContext());
assertEquals(
appCluster.getSpec(),
@@ -522,14 +604,17 @@ public class FlinkDeploymentControllerTest {
FlinkDeploymentController controller =
new FlinkDeploymentController(
- FlinkUtils.loadDefaultConfig(),
+ defaultConfig,
operatorConfiguration,
kubernetesClient,
"test",
new DefaultDeploymentValidator(),
new ReconcilerFactory(
kubernetesClient, flinkService,
operatorConfiguration),
- new ObserverFactory(flinkService,
operatorConfiguration));
+ new ObserverFactory(
+ flinkService,
+ operatorConfiguration,
+ defaultConfig.getFlinkConfig()));
controller.setControllerConfig(
new FlinkControllerConfig(controller, Collections.emptySet()));
return controller;
diff --git
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/JobObserverTest.java
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/JobObserverTest.java
index b7aed87..8ea1063 100644
---
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/JobObserverTest.java
+++
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/JobObserverTest.java
@@ -44,15 +44,17 @@ public class JobObserverTest {
@Test
public void observeApplicationCluster() {
+ Configuration flinkConf = new Configuration();
TestingFlinkService flinkService = new TestingFlinkService();
JobObserver observer =
new JobObserver(
flinkService,
- FlinkOperatorConfiguration.fromConfiguration(new Configuration()));
+ FlinkOperatorConfiguration.fromConfiguration(new Configuration()),
+ flinkConf);
FlinkDeployment deployment = TestUtils.buildApplicationCluster();
- Configuration conf = FlinkUtils.getEffectiveConfig(deployment, new Configuration());
+ Configuration conf = FlinkUtils.getEffectiveConfig(deployment, flinkConf);
- observer.observe(deployment, TestUtils.createEmptyContext(), conf);
+ observer.observe(deployment, TestUtils.createEmptyContext());
deployment.setStatus(new FlinkDeploymentStatus());
deployment
@@ -66,31 +68,31 @@ public class JobObserverTest {
flinkService.setPortReady(false);
// Port not ready
- observer.observe(deployment, readyContext, conf);
+ observer.observe(deployment, readyContext);
assertEquals(
JobManagerDeploymentStatus.DEPLOYING,
deployment.getStatus().getJobManagerDeploymentStatus());
- observer.observe(deployment, readyContext, conf);
+ observer.observe(deployment, readyContext);
assertEquals(
JobManagerDeploymentStatus.DEPLOYING,
deployment.getStatus().getJobManagerDeploymentStatus());
flinkService.setPortReady(true);
// Port ready but we have to recheck once again
- observer.observe(deployment, readyContext, conf);
+ observer.observe(deployment, readyContext);
assertEquals(
JobManagerDeploymentStatus.DEPLOYED_NOT_READY,
deployment.getStatus().getJobManagerDeploymentStatus());
// Stable ready
- observer.observe(deployment, readyContext, conf);
+ observer.observe(deployment, readyContext);
assertEquals(
JobManagerDeploymentStatus.READY,
deployment.getStatus().getJobManagerDeploymentStatus());
assertEquals(JobState.RUNNING.name(), deployment.getStatus().getJobStatus().getState());
- observer.observe(deployment, readyContext, conf);
+ observer.observe(deployment, readyContext);
assertEquals(
JobManagerDeploymentStatus.READY,
deployment.getStatus().getJobManagerDeploymentStatus());
@@ -110,24 +112,24 @@ public class JobObserverTest {
>= 0);
// Test job manager is unavailable suddenly
flinkService.setPortReady(false);
- observer.observe(deployment, readyContext, conf);
+ observer.observe(deployment, readyContext);
assertEquals(
JobManagerDeploymentStatus.DEPLOYING,
deployment.getStatus().getJobManagerDeploymentStatus());
// Job manager recovers
flinkService.setPortReady(true);
- observer.observe(deployment, readyContext, conf);
+ observer.observe(deployment, readyContext);
assertEquals(
JobManagerDeploymentStatus.DEPLOYED_NOT_READY,
deployment.getStatus().getJobManagerDeploymentStatus());
- observer.observe(deployment, readyContext, conf);
+ observer.observe(deployment, readyContext);
assertEquals(
JobManagerDeploymentStatus.READY,
deployment.getStatus().getJobManagerDeploymentStatus());
// Test listing failure
flinkService.clear();
- observer.observe(deployment, readyContext, conf);
+ observer.observe(deployment, readyContext);
assertEquals(
JobManagerDeploymentStatus.READY,
deployment.getStatus().getJobManagerDeploymentStatus());
@@ -137,16 +139,18 @@ public class JobObserverTest {
@Test
public void observeSavepoint() throws Exception {
+ Configuration flinkConf = new Configuration();
TestingFlinkService flinkService = new TestingFlinkService();
JobObserver observer =
new JobObserver(
flinkService,
- FlinkOperatorConfiguration.fromConfiguration(new Configuration()));
+ FlinkOperatorConfiguration.fromConfiguration(new Configuration()),
+ flinkConf);
FlinkDeployment deployment = TestUtils.buildApplicationCluster();
- Configuration conf = FlinkUtils.getEffectiveConfig(deployment, new Configuration());
+ Configuration conf = FlinkUtils.getEffectiveConfig(deployment, flinkConf);
flinkService.submitApplicationCluster(deployment, conf);
bringToReadyStatus(deployment);
- observer.observe(deployment, readyContext, conf);
+ observer.observe(deployment, readyContext);
assertEquals(
JobManagerDeploymentStatus.READY,
deployment.getStatus().getJobManagerDeploymentStatus());
@@ -155,7 +159,7 @@ public class JobObserverTest {
assertEquals(
"trigger_0",
deployment.getStatus().getJobStatus().getSavepointInfo().getTriggerId());
- observer.observe(deployment, readyContext, conf);
+ observer.observe(deployment, readyContext);
assertEquals(
"savepoint_0",
deployment
@@ -171,7 +175,7 @@ public class JobObserverTest {
assertEquals(
"trigger_1",
deployment.getStatus().getJobStatus().getSavepointInfo().getTriggerId());
- observer.observe(deployment, readyContext, conf);
+ observer.observe(deployment, readyContext);
assertEquals(
"savepoint_1",
deployment
@@ -199,15 +203,17 @@ public class JobObserverTest {
@Test
public void observeListJobsError() {
+ Configuration flinkConf = new Configuration();
TestingFlinkService flinkService = new TestingFlinkService();
JobObserver observer =
new JobObserver(
flinkService,
- FlinkOperatorConfiguration.fromConfiguration(new Configuration()));
+ FlinkOperatorConfiguration.fromConfiguration(new Configuration()),
+ flinkConf);
FlinkDeployment deployment = TestUtils.buildApplicationCluster();
- Configuration conf = FlinkUtils.getEffectiveConfig(deployment, new Configuration());
+ Configuration conf = FlinkUtils.getEffectiveConfig(deployment, flinkConf);
bringToReadyStatus(deployment);
- observer.observe(deployment, readyContext, conf);
+ observer.observe(deployment, readyContext);
assertEquals(
JobManagerDeploymentStatus.READY,
deployment.getStatus().getJobManagerDeploymentStatus());
@@ -220,9 +226,7 @@ public class JobObserverTest {
DeploymentFailedException.class,
() -> {
observer.observe(
- deployment,
- TestUtils.createContextWithInProgressDeployment(),
- conf);
+ deployment, TestUtils.createContextWithInProgressDeployment());
});
assertEquals(podFailedMessage, exception.getMessage());
}
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/SessionObserverTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/SessionObserverTest.java
index 3f154c2..003195e 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/SessionObserverTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/SessionObserverTest.java
@@ -47,30 +47,32 @@ public class SessionObserverTest {
@Test
public void observeSessionCluster() {
TestingFlinkService flinkService = new TestingFlinkService();
+ FlinkDeployment deployment = TestUtils.buildSessionCluster();
+ Configuration flinkConf = new Configuration();
SessionObserver observer =
new SessionObserver(
flinkService,
- FlinkOperatorConfiguration.fromConfiguration(new Configuration()));
- FlinkDeployment deployment = TestUtils.buildSessionCluster();
- Configuration conf = FlinkUtils.getEffectiveConfig(deployment, new Configuration());
+ FlinkOperatorConfiguration.fromConfiguration(new Configuration()),
+ flinkConf);
+ Configuration conf = FlinkUtils.getEffectiveConfig(deployment, flinkConf);
deployment
.getStatus()
.getReconciliationStatus()
.setLastReconciledSpec(deployment.getSpec());
- observer.observe(deployment, readyContext, conf);
+ observer.observe(deployment, readyContext);
assertEquals(
JobManagerDeploymentStatus.DEPLOYED_NOT_READY,
deployment.getStatus().getJobManagerDeploymentStatus());
- observer.observe(deployment, readyContext, conf);
+ observer.observe(deployment, readyContext);
assertEquals(
JobManagerDeploymentStatus.READY,
deployment.getStatus().getJobManagerDeploymentStatus());
// Observe again, the JM should be READY
- observer.observe(deployment, readyContext, conf);
+ observer.observe(deployment, readyContext);
assertEquals(
JobManagerDeploymentStatus.READY,
@@ -78,18 +80,18 @@ public class SessionObserverTest {
// Test job manager is unavailable suddenly
flinkService.setPortReady(false);
- observer.observe(deployment, readyContext, conf);
+ observer.observe(deployment, readyContext);
assertEquals(
JobManagerDeploymentStatus.DEPLOYING,
deployment.getStatus().getJobManagerDeploymentStatus());
// Job manager recovers
flinkService.setPortReady(true);
- observer.observe(deployment, readyContext, conf);
+ observer.observe(deployment, readyContext);
assertEquals(
JobManagerDeploymentStatus.DEPLOYED_NOT_READY,
deployment.getStatus().getJobManagerDeploymentStatus());
- observer.observe(deployment, readyContext, conf);
+ observer.observe(deployment, readyContext);
assertEquals(
JobManagerDeploymentStatus.READY,
deployment.getStatus().getJobManagerDeploymentStatus());
@@ -97,6 +99,7 @@ public class SessionObserverTest {
@Test
public void testWatchMultipleNamespaces() {
+ Configuration flinkConf = new Configuration();
FlinkService flinkService = new TestingFlinkService();
FlinkDeployment deployment = TestUtils.buildSessionCluster();
deployment
@@ -118,7 +121,7 @@ public class SessionObserverTest {
k8sDeployment.setStatus(new DeploymentStatus());
AtomicInteger secondaryResourceAccessed = new AtomicInteger(0);
- Observer allNsObserver = new SessionObserver(flinkService, allNsConfig);
+ Observer allNsObserver = new SessionObserver(flinkService, allNsConfig, flinkConf);
allNsObserver.observe(
deployment,
new Context() {
@@ -133,12 +136,12 @@ public class SessionObserverTest {
secondaryResourceAccessed.addAndGet(1);
return Optional.of((T) k8sDeployment);
}
- },
- FlinkUtils.getEffectiveConfig(deployment, new Configuration()));
+ });
assertEquals(1, secondaryResourceAccessed.get());
- Observer specificNsObserver = new SessionObserver(flinkService, specificNsConfig);
+ Observer specificNsObserver =
+ new SessionObserver(flinkService, specificNsConfig, flinkConf);
specificNsObserver.observe(
deployment,
new Context() {
@@ -153,12 +156,12 @@ public class SessionObserverTest {
secondaryResourceAccessed.addAndGet(1);
return Optional.of((T) k8sDeployment);
}
- },
- FlinkUtils.getEffectiveConfig(deployment, new Configuration()));
+ });
assertEquals(2, secondaryResourceAccessed.get());
- Observer multipleNsObserver = new SessionObserver(flinkService, multipleNsConfig);
+ Observer multipleNsObserver =
+ new SessionObserver(flinkService, multipleNsConfig, flinkConf);
multipleNsObserver.observe(
deployment,
new Context() {
@@ -173,8 +176,7 @@ public class SessionObserverTest {
secondaryResourceAccessed.addAndGet(1);
return Optional.of((T) k8sDeployment);
}
- },
- FlinkUtils.getEffectiveConfig(deployment, new Configuration()));
+ });
assertEquals(3, secondaryResourceAccessed.get());
}