This is an automated email from the ASF dual-hosted git repository.
gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
The following commit(s) were added to refs/heads/main by this push:
new ac21bc8f [FLINK-28845] Do not ignore initialSavepointPath if first deploy fails completely
ac21bc8f is described below
commit ac21bc8fe148f6dc803988791224f792a66875ce
Author: Gyula Fora <[email protected]>
AuthorDate: Tue Aug 9 08:51:14 2022 +0200
[FLINK-28845] Do not ignore initialSavepointPath if first deploy fails completely
---
.../operator/crd/status/ReconciliationStatus.java | 6 +--
.../deployment/AbstractDeploymentObserver.java | 7 ++--
.../observer/sessionjob/SessionJobObserver.java | 1 +
.../reconciler/ReconciliationMetadata.java | 49 ++++++++++++++++++++++
.../operator/reconciler/ReconciliationUtils.java | 47 ++++++++++++++-------
.../controller/FlinkDeploymentControllerTest.java | 16 +++++++
.../deployment/ApplicationObserverTest.java | 17 +++++++-
.../observer/deployment/SessionObserverTest.java | 2 +-
.../sessionjob/SessionJobObserverTest.java | 16 +++++++
9 files changed, 137 insertions(+), 24 deletions(-)
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/status/ReconciliationStatus.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/status/ReconciliationStatus.java
index b867887f..380971b7 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/status/ReconciliationStatus.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/status/ReconciliationStatus.java
@@ -21,10 +21,10 @@ import org.apache.flink.annotation.Experimental;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.kubernetes.operator.crd.AbstractFlinkResource;
import org.apache.flink.kubernetes.operator.crd.spec.AbstractFlinkSpec;
+import org.apache.flink.kubernetes.operator.reconciler.ReconciliationMetadata;
import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
import com.fasterxml.jackson.annotation.JsonIgnore;
-import com.fasterxml.jackson.databind.node.ObjectNode;
import lombok.Data;
import lombok.NoArgsConstructor;
@@ -68,12 +68,12 @@ public abstract class ReconciliationStatus<SPEC extends
AbstractFlinkSpec> {
}
@JsonIgnore
- public Tuple2<SPEC, ObjectNode> deserializeLastReconciledSpecWithMeta() {
+ public Tuple2<SPEC, ReconciliationMetadata>
deserializeLastReconciledSpecWithMeta() {
return ReconciliationUtils.deserializeSpecWithMeta(lastReconciledSpec,
getSpecClass());
}
@JsonIgnore
- public Tuple2<SPEC, ObjectNode> deserializeLastStableSpecWithMeta() {
+ public Tuple2<SPEC, ReconciliationMetadata>
deserializeLastStableSpecWithMeta() {
return ReconciliationUtils.deserializeSpecWithMeta(lastStableSpec,
getSpecClass());
}
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/AbstractDeploymentObserver.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/AbstractDeploymentObserver.java
index 0ff0ce57..3a41a1c3 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/AbstractDeploymentObserver.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/AbstractDeploymentObserver.java
@@ -76,13 +76,15 @@ public abstract class AbstractDeploymentObserver implements
Observer<FlinkDeploy
var reconciliationStatus = status.getReconciliationStatus();
// Nothing has been launched so skip observing
- if (reconciliationStatus.getState() ==
ReconciliationState.ROLLING_BACK) {
+ if (reconciliationStatus.isFirstDeployment()
+ || reconciliationStatus.getState() ==
ReconciliationState.ROLLING_BACK) {
return;
}
if (reconciliationStatus.getState() == ReconciliationState.UPGRADING) {
checkIfAlreadyUpgraded(flinkApp, context);
if (reconciliationStatus.getState() ==
ReconciliationState.UPGRADING) {
+
ReconciliationUtils.clearLastReconciledSpecIfFirstDeploy(flinkApp);
return;
}
}
@@ -262,9 +264,6 @@ public abstract class AbstractDeploymentObserver implements
Observer<FlinkDeploy
*/
private void checkIfAlreadyUpgraded(FlinkDeployment flinkDep, Context<?>
context) {
var status = flinkDep.getStatus();
- if (status.getReconciliationStatus().isFirstDeployment()) {
- return;
- }
Optional<Deployment> depOpt =
context.getSecondaryResource(Deployment.class);
depOpt.ifPresent(
deployment -> {
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/sessionjob/SessionJobObserver.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/sessionjob/SessionJobObserver.java
index 633744a1..6558dd19 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/sessionjob/SessionJobObserver.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/sessionjob/SessionJobObserver.java
@@ -116,6 +116,7 @@ public class SessionJobObserver implements
Observer<FlinkSessionJob> {
if (reconciliationStatus.getState() == ReconciliationState.UPGRADING) {
checkIfAlreadyUpgraded(flinkSessionJob, deployedConfig,
flinkService);
if (reconciliationStatus.getState() ==
ReconciliationState.UPGRADING) {
+
ReconciliationUtils.clearLastReconciledSpecIfFirstDeploy(flinkSessionJob);
return;
}
}
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationMetadata.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationMetadata.java
new file mode 100644
index 00000000..a20f56f5
--- /dev/null
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationMetadata.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.reconciler;
+
+import org.apache.flink.kubernetes.operator.crd.AbstractFlinkResource;
+
+import com.fasterxml.jackson.annotation.JsonInclude;
+import io.fabric8.kubernetes.api.model.ObjectMeta;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+/** Extra metadata to be attached to the reconciled spec. */
+@Data
+@NoArgsConstructor
+@AllArgsConstructor
+@JsonInclude(JsonInclude.Include.NON_NULL)
+public class ReconciliationMetadata {
+
+ private String apiVersion;
+
+ private ObjectMeta metadata;
+
+ private boolean firstDeployment;
+
+ public static ReconciliationMetadata from(AbstractFlinkResource<?, ?>
resource) {
+ ObjectMeta metadata = new ObjectMeta();
+ metadata.setGeneration(resource.getMetadata().getGeneration());
+
+ var firstDeploy =
resource.getStatus().getReconciliationStatus().isFirstDeployment();
+
+ return new ReconciliationMetadata(resource.getApiVersion(), metadata,
firstDeploy);
+ }
+}
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java
index 70606512..dc6bd6c2 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java
@@ -268,8 +268,9 @@ public class ReconciliationUtils {
* @return Tuple2 of spec and meta.
* @param <T> Spec type.
*/
- public static <T extends AbstractFlinkSpec> Tuple2<T, ObjectNode>
deserializeSpecWithMeta(
- @Nullable String specWithMetaString, Class<T> specClass) {
+ public static <T extends AbstractFlinkSpec>
+ Tuple2<T, ReconciliationMetadata> deserializeSpecWithMeta(
+ @Nullable String specWithMetaString, Class<T> specClass) {
if (specWithMetaString == null) {
return null;
}
@@ -284,7 +285,8 @@ public class ReconciliationUtils {
return Tuple2.of(objectMapper.treeToValue(wrapper, specClass),
null);
} else {
return Tuple2.of(
- objectMapper.treeToValue(wrapper.get("spec"),
specClass), internalMeta);
+ objectMapper.treeToValue(wrapper.get("spec"),
specClass),
+ objectMapper.convertValue(internalMeta,
ReconciliationMetadata.class));
}
} catch (JsonProcessingException e) {
throw new RuntimeException("Could not deserialize spec, this
indicates a bug...", e);
@@ -300,29 +302,25 @@ public class ReconciliationUtils {
*/
public static String writeSpecWithMeta(
AbstractFlinkSpec spec, AbstractFlinkResource<?, ?>
relatedResource) {
-
- ObjectNode internalMeta = objectMapper.createObjectNode();
-
- internalMeta.put("apiVersion", relatedResource.getApiVersion());
- ObjectNode metadata = internalMeta.putObject("metadata");
- metadata.put("generation",
relatedResource.getMetadata().getGeneration());
-
- return writeSpecWithMeta(spec, internalMeta);
+ return writeSpecWithMeta(spec,
ReconciliationMetadata.from(relatedResource));
}
/**
* Serializes the spec and custom meta information into a JSON string.
*
* @param spec Flink resource spec.
- * @param meta Custom meta object.
+ * @param metadata Reconciliation meta object.
* @return Serialized json.
*/
- public static String writeSpecWithMeta(AbstractFlinkSpec spec, ObjectNode
meta) {
+ public static String writeSpecWithMeta(
+ AbstractFlinkSpec spec, ReconciliationMetadata metadata) {
ObjectNode wrapper = objectMapper.createObjectNode();
wrapper.set("spec",
objectMapper.valueToTree(Preconditions.checkNotNull(spec)));
- wrapper.set(INTERNAL_METADATA_JSON_KEY, meta);
+ wrapper.set(
+ INTERNAL_METADATA_JSON_KEY,
+
objectMapper.valueToTree(Preconditions.checkNotNull(metadata)));
try {
return objectMapper.writeValueAsString(wrapper);
@@ -434,7 +432,26 @@ public class ReconciliationUtils {
return -1L;
}
- return
lastSpecWithMeta.f1.get("metadata").get("generation").asLong(-1L);
+ return lastSpecWithMeta.f1.getMetadata().getGeneration();
+ }
+
+ /**
+ * Clear last reconciled spec if that corresponds to the first deployment. This is important in
+ * cases where the first deployment fails.
+ *
+ * @param resource Flink resource.
+ */
+ public static void
clearLastReconciledSpecIfFirstDeploy(AbstractFlinkResource<?, ?> resource) {
+ var reconStatus = resource.getStatus().getReconciliationStatus();
+ var lastSpecWithMeta =
reconStatus.deserializeLastReconciledSpecWithMeta();
+
+ if (lastSpecWithMeta.f1 == null) {
+ return;
+ }
+
+ if (lastSpecWithMeta.f1.isFirstDeployment()) {
+ reconStatus.setLastReconciledSpec(null);
+ }
}
/**
diff --git
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
index b6aeb02c..c57f2e2f 100644
---
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
+++
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
@@ -70,6 +70,7 @@ import static
org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
import static org.junit.jupiter.params.provider.Arguments.arguments;
/** {@link FlinkDeploymentController} tests. */
@@ -946,4 +947,19 @@ public class FlinkDeploymentControllerTest {
}
return args.stream();
}
+
+ @Test
+ public void testInitialSavepointOnError() throws Exception {
+ FlinkDeployment flinkDeployment = TestUtils.buildApplicationCluster();
+ flinkDeployment.getSpec().getJob().setInitialSavepointPath("msp");
+ flinkService.setDeployFailure(true);
+ try {
+ testController.reconcile(flinkDeployment, context);
+ fail();
+ } catch (Exception expected) {
+ }
+ flinkService.setDeployFailure(false);
+ testController.reconcile(flinkDeployment, context);
+ assertEquals("msp", flinkService.listJobs().get(0).f0);
+ }
}
diff --git
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserverTest.java
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserverTest.java
index 3a72a520..aa9504de 100644
---
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserverTest.java
+++
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserverTest.java
@@ -488,8 +488,23 @@ public class ApplicationObserverTest {
status.getJobManagerDeploymentStatus());
var specWithMeta =
status.getReconciliationStatus().deserializeLastReconciledSpecWithMeta();
- assertEquals(321L,
specWithMeta.f1.get("metadata").get("generation").asLong());
+ assertEquals(321L, specWithMeta.f1.getMetadata().getGeneration());
assertEquals(JobState.RUNNING, specWithMeta.f0.getJob().getState());
assertEquals(5, specWithMeta.f0.getJob().getParallelism());
}
+
+ @Test
+ public void validateLastReconciledClearedOnInitialFailure() {
+ FlinkDeployment deployment = TestUtils.buildApplicationCluster();
+ deployment.getMetadata().setGeneration(123L);
+
+ ReconciliationUtils.updateStatusBeforeDeploymentAttempt(
+ deployment,
+ new FlinkConfigManager(new Configuration())
+ .getDeployConfig(deployment.getMetadata(),
deployment.getSpec()));
+
+
assertFalse(deployment.getStatus().getReconciliationStatus().isFirstDeployment());
+ observer.observe(deployment, TestUtils.createEmptyContext());
+
assertTrue(deployment.getStatus().getReconciliationStatus().isFirstDeployment());
+ }
}
diff --git
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/deployment/SessionObserverTest.java
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/deployment/SessionObserverTest.java
index 2d4acd51..36cf6446 100644
---
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/deployment/SessionObserverTest.java
+++
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/deployment/SessionObserverTest.java
@@ -149,7 +149,7 @@ public class SessionObserverTest {
status.getJobManagerDeploymentStatus());
var specWithMeta =
status.getReconciliationStatus().deserializeLastReconciledSpecWithMeta();
- assertEquals(321L,
specWithMeta.f1.get("metadata").get("generation").asLong());
+ assertEquals(321L, specWithMeta.f1.getMetadata().getGeneration());
assertEquals("1", specWithMeta.f0.getFlinkConfiguration().get("k"));
}
}
diff --git
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/sessionjob/SessionJobObserverTest.java
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/sessionjob/SessionJobObserverTest.java
index dec08ef8..37db442b 100644
---
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/sessionjob/SessionJobObserverTest.java
+++
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/sessionjob/SessionJobObserverTest.java
@@ -31,6 +31,7 @@ import
org.apache.flink.kubernetes.operator.crd.FlinkSessionJob;
import org.apache.flink.kubernetes.operator.crd.status.FlinkSessionJobStatus;
import org.apache.flink.kubernetes.operator.crd.status.ReconciliationState;
import org.apache.flink.kubernetes.operator.crd.status.SavepointTriggerType;
+import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
import
org.apache.flink.kubernetes.operator.reconciler.sessionjob.SessionJobReconciler;
import org.apache.flink.kubernetes.operator.service.FlinkServiceFactory;
import org.apache.flink.kubernetes.operator.utils.EventRecorder;
@@ -46,6 +47,9 @@ import org.junit.jupiter.api.Test;
import java.util.Map;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
/** Tests for {@link SessionJobObserver}. */
@EnableKubernetesMockClient(crud = true)
public class SessionJobObserverTest {
@@ -341,4 +345,16 @@ public class SessionJobObserverTest {
Assertions.assertTrue(
exception.getMessage().contains("doesn't match upgrade target
generation"));
}
+
+ @Test
+ public void validateLastReconciledClearedOnInitialFailure() {
+ var sessionJob = TestUtils.buildSessionJob();
+ sessionJob.getMetadata().setGeneration(123L);
+
+ ReconciliationUtils.updateStatusBeforeDeploymentAttempt(sessionJob,
new Configuration());
+
+
assertFalse(sessionJob.getStatus().getReconciliationStatus().isFirstDeployment());
+ observer.observe(sessionJob,
TestUtils.createContextWithReadyFlinkDeployment());
+
assertTrue(sessionJob.getStatus().getReconciliationStatus().isFirstDeployment());
+ }
}