This is an automated email from the ASF dual-hosted git repository.
gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
The following commit(s) were added to refs/heads/main by this push:
new ab59d6e [FLINK-27820] Handle deployment errors on observe
ab59d6e is described below
commit ab59d6eb980512775590d0d01e697fe0c28d1b3b
Author: Gyula Fora <[email protected]>
AuthorDate: Mon Jun 20 15:22:26 2022 +0200
[FLINK-27820] Handle deployment errors on observe
---
.../flink/kubernetes/operator/FlinkOperator.java | 1 -
.../operator/config/FlinkConfigManager.java | 5 +-
.../controller/FlinkDeploymentController.java | 8 --
.../kubernetes/operator/crd/FlinkDeployment.java | 4 +-
.../operator/crd/status/ReconciliationStatus.java | 26 +++++-
.../deployment/AbstractDeploymentObserver.java | 61 +++++++++++-
.../operator/reconciler/ReconciliationUtils.java | 71 +++++++++-----
.../kubernetes/operator/utils/FlinkUtils.java | 16 ++++
.../flink/kubernetes/operator/TestUtils.java | 74 ++++-----------
.../operator/config/FlinkConfigManagerTest.java | 14 ++-
.../operator/observer/SavepointObserverTest.java | 2 +-
.../deployment/ApplicationObserverTest.java | 103 ++++++++++++++++-----
.../observer/deployment/SessionObserverTest.java | 8 +-
.../operator/utils/ReconciliationUtilsTest.java | 34 +++++--
.../operator/utils/SavepointUtilsTest.java | 4 +-
.../operator/utils/StatusRecorderTest.java | 2 +-
.../operator/validation/DefaultValidatorTest.java | 8 +-
17 files changed, 297 insertions(+), 144 deletions(-)
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
index d5313d4..cfbb450 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
@@ -122,7 +122,6 @@ public class FlinkOperator {
var controller =
new FlinkDeploymentController(
configManager,
- client,
validators,
reconcilerFactory,
observerFactory,
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigManager.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigManager.java
index c8e4bc3..8f67978 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigManager.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigManager.java
@@ -25,6 +25,7 @@ import
org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
import org.apache.flink.kubernetes.operator.crd.FlinkSessionJob;
import org.apache.flink.kubernetes.operator.crd.spec.FlinkDeploymentSpec;
import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
+import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
import org.apache.flink.shaded.guava30.com.google.common.cache.Cache;
import org.apache.flink.shaded.guava30.com.google.common.cache.CacheBuilder;
@@ -144,7 +145,9 @@ public class FlinkConfigManager {
}
public Configuration getDeployConfig(ObjectMeta objectMeta,
FlinkDeploymentSpec spec) {
- return getConfig(objectMeta, spec);
+ var conf = getConfig(objectMeta, spec);
+ FlinkUtils.setGenerationAnnotation(conf, objectMeta.getGeneration());
+ return conf;
}
public Configuration getObserveConfig(FlinkDeployment deployment) {
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
index df69871..346bbe3 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
@@ -18,7 +18,6 @@
package org.apache.flink.kubernetes.operator.controller;
import org.apache.flink.api.common.JobStatus;
-import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
@@ -35,7 +34,6 @@ import org.apache.flink.kubernetes.operator.utils.EventUtils;
import org.apache.flink.kubernetes.operator.utils.StatusRecorder;
import org.apache.flink.kubernetes.operator.validation.FlinkResourceValidator;
-import io.fabric8.kubernetes.client.KubernetesClient;
import io.javaoperatorsdk.operator.api.reconciler.Cleaner;
import io.javaoperatorsdk.operator.api.reconciler.Context;
import io.javaoperatorsdk.operator.api.reconciler.ControllerConfiguration;
@@ -53,7 +51,6 @@ import org.slf4j.LoggerFactory;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
/** Controller that runs the main reconcile loop for Flink deployments. */
@ControllerConfiguration()
@@ -65,7 +62,6 @@ public class FlinkDeploymentController
private static final Logger LOG =
LoggerFactory.getLogger(FlinkDeploymentController.class);
private final FlinkConfigManager configManager;
- private final KubernetesClient kubernetesClient;
private final Set<FlinkResourceValidator> validators;
private final ReconcilerFactory reconcilerFactory;
@@ -73,12 +69,9 @@ public class FlinkDeploymentController
private final MetricManager<FlinkDeployment> metricManager;
private final StatusRecorder<FlinkDeploymentStatus> statusRecorder;
private final EventRecorder eventRecorder;
- private final ConcurrentHashMap<Tuple2<String, String>,
FlinkDeploymentStatus> statusCache =
- new ConcurrentHashMap<>();
public FlinkDeploymentController(
FlinkConfigManager configManager,
- KubernetesClient kubernetesClient,
Set<FlinkResourceValidator> validators,
ReconcilerFactory reconcilerFactory,
ObserverFactory observerFactory,
@@ -86,7 +79,6 @@ public class FlinkDeploymentController
StatusRecorder<FlinkDeploymentStatus> statusRecorder,
EventRecorder eventRecorder) {
this.configManager = configManager;
- this.kubernetesClient = kubernetesClient;
this.validators = validators;
this.reconcilerFactory = reconcilerFactory;
this.observerFactory = observerFactory;
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/FlinkDeployment.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/FlinkDeployment.java
index 187319d..04ed569 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/FlinkDeployment.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/FlinkDeployment.java
@@ -18,6 +18,7 @@
package org.apache.flink.kubernetes.operator.crd;
import org.apache.flink.annotation.Experimental;
+import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.kubernetes.operator.crd.spec.FlinkDeploymentSpec;
import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
@@ -39,8 +40,9 @@ public class FlinkDeployment
extends AbstractFlinkResource<FlinkDeploymentSpec,
FlinkDeploymentStatus>
implements Namespaced {
+ @VisibleForTesting
@Override
- protected FlinkDeploymentStatus initStatus() {
+ public FlinkDeploymentStatus initStatus() {
return new FlinkDeploymentStatus();
}
}
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/status/ReconciliationStatus.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/status/ReconciliationStatus.java
index 0bfe6a4..1d06e90 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/status/ReconciliationStatus.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/status/ReconciliationStatus.java
@@ -18,10 +18,13 @@
package org.apache.flink.kubernetes.operator.crd.status;
import org.apache.flink.annotation.Experimental;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.kubernetes.operator.crd.AbstractFlinkResource;
import org.apache.flink.kubernetes.operator.crd.spec.AbstractFlinkSpec;
import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.databind.node.ObjectNode;
import lombok.Data;
import lombok.NoArgsConstructor;
@@ -47,24 +50,37 @@ public abstract class ReconciliationStatus<SPEC extends
AbstractFlinkSpec> {
private String lastStableSpec;
/** Deployment state of the last reconciled spec. */
- private ReconciliationState state = ReconciliationState.DEPLOYED;
+ private ReconciliationState state = ReconciliationState.UPGRADING;
@JsonIgnore
public abstract Class<SPEC> getSpecClass();
@JsonIgnore
public SPEC deserializeLastReconciledSpec() {
- return
ReconciliationUtils.deserializedSpecWithVersion(lastReconciledSpec,
getSpecClass());
+ var specWithMeta = deserializeLastReconciledSpecWithMeta();
+ return specWithMeta != null ? specWithMeta.f0 : null;
}
@JsonIgnore
public SPEC deserializeLastStableSpec() {
- return ReconciliationUtils.deserializedSpecWithVersion(lastStableSpec,
getSpecClass());
+ var specWithMeta = deserializeLastStableSpecWithMeta();
+ return specWithMeta != null ? specWithMeta.f0 : null;
}
@JsonIgnore
- public void serializeAndSetLastReconciledSpec(SPEC spec) {
-
setLastReconciledSpec(ReconciliationUtils.writeSpecWithCurrentVersion(spec));
+ public Tuple2<SPEC, ObjectNode> deserializeLastReconciledSpecWithMeta() {
+ return ReconciliationUtils.deserializeSpecWithMeta(lastReconciledSpec,
getSpecClass());
+ }
+
+ @JsonIgnore
+ public Tuple2<SPEC, ObjectNode> deserializeLastStableSpecWithMeta() {
+ return ReconciliationUtils.deserializeSpecWithMeta(lastStableSpec,
getSpecClass());
+ }
+
+ @JsonIgnore
+ public void serializeAndSetLastReconciledSpec(
+ SPEC spec, AbstractFlinkResource<SPEC, ?> resource) {
+ setLastReconciledSpec(ReconciliationUtils.writeSpecWithMeta(spec,
resource));
}
public void markReconciledSpecAsStable() {
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/AbstractDeploymentObserver.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/AbstractDeploymentObserver.java
index d3217f3..4a2d08c 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/AbstractDeploymentObserver.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/AbstractDeploymentObserver.java
@@ -34,6 +34,7 @@ import
org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
import org.apache.flink.kubernetes.operator.service.FlinkService;
import org.apache.flink.kubernetes.operator.utils.EventRecorder;
import org.apache.flink.kubernetes.operator.utils.EventUtils;
+import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
import org.apache.flink.kubernetes.operator.utils.SavepointUtils;
import io.fabric8.kubernetes.api.model.ContainerStateWaiting;
@@ -75,15 +76,19 @@ public abstract class AbstractDeploymentObserver implements
Observer<FlinkDeploy
public void observe(FlinkDeployment flinkApp, Context context) {
var status = flinkApp.getStatus();
var reconciliationStatus = status.getReconciliationStatus();
- var lastReconciledSpec =
reconciliationStatus.deserializeLastReconciledSpec();
// Nothing has been launched so skip observing
- if (lastReconciledSpec == null
- || reconciliationStatus.getState() ==
ReconciliationState.ROLLING_BACK
- || reconciliationStatus.getState() ==
ReconciliationState.UPGRADING) {
+ if (reconciliationStatus.getState() ==
ReconciliationState.ROLLING_BACK) {
return;
}
+ if (reconciliationStatus.getState() == ReconciliationState.UPGRADING) {
+ checkIfAlreadyUpgraded(flinkApp, context);
+ if (reconciliationStatus.getState() ==
ReconciliationState.UPGRADING) {
+ return;
+ }
+ }
+
Configuration observeConfig = configManager.getObserveConfig(flinkApp);
if (!isJmDeploymentReady(flinkApp)) {
observeJmDeployment(flinkApp, context, observeConfig);
@@ -252,6 +257,54 @@ public abstract class AbstractDeploymentObserver
implements Observer<FlinkDeploy
EventUtils.Component.JobManagerDeployment);
}
+ /**
+ * Checks whether a deployment that is currently in the UPGRADING state has in fact
+ * already been deployed and only the status information is missing. The upgrade target
+ * generation is compared with the generation annotation of the existing Kubernetes
+ * Deployment, if there is one; when they match, the status is updated to the DEPLOYED
+ * state.
+ *
+ * @param flinkDep Flink resource to check.
+ * @param context Context for reconciliation.
+ */
+ private void checkIfAlreadyUpgraded(FlinkDeployment flinkDep, Context
context) {
+ Optional<Deployment> depOpt =
context.getSecondaryResource(Deployment.class);
+ var status = flinkDep.getStatus();
+ depOpt.ifPresent(
+ deployment -> {
+ Map<String, String> annotations =
deployment.getMetadata().getAnnotations();
+ if (annotations == null) {
+ return;
+ }
+ Long deployedGeneration =
+
Optional.ofNullable(annotations.get(FlinkUtils.CR_GENERATION_LABEL))
+ .map(Long::valueOf)
+ .orElse(-1L);
+
+ Long upgradeTargetGeneration =
+
ReconciliationUtils.getUpgradeTargetGeneration(flinkDep);
+
+ if (deployedGeneration.equals(upgradeTargetGeneration)) {
+ logger.info(
+ "Last reconciled generation is already
deployed, setting reconciliation status to "
+ + ReconciliationState.DEPLOYED);
+
+ var firstDeploy =
+
status.getReconciliationStatus().getLastReconciledSpec() == null;
+ var conf =
+ firstDeploy
+ ? configManager.getDeployConfig(
+ flinkDep.getMetadata(),
flinkDep.getSpec())
+ :
configManager.getObserveConfig(flinkDep);
+
+ ReconciliationUtils.updateForSpecReconciliationSuccess(
+ flinkDep, JobState.RUNNING, conf);
+ status.getJobStatus()
+
.setState(org.apache.flink.api.common.JobStatus.RECONCILING.name());
+
status.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.DEPLOYING);
+ }
+ });
+ }
+
/**
* Observe the flinkApp status when the cluster is ready. It will be
implemented by child class
* to reflect the changed status on the flinkApp resource.
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java
index 85bdf52..ff59b7d 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java
@@ -17,12 +17,12 @@
package org.apache.flink.kubernetes.operator.reconciler;
+import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
import
org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions;
import org.apache.flink.kubernetes.operator.crd.AbstractFlinkResource;
-import org.apache.flink.kubernetes.operator.crd.CrdConstants;
import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
import org.apache.flink.kubernetes.operator.crd.spec.AbstractFlinkSpec;
import org.apache.flink.kubernetes.operator.crd.spec.FlinkDeploymentSpec;
@@ -46,7 +46,6 @@ import org.apache.flink.util.Preconditions;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
-import com.fasterxml.jackson.databind.node.TextNode;
import io.fabric8.kubernetes.client.CustomResource;
import io.javaoperatorsdk.operator.api.reconciler.ErrorStatusUpdateControl;
import io.javaoperatorsdk.operator.api.reconciler.RetryInfo;
@@ -65,6 +64,8 @@ public class ReconciliationUtils {
private static final Logger LOG =
LoggerFactory.getLogger(ReconciliationUtils.class);
+ public static final String INTERNAL_METADATA_JSON_KEY =
"resource_metadata";
+
private static final ObjectMapper objectMapper = new ObjectMapper();
public static <SPEC extends AbstractFlinkSpec> void
updateForSpecReconciliationSuccess(
@@ -97,7 +98,7 @@ public class ReconciliationUtils {
}
clonedSpec.getJob().setState(stateAfterReconcile);
}
- reconciliationStatus.serializeAndSetLastReconciledSpec(clonedSpec);
+ reconciliationStatus.serializeAndSetLastReconciledSpec(clonedSpec,
target);
reconciliationStatus.setReconciliationTimestamp(System.currentTimeMillis());
reconciliationStatus.setState(
upgrading ? ReconciliationState.UPGRADING :
ReconciliationState.DEPLOYED);
@@ -120,10 +121,12 @@ public class ReconciliationUtils {
var spec = target.getSpec();
var reconciliationStatus = commonStatus.getReconciliationStatus();
var lastReconciledSpec =
reconciliationStatus.deserializeLastReconciledSpec();
+
lastReconciledSpec
.getJob()
.setSavepointTriggerNonce(spec.getJob().getSavepointTriggerNonce());
-
reconciliationStatus.serializeAndSetLastReconciledSpec(lastReconciledSpec);
+
+
reconciliationStatus.serializeAndSetLastReconciledSpec(lastReconciledSpec,
target);
reconciliationStatus.setReconciliationTimestamp(System.currentTimeMillis());
}
@@ -209,12 +212,7 @@ public class ReconciliationUtils {
var reconciliationStatus =
deployment.getStatus().getReconciliationStatus();
var reconciliationState = reconciliationStatus.getState();
if (reconciliationState != ReconciliationState.ROLLED_BACK) {
- var lastReconciledSpec =
reconciliationStatus.deserializeLastReconciledSpec();
- if (lastReconciledSpec == null) {
- return null;
- } else {
- return lastReconciledSpec;
- }
+ return reconciliationStatus.deserializeLastReconciledSpec();
} else {
return reconciliationStatus.deserializeLastStableSpec();
}
@@ -229,26 +227,41 @@ public class ReconciliationUtils {
|| currentReconState == ReconciliationState.UPGRADING;
}
- public static <T> T deserializedSpecWithVersion(
- @Nullable String specString, Class<T> specClass) {
- if (specString == null) {
+ public static <T> Tuple2<T, ObjectNode> deserializeSpecWithMeta(
+ @Nullable String specWithMetaString, Class<T> specClass) {
+ if (specWithMetaString == null) {
return null;
}
try {
- ObjectNode objectNode = (ObjectNode)
objectMapper.readTree(specString);
- objectNode.remove("apiVersion");
- return objectMapper.treeToValue(objectNode, specClass);
+ ObjectNode wrapper = (ObjectNode)
objectMapper.readTree(specWithMetaString);
+ ObjectNode internalMeta = (ObjectNode)
wrapper.remove(INTERNAL_METADATA_JSON_KEY);
+
+ if (internalMeta == null) {
+ // migrating from old format
+ wrapper.remove("apiVersion");
+ return Tuple2.of(objectMapper.treeToValue(wrapper, specClass),
internalMeta);
+ } else {
+ return Tuple2.of(
+ objectMapper.treeToValue(wrapper.get("spec"),
specClass), internalMeta);
+ }
} catch (JsonProcessingException e) {
throw new RuntimeException("Could not deserialize spec, this
indicates a bug...", e);
}
}
- public static String writeSpecWithCurrentVersion(Object spec) {
- ObjectNode objectNode =
objectMapper.valueToTree(Preconditions.checkNotNull(spec));
- objectNode.set("apiVersion", new TextNode(CrdConstants.API_VERSION));
+ public static String writeSpecWithMeta(
+ Object spec, AbstractFlinkResource<?, ?> relatedResource) {
+ ObjectNode wrapper = objectMapper.createObjectNode();
+ wrapper.set("spec",
objectMapper.valueToTree(Preconditions.checkNotNull(spec)));
+
+ ObjectNode internalMeta =
wrapper.putObject(INTERNAL_METADATA_JSON_KEY);
+ internalMeta.put("apiVersion", relatedResource.getApiVersion());
+ ObjectNode metadata = internalMeta.putObject("metadata");
+ metadata.put("generation",
relatedResource.getMetadata().getGeneration());
+
try {
- return objectMapper.writeValueAsString(objectNode);
+ return objectMapper.writeValueAsString(wrapper);
} catch (JsonProcessingException e) {
throw new RuntimeException("Could not serialize spec, this
indicates a bug...", e);
}
@@ -269,7 +282,7 @@ public class ReconciliationUtils {
return false;
}
- FlinkDeploymentSpec lastStableSpec =
reconciliationStatus.deserializeLastStableSpec();
+ var lastStableSpec = reconciliationStatus.deserializeLastStableSpec();
if (lastStableSpec != null
&& lastStableSpec.getJob() != null
&& lastStableSpec.getJob().getState() == JobState.SUSPENDED) {
@@ -401,4 +414,20 @@ public class ReconciliationUtils {
// Status was updated already, no need to return anything
return ErrorStatusUpdateControl.noStatusUpdate();
}
+
+ public static Long getUpgradeTargetGeneration(FlinkDeployment deployment) {
+ var lastSpecWithMeta =
+ deployment
+ .getStatus()
+ .getReconciliationStatus()
+ .deserializeLastReconciledSpecWithMeta();
+
+ if (lastSpecWithMeta == null || lastSpecWithMeta.f1 == null) {
+ // For first deployments, and when migrating from a status written before this
+ // metadata existed, simply return the current generation
+ return deployment.getMetadata().getGeneration();
+ }
+
+ return
lastSpecWithMeta.f1.get("metadata").get("generation").asLong(-1L);
+ }
}
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java
index f672d65..5d0cc6b 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java
@@ -46,6 +46,8 @@ import io.fabric8.kubernetes.client.KubernetesClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.Collections;
+import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.function.Predicate;
@@ -58,6 +60,8 @@ public class FlinkUtils {
private static final Logger LOG =
LoggerFactory.getLogger(FlinkUtils.class);
private static final ObjectMapper MAPPER = new ObjectMapper();
+ public static final String CR_GENERATION_LABEL =
"flinkdeployment.flink.apache.org/generation";
+
public static Pod mergePodTemplates(Pod toPod, Pod fromPod) {
if (fromPod == null) {
return toPod;
@@ -286,4 +290,16 @@ public class FlinkUtils {
int taskSlots = conf.get(TaskManagerOptions.NUM_TASK_SLOTS);
return (parallelism + taskSlots - 1) / taskSlots;
}
+
+ public static void setGenerationAnnotation(Configuration conf, Long
generation) {
+ if (generation == null) {
+ return;
+ }
+ var labels =
+ new HashMap<>(
+
conf.getOptional(KubernetesConfigOptions.JOB_MANAGER_ANNOTATIONS)
+ .orElse(Collections.emptyMap()));
+ labels.put(CR_GENERATION_LABEL, generation.toString());
+ conf.set(KubernetesConfigOptions.JOB_MANAGER_ANNOTATIONS, labels);
+ }
}
diff --git
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java
index c619633..4f429c3 100644
---
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java
+++
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java
@@ -49,6 +49,7 @@ import org.apache.flink.metrics.testutils.MetricListener;
import io.fabric8.kubernetes.api.model.Container;
import io.fabric8.kubernetes.api.model.ContainerStatus;
import io.fabric8.kubernetes.api.model.ContainerStatusBuilder;
+import io.fabric8.kubernetes.api.model.ObjectMeta;
import io.fabric8.kubernetes.api.model.ObjectMetaBuilder;
import io.fabric8.kubernetes.api.model.Pod;
import io.fabric8.kubernetes.api.model.PodList;
@@ -69,6 +70,8 @@ import okhttp3.Headers;
import okhttp3.mockwebserver.RecordedRequest;
import org.junit.jupiter.api.Assertions;
+import javax.annotation.Nullable;
+
import java.io.File;
import java.io.IOException;
import java.lang.reflect.Field;
@@ -131,6 +134,7 @@ public class TestUtils {
.upgradeMode(UpgradeMode.STATELESS)
.state(JobState.RUNNING)
.build());
+ deployment.setStatus(deployment.initStatus());
return deployment;
}
@@ -211,35 +215,6 @@ public class TestUtils {
return new PodListBuilder().withItems(pod).build();
}
- public static Context createEmptyContext() {
- return new Context() {
- @Override
- public Optional<RetryInfo> getRetryInfo() {
- return Optional.empty();
- }
-
- @Override
- public Optional getSecondaryResource(Class aClass, String s) {
- return Optional.empty();
- }
-
- @Override
- public Set getSecondaryResources(Class expectedType) {
- return null;
- }
-
- @Override
- public ControllerConfiguration getControllerConfiguration() {
- return null;
- }
-
- @Override
- public ManagedDependentResourceContext
managedDependentResourceContext() {
- return null;
- }
- };
- }
-
public static Deployment createDeployment(boolean ready) {
DeploymentStatus status = new DeploymentStatus();
status.setAvailableReplicas(ready ? 1 : 0);
@@ -247,12 +222,13 @@ public class TestUtils {
DeploymentSpec spec = new DeploymentSpec();
spec.setReplicas(1);
Deployment deployment = new Deployment();
+ deployment.setMetadata(new ObjectMeta());
deployment.setSpec(spec);
deployment.setStatus(status);
return deployment;
}
- public static Context createContextWithReadyJobManagerDeployment() {
+ public static Context createContextWithDeployment(@Nullable Deployment
deployment) {
return new Context() {
@Override
public Optional<RetryInfo> getRetryInfo() {
@@ -261,7 +237,7 @@ public class TestUtils {
@Override
public Optional getSecondaryResource(Class expectedType, String
eventSourceName) {
- return Optional.of(createDeployment(true));
+ return Optional.ofNullable(deployment);
}
@Override
@@ -281,33 +257,16 @@ public class TestUtils {
};
}
- public static Context createContextWithInProgressDeployment() {
- return new Context() {
- @Override
- public Optional<RetryInfo> getRetryInfo() {
- return Optional.empty();
- }
-
- @Override
- public Optional getSecondaryResource(Class expectedType, String
eventSourceName) {
- return Optional.of(createDeployment(false));
- }
-
- @Override
- public Set getSecondaryResources(Class expectedType) {
- return null;
- }
+ public static Context createEmptyContext() {
+ return createContextWithDeployment(null);
+ }
- @Override
- public ControllerConfiguration getControllerConfiguration() {
- return null;
- }
+ public static Context createContextWithReadyJobManagerDeployment() {
+ return createContextWithDeployment(createDeployment(true));
+ }
- @Override
- public ManagedDependentResourceContext
managedDependentResourceContext() {
- return null;
- }
- };
+ public static Context createContextWithInProgressDeployment() {
+ return createContextWithDeployment(createDeployment(false));
}
public static Context createContextWithReadyFlinkDeployment() {
@@ -329,7 +288,7 @@ public class TestUtils {
session.getSpec().getFlinkConfiguration().putAll(flinkDepConfig);
session.getStatus()
.getReconciliationStatus()
- .serializeAndSetLastReconciledSpec(session.getSpec());
+ .serializeAndSetLastReconciledSpec(session.getSpec(),
session);
return Optional.of(session);
}
@@ -482,7 +441,6 @@ public class TestUtils {
var eventRecorder = new EventRecorder(kubernetesClient, (r, e) -> {});
return new FlinkDeploymentController(
configManager,
- kubernetesClient,
ValidatorUtils.discoverValidators(configManager),
new ReconcilerFactory(kubernetesClient, flinkService,
configManager, eventRecorder),
new ObserverFactory(flinkService, configManager,
statusRecorder, eventRecorder),
diff --git
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/config/FlinkConfigManagerTest.java
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/config/FlinkConfigManagerTest.java
index 5be498b..6321c92 100644
---
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/config/FlinkConfigManagerTest.java
+++
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/config/FlinkConfigManagerTest.java
@@ -28,6 +28,7 @@ import org.apache.flink.kubernetes.operator.crd.spec.JobState;
import org.apache.flink.kubernetes.operator.crd.status.ReconciliationState;
import org.apache.flink.kubernetes.operator.crd.status.ReconciliationStatus;
import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
+import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
import org.apache.flink.kubernetes.utils.Constants;
import io.fabric8.kubernetes.api.model.Pod;
@@ -61,7 +62,7 @@ public class FlinkConfigManagerTest {
deployment.getStatus().getReconciliationStatus();
deployment.getSpec().getFlinkConfiguration().put(testConf.key(),
"reconciled");
-
reconciliationStatus.serializeAndSetLastReconciledSpec(deployment.getSpec());
+
reconciliationStatus.serializeAndSetLastReconciledSpec(deployment.getSpec(),
deployment);
reconciliationStatus.markReconciledSpecAsStable();
deployment.getSpec().getFlinkConfiguration().put(testConf.key(),
"latest");
@@ -73,14 +74,21 @@ public class FlinkConfigManagerTest {
assertEquals("reconciled",
configManager.getObserveConfig(deployment).get(testConf));
deployment.getSpec().getFlinkConfiguration().put(testConf.key(),
"stable");
-
reconciliationStatus.serializeAndSetLastReconciledSpec(deployment.getSpec());
+
reconciliationStatus.serializeAndSetLastReconciledSpec(deployment.getSpec(),
deployment);
reconciliationStatus.markReconciledSpecAsStable();
deployment.getSpec().getFlinkConfiguration().put(testConf.key(),
"rolled-back");
-
reconciliationStatus.serializeAndSetLastReconciledSpec(deployment.getSpec());
+
reconciliationStatus.serializeAndSetLastReconciledSpec(deployment.getSpec(),
deployment);
reconciliationStatus.setState(ReconciliationState.ROLLED_BACK);
assertEquals("stable",
configManager.getObserveConfig(deployment).get(testConf));
+
+ deployment.getMetadata().setGeneration(5L);
+ var deployConfig =
+ configManager.getDeployConfig(deployment.getMetadata(),
deployment.getSpec());
+ assertEquals(
+ Map.of(FlinkUtils.CR_GENERATION_LABEL, "5"),
+
deployConfig.get(KubernetesConfigOptions.JOB_MANAGER_ANNOTATIONS));
}
@Test
diff --git
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/SavepointObserverTest.java
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/SavepointObserverTest.java
index 946f7b5..5182ffa 100644
---
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/SavepointObserverTest.java
+++
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/SavepointObserverTest.java
@@ -138,7 +138,7 @@ public class SavepointObserverTest {
deployment
.getStatus()
.getReconciliationStatus()
- .serializeAndSetLastReconciledSpec(deployment.getSpec());
+ .serializeAndSetLastReconciledSpec(deployment.getSpec(),
deployment);
var jobStatus = deployment.getStatus().getJobStatus();
jobStatus.setState("RUNNING");
diff --git
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserverTest.java
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserverTest.java
index b9e30f8..f7ac717 100644
---
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserverTest.java
+++
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserverTest.java
@@ -26,13 +26,14 @@ import
org.apache.flink.kubernetes.operator.TestingStatusRecorder;
import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
import org.apache.flink.kubernetes.operator.crd.spec.JobState;
-import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
import
org.apache.flink.kubernetes.operator.crd.status.JobManagerDeploymentStatus;
import org.apache.flink.kubernetes.operator.crd.status.JobStatus;
+import org.apache.flink.kubernetes.operator.crd.status.ReconciliationState;
import org.apache.flink.kubernetes.operator.crd.status.SavepointTriggerType;
import
org.apache.flink.kubernetes.operator.exception.DeploymentFailedException;
import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
import org.apache.flink.kubernetes.operator.utils.EventRecorder;
+import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
import org.apache.flink.kubernetes.operator.utils.SavepointUtils;
import org.apache.flink.runtime.client.JobStatusMessage;
@@ -45,9 +46,11 @@ import org.junit.jupiter.api.Test;
import java.time.Duration;
import java.time.Instant;
+import java.util.HashMap;
import java.util.stream.Collectors;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -73,20 +76,15 @@ public class ApplicationObserverTest {
@Test
public void observeApplicationCluster() throws Exception {
FlinkDeployment deployment = TestUtils.buildApplicationCluster();
- deployment.setStatus(new FlinkDeploymentStatus());
-
Configuration conf =
configManager.getDeployConfig(deployment.getMetadata(),
deployment.getSpec());
observer.observe(deployment, TestUtils.createEmptyContext());
assertNull(deployment.getStatus().getReconciliationStatus().getLastStableSpec());
- deployment
- .getStatus()
- .getReconciliationStatus()
- .serializeAndSetLastReconciledSpec(deployment.getSpec());
- deployment.getStatus().setJobStatus(new JobStatus());
flinkService.submitApplicationCluster(deployment.getSpec().getJob(),
conf, false);
+ ReconciliationUtils.updateForSpecReconciliationSuccess(
+ deployment, JobState.RUNNING, new Configuration());
// Validate port check logic
flinkService.setPortReady(false);
@@ -180,11 +178,9 @@ public class ApplicationObserverTest {
configManager.getDeployConfig(deployment.getMetadata(),
deployment.getSpec());
flinkService.submitApplicationCluster(deployment.getSpec().getJob(),
conf, false);
- deployment.setStatus(new FlinkDeploymentStatus());
- deployment
- .getStatus()
- .getReconciliationStatus()
- .serializeAndSetLastReconciledSpec(deployment.getSpec());
+ deployment.setStatus(deployment.initStatus());
+ ReconciliationUtils.updateForSpecReconciliationSuccess(
+ deployment, JobState.RUNNING, new Configuration());
deployment.getStatus().setJobManagerDeploymentStatus(JobManagerDeploymentStatus.READY);
observer.observe(deployment, readyContext);
@@ -216,11 +212,9 @@ public class ApplicationObserverTest {
configManager.getDeployConfig(deployment.getMetadata(),
deployment.getSpec());
flinkService.submitApplicationCluster(deployment.getSpec().getJob(),
conf, false);
- deployment.setStatus(new FlinkDeploymentStatus());
- deployment
- .getStatus()
- .getReconciliationStatus()
- .serializeAndSetLastReconciledSpec(deployment.getSpec());
+ deployment.setStatus(deployment.initStatus());
+ ReconciliationUtils.updateForSpecReconciliationSuccess(
+ deployment, JobState.RUNNING, new Configuration());
deployment.getStatus().setJobManagerDeploymentStatus(JobManagerDeploymentStatus.READY);
observer.observe(deployment, readyContext);
@@ -418,10 +412,8 @@ public class ApplicationObserverTest {
}
private void bringToReadyStatus(FlinkDeployment deployment) {
- deployment
- .getStatus()
- .getReconciliationStatus()
- .serializeAndSetLastReconciledSpec(deployment.getSpec());
+ ReconciliationUtils.updateForSpecReconciliationSuccess(
+ deployment, JobState.RUNNING, new Configuration());
JobStatus jobStatus = new JobStatus();
jobStatus.setJobName("jobname");
jobStatus.setJobId("0000000000");
@@ -451,4 +443,71 @@ public class ApplicationObserverTest {
});
assertEquals(podFailedMessage, exception.getMessage());
}
+
+ @Test
+ public void observeAlreadyUpgraded() {
+ var kubernetesDeployment = TestUtils.createDeployment(true);
+ kubernetesDeployment.getMetadata().setAnnotations(new HashMap<>());
+
+ var context =
TestUtils.createContextWithDeployment(kubernetesDeployment);
+
+ FlinkDeployment deployment = TestUtils.buildApplicationCluster();
+ deployment.getMetadata().setGeneration(123L);
+
+ var status = deployment.getStatus();
+ var reconStatus = status.getReconciliationStatus();
+
+ // New deployment
+ assertEquals(ReconciliationState.UPGRADING, reconStatus.getState());
+ assertNull(reconStatus.getLastReconciledSpec());
+
+ kubernetesDeployment.getMetadata().setGeneration(1L);
+ observer.observe(deployment, context);
+
+ assertEquals(ReconciliationState.UPGRADING, reconStatus.getState());
+ assertNull(reconStatus.getLastReconciledSpec());
+
+ kubernetesDeployment
+ .getMetadata()
+ .getAnnotations()
+ .put(FlinkUtils.CR_GENERATION_LABEL, "123");
+ observer.observe(deployment, context);
+
+ assertEquals(ReconciliationState.DEPLOYED, reconStatus.getState());
+ assertNotNull(reconStatus.getLastReconciledSpec());
+ assertEquals(
+ JobManagerDeploymentStatus.DEPLOYED_NOT_READY,
+ status.getJobManagerDeploymentStatus());
+
+ deployment.getMetadata().setGeneration(321L);
+
+ ReconciliationUtils.updateForSpecReconciliationSuccess(
+ deployment,
+ JobState.SUSPENDED,
+ new FlinkConfigManager(new Configuration())
+ .getDeployConfig(deployment.getMetadata(),
deployment.getSpec()));
+ status = deployment.getStatus();
+ reconStatus = status.getReconciliationStatus();
+
+ assertEquals(status.getReconciliationStatus().getState(),
ReconciliationState.UPGRADING);
+ status.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.MISSING);
+
+ observer.observe(deployment, TestUtils.createEmptyContext());
+ assertEquals(JobManagerDeploymentStatus.MISSING,
status.getJobManagerDeploymentStatus());
+
+ observer.observe(deployment, context);
+ assertEquals(JobManagerDeploymentStatus.MISSING,
status.getJobManagerDeploymentStatus());
+
+ kubernetesDeployment
+ .getMetadata()
+ .getAnnotations()
+ .put(FlinkUtils.CR_GENERATION_LABEL, "321");
+ observer.observe(deployment, context);
+
+ assertEquals(ReconciliationState.DEPLOYED, reconStatus.getState());
+ assertNotNull(reconStatus.getLastReconciledSpec());
+ assertEquals(
+ JobManagerDeploymentStatus.DEPLOYED_NOT_READY,
+ status.getJobManagerDeploymentStatus());
+ }
}
diff --git
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/deployment/SessionObserverTest.java
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/deployment/SessionObserverTest.java
index 515cdb8..ba0df3a 100644
---
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/deployment/SessionObserverTest.java
+++
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/deployment/SessionObserverTest.java
@@ -22,7 +22,9 @@ import org.apache.flink.kubernetes.operator.TestUtils;
import org.apache.flink.kubernetes.operator.TestingFlinkService;
import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.crd.spec.JobState;
import
org.apache.flink.kubernetes.operator.crd.status.JobManagerDeploymentStatus;
+import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
import org.apache.flink.kubernetes.operator.utils.EventRecorder;
import io.javaoperatorsdk.operator.api.reconciler.Context;
@@ -42,10 +44,8 @@ public class SessionObserverTest {
FlinkDeployment deployment = TestUtils.buildSessionCluster();
SessionObserver observer =
new SessionObserver(flinkService, configManager, new
EventRecorder(null, null));
- deployment
- .getStatus()
- .getReconciliationStatus()
- .serializeAndSetLastReconciledSpec(deployment.getSpec());
+ ReconciliationUtils.updateForSpecReconciliationSuccess(
+ deployment, JobState.RUNNING, new Configuration());
observer.observe(deployment, readyContext);
assertNull(deployment.getStatus().getReconciliationStatus().getLastStableSpec());
diff --git
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/ReconciliationUtilsTest.java
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/ReconciliationUtilsTest.java
index 3b3f4b2..3f37c18 100644
---
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/ReconciliationUtilsTest.java
+++
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/ReconciliationUtilsTest.java
@@ -20,10 +20,10 @@ package org.apache.flink.kubernetes.operator.utils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.kubernetes.operator.TestUtils;
import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
-import org.apache.flink.kubernetes.operator.crd.CrdConstants;
import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
import org.apache.flink.kubernetes.operator.crd.spec.FlinkDeploymentSpec;
import org.apache.flink.kubernetes.operator.crd.spec.JobState;
+import org.apache.flink.kubernetes.operator.crd.status.ReconciliationState;
import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
import com.fasterxml.jackson.core.JsonProcessingException;
@@ -35,6 +35,7 @@ import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
/** Test for {@link
org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils}. */
public class ReconciliationUtilsTest {
@@ -46,11 +47,11 @@ public class ReconciliationUtilsTest {
public void testRescheduleUpgradeImmediately() {
FlinkDeployment app = TestUtils.buildApplicationCluster();
app.getSpec().getJob().setState(JobState.RUNNING);
+ app.getStatus().getReconciliationStatus().setState(ReconciliationState.DEPLOYED);
+ ReconciliationUtils.updateForSpecReconciliationSuccess(
+ app, JobState.RUNNING, new Configuration());
FlinkDeployment previous = ReconciliationUtils.clone(app);
FlinkDeployment current = ReconciliationUtils.clone(app);
- current.getStatus()
- .getReconciliationStatus()
- .serializeAndSetLastReconciledSpec(ReconciliationUtils.clone(current.getSpec()));
ReconciliationUtils.updateForSpecReconciliationSuccess(
current, JobState.SUSPENDED, new Configuration());
@@ -72,12 +73,29 @@ public class ReconciliationUtilsTest {
@Test
public void testSpecSerializationWithVersion() throws
JsonProcessingException {
FlinkDeployment app = TestUtils.buildApplicationCluster();
- String serialized =
ReconciliationUtils.writeSpecWithCurrentVersion(app.getSpec());
+ app.getMetadata().setGeneration(12L);
+ String serialized =
ReconciliationUtils.writeSpecWithMeta(app.getSpec(), app);
ObjectNode node = (ObjectNode) new ObjectMapper().readTree(serialized);
- assertEquals(CrdConstants.API_VERSION,
node.get("apiVersion").asText());
+
+ ObjectNode internalMeta =
+ (ObjectNode)
node.get(ReconciliationUtils.INTERNAL_METADATA_JSON_KEY);
+ assertEquals("flink.apache.org/v1beta1",
internalMeta.get("apiVersion").asText());
+ assertEquals(12L,
internalMeta.get("metadata").get("generation").asLong());
assertEquals(
app.getSpec(),
- ReconciliationUtils.deserializedSpecWithVersion(
- serialized, FlinkDeploymentSpec.class));
+ ReconciliationUtils.deserializeSpecWithMeta(serialized,
FlinkDeploymentSpec.class)
+ .f0);
+
+ // test backward compatibility
+ String oldSerialized =
+
"{\"job\":{\"jarURI\":\"local:///opt/flink/examples/streaming/StateMachineExample.jar\",\"parallelism\":2,\"entryClass\":null,\"args\":[],\"state\":\"running\",\"savepointTriggerNonce\":null,\"initialSavepointPath\":null,\"upgradeMode\":\"stateless\",\"allowNonRestoredState\":null},\"restartNonce\":null,\"flinkConfiguration\":{\"taskmanager.numberOfTaskSlots\":\"2\"},\"image\":\"flink:1.15\",\"imagePullPolicy\":null,\"serviceAccount\":\"flink\",\"flinkVersion\":\"v1_15\",
[...]
+
+ var migrated =
+ ReconciliationUtils.deserializeSpecWithMeta(
+ oldSerialized, FlinkDeploymentSpec.class);
+ assertEquals(
+ "local:///opt/flink/examples/streaming/StateMachineExample.jar",
+ migrated.f0.getJob().getJarURI());
+ assertNull(migrated.f1);
}
}
diff --git
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/SavepointUtilsTest.java
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/SavepointUtilsTest.java
index ee004a0..f369037 100644
---
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/SavepointUtilsTest.java
+++
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/SavepointUtilsTest.java
@@ -53,7 +53,7 @@ public class SavepointUtilsTest {
deployment
.getStatus()
.getReconciliationStatus()
- .serializeAndSetLastReconciledSpec(deployment.getSpec());
+ .serializeAndSetLastReconciledSpec(deployment.getSpec(),
deployment);
assertEquals(
Optional.empty(),
@@ -67,7 +67,7 @@ public class SavepointUtilsTest {
deployment
.getStatus()
.getReconciliationStatus()
- .serializeAndSetLastReconciledSpec(deployment.getSpec());
+ .serializeAndSetLastReconciledSpec(deployment.getSpec(),
deployment);
assertEquals(
Optional.of(SavepointTriggerType.PERIODIC),
diff --git
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/StatusRecorderTest.java
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/StatusRecorderTest.java
index 209c1d2..6724005 100644
---
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/StatusRecorderTest.java
+++
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/StatusRecorderTest.java
@@ -45,7 +45,7 @@ public class StatusRecorderTest {
helper.patchAndCacheStatus(deployment);
assertTrue(mockServer.getLastRequest() != lastRequest);
lastRequest = mockServer.getLastRequest();
- deployment.getStatus().getReconciliationStatus().setState(ReconciliationState.UPGRADING);
+ deployment.getStatus().getReconciliationStatus().setState(ReconciliationState.ROLLING_BACK);
helper.patchAndCacheStatus(deployment);
// We intentionally compare references
diff --git
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DefaultValidatorTest.java
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DefaultValidatorTest.java
index d0dce8f..e0c929c 100644
---
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DefaultValidatorTest.java
+++
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DefaultValidatorTest.java
@@ -257,7 +257,7 @@ public class DefaultValidatorTest {
dep.getStatus()
.getReconciliationStatus()
- .serializeAndSetLastReconciledSpec(spec);
+ .serializeAndSetLastReconciledSpec(spec, dep);
dep.getSpec()
.getFlinkConfiguration()
@@ -278,7 +278,7 @@ public class DefaultValidatorTest {
dep.getStatus()
.getReconciliationStatus()
.serializeAndSetLastReconciledSpec(
- ReconciliationUtils.clone(dep.getSpec()));
+ ReconciliationUtils.clone(dep.getSpec()),
dep);
dep.getSpec().setJob(null);
},
"Cannot switch from job to session cluster");
@@ -294,7 +294,7 @@ public class DefaultValidatorTest {
spec.setJob(null);
dep.getStatus()
.getReconciliationStatus()
- .serializeAndSetLastReconciledSpec(spec);
+ .serializeAndSetLastReconciledSpec(spec, dep);
},
"Cannot switch from session to job cluster");
@@ -316,7 +316,7 @@ public class DefaultValidatorTest {
dep.getStatus()
.getReconciliationStatus()
- .serializeAndSetLastReconciledSpec(spec);
+ .serializeAndSetLastReconciledSpec(spec, dep);
dep.getStatus().setJobManagerDeploymentStatus(JobManagerDeploymentStatus.READY);
},
String.format(