This is an automated email from the ASF dual-hosted git repository.
gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
The following commit(s) were added to refs/heads/main by this push:
new ee20209 [FLINK-26261] Introduce status and create event for terminal
deployment error
ee20209 is described below
commit ee20209d95b7a155be0761eb1b60829fe489924f
Author: Thomas Weise <[email protected]>
AuthorDate: Mon Mar 7 20:32:02 2022 -0800
[FLINK-26261] Introduce status and create event for terminal deployment
error
Closes #46
---
.../controller/FlinkDeploymentController.java | 48 ++++++++++++-----
.../exception/DeploymentFailedException.java | 63 ++++++++++++++++++++++
.../exception/InvalidDeploymentException.java | 30 -----------
.../observer/JobManagerDeploymentStatus.java | 9 +++-
.../kubernetes/operator/observer/Observer.java | 16 ++++++
.../flink/kubernetes/operator/TestUtils.java | 62 +++++++++++++++++++++
.../controller/FlinkDeploymentControllerTest.java | 56 ++++++++++++++++++-
.../kubernetes/operator/observer/ObserverTest.java | 6 +--
.../operator/reconciler/JobReconcilerTest.java | 34 ++----------
.../crds/flinkdeployments.flink.apache.org-v1.yml | 1 +
10 files changed, 242 insertions(+), 83 deletions(-)
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
index 26c0097..2a2a8d3 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
@@ -21,8 +21,9 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.kubernetes.operator.config.DefaultConfig;
import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
-import
org.apache.flink.kubernetes.operator.exception.InvalidDeploymentException;
+import
org.apache.flink.kubernetes.operator.exception.DeploymentFailedException;
import org.apache.flink.kubernetes.operator.exception.ReconciliationException;
+import
org.apache.flink.kubernetes.operator.observer.JobManagerDeploymentStatus;
import org.apache.flink.kubernetes.operator.observer.Observer;
import org.apache.flink.kubernetes.operator.reconciler.ReconcilerFactory;
import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
@@ -31,6 +32,7 @@ import
org.apache.flink.kubernetes.operator.utils.OperatorUtils;
import
org.apache.flink.kubernetes.operator.validation.FlinkDeploymentValidator;
import org.apache.flink.util.Preconditions;
+import io.fabric8.kubernetes.api.model.Event;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.javaoperatorsdk.operator.api.reconciler.Context;
import io.javaoperatorsdk.operator.api.reconciler.ControllerConfiguration;
@@ -92,8 +94,11 @@ public class FlinkDeploymentController
LOG.info("Stopping cluster {}", flinkApp.getMetadata().getName());
Configuration effectiveConfig =
FlinkUtils.getEffectiveConfig(flinkApp,
defaultConfig.getFlinkConfig());
-
- observer.observe(flinkApp, context, effectiveConfig);
+ try {
+ observer.observe(flinkApp, context, effectiveConfig);
+ } catch (DeploymentFailedException dfe) {
+ // ignore during cleanup
+ }
return reconcilerFactory
.getOrCreate(flinkApp)
.cleanup(operatorNamespace, flinkApp, effectiveConfig);
@@ -113,28 +118,45 @@ public class FlinkDeploymentController
Configuration effectiveConfig =
FlinkUtils.getEffectiveConfig(flinkApp,
defaultConfig.getFlinkConfig());
- boolean readyToReconcile = observer.observe(flinkApp, context,
effectiveConfig);
- if (!readyToReconcile) {
- return flinkApp.getStatus()
- .getJobManagerDeploymentStatus()
- .toUpdateControl(flinkApp, operatorConfiguration);
- }
-
try {
+ boolean readyToReconcile = observer.observe(flinkApp, context,
effectiveConfig);
+ if (!readyToReconcile) {
+ return flinkApp.getStatus()
+ .getJobManagerDeploymentStatus()
+ .toUpdateControl(flinkApp, operatorConfiguration);
+ }
+
UpdateControl<FlinkDeployment> updateControl =
reconcilerFactory
.getOrCreate(flinkApp)
.reconcile(operatorNamespace, flinkApp, context,
effectiveConfig);
return updateControl;
- } catch (InvalidDeploymentException ide) {
- LOG.error("Reconciliation failed", ide);
- ReconciliationUtils.updateForReconciliationError(flinkApp,
ide.getMessage());
+ } catch (DeploymentFailedException dfe) {
+ handleDeploymentFailed(flinkApp, dfe);
return UpdateControl.updateStatus(flinkApp);
} catch (Exception e) {
throw new ReconciliationException(e);
}
}
+ private void handleDeploymentFailed(FlinkDeployment flinkApp,
DeploymentFailedException dfe) {
+ LOG.error(
+ "Deployment {}/{} failed with {}",
+ flinkApp.getMetadata().getNamespace(),
+ flinkApp.getMetadata().getName(),
+ dfe.getMessage());
+
flinkApp.getStatus().setJobManagerDeploymentStatus(JobManagerDeploymentStatus.ERROR);
+ ReconciliationUtils.updateForReconciliationError(flinkApp,
dfe.getMessage());
+
+ // TODO: avoid repeated event
+ Event event = DeploymentFailedException.asEvent(dfe, flinkApp);
+ kubernetesClient
+ .v1()
+ .events()
+ .inNamespace(flinkApp.getMetadata().getNamespace())
+ .create(event);
+ }
+
@Override
public List<EventSource>
prepareEventSources(EventSourceContext<FlinkDeployment> ctx) {
Preconditions.checkNotNull(controllerConfig, "Controller config cannot
be null");
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/exception/DeploymentFailedException.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/exception/DeploymentFailedException.java
new file mode 100644
index 0000000..a16851f
--- /dev/null
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/exception/DeploymentFailedException.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.exception;
+
+import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+
+import io.fabric8.kubernetes.api.model.Event;
+import io.fabric8.kubernetes.api.model.EventBuilder;
+import io.fabric8.kubernetes.api.model.apps.DeploymentCondition;
+
+/** Exception to signal terminal deployment failure. */
+public class DeploymentFailedException extends RuntimeException {
+ public static final String COMPONENT_JOBMANAGER = "JobManagerDeployment";
+
+ public final String component;
+ public final DeploymentCondition deployCondition;
+
+ public DeploymentFailedException(String component, DeploymentCondition
deployCondition) {
+ super(deployCondition.getMessage());
+ this.component = component;
+ this.deployCondition = deployCondition;
+ }
+
+ public static Event asEvent(DeploymentFailedException dfe, FlinkDeployment
flinkApp) {
+ EventBuilder evtb =
+ new EventBuilder()
+ .withApiVersion("v1")
+ .withNewInvolvedObject()
+ .withKind(flinkApp.getKind())
+ .withName(flinkApp.getMetadata().getName())
+ .withNamespace(flinkApp.getMetadata().getNamespace())
+ .withUid(flinkApp.getMetadata().getUid())
+ .endInvolvedObject()
+ .withType(dfe.deployCondition.getType())
+ .withReason(dfe.deployCondition.getReason())
+
.withFirstTimestamp(dfe.deployCondition.getLastTransitionTime())
+
.withLastTimestamp(dfe.deployCondition.getLastUpdateTime())
+ .withMessage(dfe.getMessage())
+ .withNewMetadata()
+ .withGenerateName(flinkApp.getMetadata().getName())
+ .withNamespace(flinkApp.getMetadata().getNamespace())
+ .endMetadata()
+ .withNewSource()
+ .withComponent(dfe.component)
+ .endSource();
+ return evtb.build();
+ }
+}
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/exception/InvalidDeploymentException.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/exception/InvalidDeploymentException.java
deleted file mode 100644
index 9ce408e..0000000
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/exception/InvalidDeploymentException.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.kubernetes.operator.exception;
-
-/** Exception for encountering invalid FlinkDeployment resources. */
-public class InvalidDeploymentException extends Exception {
-
- public InvalidDeploymentException(String msg) {
- super(msg);
- }
-
- public InvalidDeploymentException(String msg, Throwable cause) {
- super(msg, cause);
- }
-}
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobManagerDeploymentStatus.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobManagerDeploymentStatus.java
index 960c1cd..35a395d 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobManagerDeploymentStatus.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobManagerDeploymentStatus.java
@@ -37,7 +37,11 @@ public enum JobManagerDeploymentStatus {
DEPLOYING,
/** JobManager deployment not found, probably not started or killed by
user. */
- MISSING;
+ // TODO: currently a mix of SUSPENDED and ERROR, needs cleanup
+ MISSING,
+
+ /** Deployment in terminal error, requires spec change for reconciliation
to continue. */
+ ERROR;
public UpdateControl<FlinkDeployment> toUpdateControl(
FlinkDeployment flinkDeployment, FlinkOperatorConfiguration
operatorConfiguration) {
@@ -54,8 +58,9 @@ public enum JobManagerDeploymentStatus {
operatorConfiguration.getPortCheckIntervalInSec(),
TimeUnit.SECONDS);
case MISSING:
+ case ERROR:
default:
- return null;
+ return UpdateControl.noUpdate();
}
}
}
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/Observer.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/Observer.java
index 5bafee9..46fb983 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/Observer.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/Observer.java
@@ -23,12 +23,14 @@ import
org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
import org.apache.flink.kubernetes.operator.crd.status.JobStatus;
import org.apache.flink.kubernetes.operator.crd.status.SavepointInfo;
+import
org.apache.flink.kubernetes.operator.exception.DeploymentFailedException;
import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
import org.apache.flink.kubernetes.operator.service.FlinkService;
import org.apache.flink.kubernetes.operator.utils.SavepointUtils;
import org.apache.flink.runtime.client.JobStatusMessage;
import io.fabric8.kubernetes.api.model.apps.Deployment;
+import io.fabric8.kubernetes.api.model.apps.DeploymentCondition;
import io.fabric8.kubernetes.api.model.apps.DeploymentSpec;
import io.fabric8.kubernetes.api.model.apps.DeploymentStatus;
import io.javaoperatorsdk.operator.api.reconciler.Context;
@@ -100,6 +102,19 @@ public class Observer {
flinkApp.getMetadata().getNamespace(),
status);
+ List<DeploymentCondition> conditions = status.getConditions();
+ for (DeploymentCondition dc : conditions) {
+ if ("FailedCreate".equals(dc.getReason())
+ && "ReplicaFailure".equals(dc.getType())) {
+ // throw only when not already in error status to allow
for spec update
+ if (!JobManagerDeploymentStatus.ERROR.equals(
+ deploymentStatus.getJobManagerDeploymentStatus()))
{
+ throw new DeploymentFailedException(
+
DeploymentFailedException.COMPONENT_JOBMANAGER, dc);
+ }
+ return;
+ }
+ }
deploymentStatus.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.DEPLOYING);
return;
}
@@ -181,6 +196,7 @@ public class Observer {
return observeFlinkJobStatus(flinkApp, effectiveConfig)
&& observeSavepointStatus(flinkApp, effectiveConfig);
case MISSING:
+ case ERROR:
return true;
case DEPLOYING:
case DEPLOYED_NOT_READY:
diff --git
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java
index 009b17a..d76b98a 100644
---
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java
+++
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java
@@ -31,6 +31,10 @@ import io.fabric8.kubernetes.api.model.Container;
import io.fabric8.kubernetes.api.model.ObjectMetaBuilder;
import io.fabric8.kubernetes.api.model.Pod;
import io.fabric8.kubernetes.api.model.PodSpec;
+import io.fabric8.kubernetes.api.model.apps.Deployment;
+import io.fabric8.kubernetes.api.model.apps.DeploymentCondition;
+import io.fabric8.kubernetes.api.model.apps.DeploymentSpec;
+import io.fabric8.kubernetes.api.model.apps.DeploymentStatus;
import io.javaoperatorsdk.operator.api.reconciler.Context;
import io.javaoperatorsdk.operator.api.reconciler.RetryInfo;
@@ -109,4 +113,62 @@ public class TestUtils {
}
};
}
+
+ public static Context createContextWithReadyJobManagerDeployment() {
+ return new Context() {
+ @Override
+ public Optional<RetryInfo> getRetryInfo() {
+ return Optional.empty();
+ }
+
+ @Override
+ public <T> Optional<T> getSecondaryResource(
+ Class<T> expectedType, String eventSourceName) {
+ DeploymentStatus status = new DeploymentStatus();
+ status.setAvailableReplicas(1);
+ status.setReplicas(1);
+ DeploymentSpec spec = new DeploymentSpec();
+ spec.setReplicas(1);
+ Deployment deployment = new Deployment();
+ deployment.setSpec(spec);
+ deployment.setStatus(status);
+ return Optional.of((T) deployment);
+ }
+ };
+ }
+
+ public static final String DEPLOYMENT_ERROR = "test deployment error
message";
+
+ public static Context createContextWithFailedJobManagerDeployment() {
+ return new Context() {
+ @Override
+ public Optional<RetryInfo> getRetryInfo() {
+ return Optional.empty();
+ }
+
+ @Override
+ public <T> Optional<T> getSecondaryResource(
+ Class<T> expectedType, String eventSourceName) {
+ DeploymentStatus status = new DeploymentStatus();
+ status.setAvailableReplicas(0);
+ status.setReplicas(1);
+ List<DeploymentCondition> conditions =
+ Collections.singletonList(
+ new DeploymentCondition(
+ null,
+ null,
+ DEPLOYMENT_ERROR,
+ "FailedCreate",
+ "status",
+ "ReplicaFailure"));
+ status.setConditions(conditions);
+ DeploymentSpec spec = new DeploymentSpec();
+ spec.setReplicas(1);
+ Deployment deployment = new Deployment();
+ deployment.setSpec(spec);
+ deployment.setStatus(status);
+ return Optional.of((T) deployment);
+ }
+ };
+ }
}
diff --git
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
index 55d1628..04e9925 100644
---
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
+++
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
@@ -29,13 +29,13 @@ import
org.apache.flink.kubernetes.operator.crd.status.JobStatus;
import org.apache.flink.kubernetes.operator.crd.status.ReconciliationStatus;
import
org.apache.flink.kubernetes.operator.observer.JobManagerDeploymentStatus;
import org.apache.flink.kubernetes.operator.observer.Observer;
-import org.apache.flink.kubernetes.operator.reconciler.JobReconcilerTest;
import org.apache.flink.kubernetes.operator.reconciler.ReconcilerFactory;
import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
import
org.apache.flink.kubernetes.operator.validation.DefaultDeploymentValidator;
import org.apache.flink.runtime.client.JobStatusMessage;
+import io.fabric8.kubernetes.api.model.EventBuilder;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.NamespacedKubernetesClient;
import io.fabric8.kubernetes.client.server.mock.KubernetesMockServer;
@@ -43,13 +43,16 @@ import io.javaoperatorsdk.operator.api.reconciler.Context;
import io.javaoperatorsdk.operator.api.reconciler.UpdateControl;
import io.javaoperatorsdk.operator.processing.event.source.EventSource;
import okhttp3.mockwebserver.MockWebServer;
+import okhttp3.mockwebserver.RecordedRequest;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import java.net.HttpURLConnection;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
+import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
@@ -62,7 +65,7 @@ import static org.junit.Assert.assertTrue;
/** @link JobStatusObserver unit tests */
public class FlinkDeploymentControllerTest {
- private final Context context =
JobReconcilerTest.createContextWithReadyJobManagerDeployment();
+ private final Context context =
TestUtils.createContextWithReadyJobManagerDeployment();
private final FlinkOperatorConfiguration operatorConfiguration =
FlinkOperatorConfiguration.fromConfiguration(new Configuration());
@@ -152,6 +155,55 @@ public class FlinkDeploymentControllerTest {
}
@Test
+ public void verifyFailedDeployment() throws Exception {
+
+ mockServer
+ .expect()
+ .post()
+ .withPath("/api/v1/namespaces/flink-operator-test/events")
+ .andReturn(
+ HttpURLConnection.HTTP_CREATED,
+ new
EventBuilder().withNewMetadata().endMetadata().build())
+ .once();
+
+ FlinkDeployment appCluster = TestUtils.buildApplicationCluster();
+ UpdateControl<FlinkDeployment> updateControl;
+
+ updateControl =
+ testController.reconcile(
+ appCluster,
TestUtils.createContextWithFailedJobManagerDeployment());
+ assertTrue(updateControl.isUpdateStatus());
+ assertEquals(Optional.empty(), updateControl.getScheduleDelay());
+
+ RecordedRequest recordedRequest = mockServer.getLastRequest();
+ assertEquals("POST", recordedRequest.getMethod());
+
assertTrue(recordedRequest.getBody().readUtf8().contains(TestUtils.DEPLOYMENT_ERROR));
+ assertEquals(
+ JobManagerDeploymentStatus.ERROR,
+ appCluster.getStatus().getJobManagerDeploymentStatus());
+
+ // Validate reconciliation status
+ ReconciliationStatus reconciliationStatus =
+ appCluster.getStatus().getReconciliationStatus();
+ assertFalse(reconciliationStatus.isSuccess());
+ assertNotNull(reconciliationStatus.getError());
+
+ // next cycle should not create another event
+ updateControl =
+ testController.reconcile(
+ appCluster,
TestUtils.createContextWithFailedJobManagerDeployment());
+ assertEquals(
+ JobManagerDeploymentStatus.ERROR,
+ appCluster.getStatus().getJobManagerDeploymentStatus());
+ assertTrue(updateControl.isUpdateStatus());
+ assertEquals(
+ JobManagerDeploymentStatus.READY
+ .toUpdateControl(appCluster, operatorConfiguration)
+ .getScheduleDelay(),
+ updateControl.getScheduleDelay());
+ }
+
+ @Test
public void verifyUpgradeFromSavepoint() {
FlinkDeployment appCluster = TestUtils.buildApplicationCluster();
appCluster.getSpec().getJob().setUpgradeMode(UpgradeMode.SAVEPOINT);
diff --git
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/ObserverTest.java
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/ObserverTest.java
index c5d73d4..0b4289f 100644
---
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/ObserverTest.java
+++
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/ObserverTest.java
@@ -24,7 +24,6 @@ import
org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
import org.apache.flink.kubernetes.operator.crd.status.JobStatus;
-import org.apache.flink.kubernetes.operator.reconciler.JobReconcilerTest;
import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
import org.apache.flink.kubernetes.operator.service.FlinkService;
import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
@@ -40,10 +39,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
/** @link Observer unit tests */
public class ObserverTest {
- private final Context readyContext =
- JobReconcilerTest.createContextWithReadyJobManagerDeployment();
- private final FlinkOperatorConfiguration operatorConfiguration =
- FlinkOperatorConfiguration.fromConfiguration(new Configuration());
+ private final Context readyContext =
TestUtils.createContextWithReadyJobManagerDeployment();
@Test
public void observeSessionCluster() {
diff --git
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/JobReconcilerTest.java
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/JobReconcilerTest.java
index cc7f508..c712501 100644
---
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/JobReconcilerTest.java
+++
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/JobReconcilerTest.java
@@ -30,15 +30,10 @@ import
org.apache.flink.kubernetes.operator.utils.FlinkUtils;
import org.apache.flink.runtime.client.JobStatusMessage;
import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
-import io.fabric8.kubernetes.api.model.apps.Deployment;
-import io.fabric8.kubernetes.api.model.apps.DeploymentSpec;
-import io.fabric8.kubernetes.api.model.apps.DeploymentStatus;
import io.javaoperatorsdk.operator.api.reconciler.Context;
-import io.javaoperatorsdk.operator.api.reconciler.RetryInfo;
import org.junit.jupiter.api.Test;
import java.util.List;
-import java.util.Optional;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
@@ -50,32 +45,9 @@ public class JobReconcilerTest {
private final FlinkOperatorConfiguration operatorConfiguration =
FlinkOperatorConfiguration.fromConfiguration(new Configuration());
- public static Context createContextWithReadyJobManagerDeployment() {
- return new Context() {
- @Override
- public Optional<RetryInfo> getRetryInfo() {
- return Optional.empty();
- }
-
- @Override
- public <T> Optional<T> getSecondaryResource(
- Class<T> expectedType, String eventSourceName) {
- DeploymentStatus status = new DeploymentStatus();
- status.setAvailableReplicas(1);
- status.setReplicas(1);
- DeploymentSpec spec = new DeploymentSpec();
- spec.setReplicas(1);
- Deployment deployment = new Deployment();
- deployment.setSpec(spec);
- deployment.setStatus(status);
- return Optional.of((T) deployment);
- }
- };
- }
-
@Test
public void testUpgrade() throws Exception {
- Context context =
JobReconcilerTest.createContextWithReadyJobManagerDeployment();
+ Context context =
TestUtils.createContextWithReadyJobManagerDeployment();
TestingFlinkService flinkService = new TestingFlinkService();
JobReconciler reconciler = new JobReconciler(null, flinkService,
operatorConfiguration);
@@ -116,7 +88,7 @@ public class JobReconcilerTest {
@Test
public void testUpgradeModeChangeFromSavepointToLastState() throws
Exception {
final String expectedSavepointPath = "savepoint_0";
- final Context context =
JobReconcilerTest.createContextWithReadyJobManagerDeployment();
+ final Context context =
TestUtils.createContextWithReadyJobManagerDeployment();
final TestingFlinkService flinkService = new TestingFlinkService();
final JobReconciler reconciler =
@@ -168,7 +140,7 @@ public class JobReconcilerTest {
@Test
public void triggerSavepoint() throws Exception {
- Context context =
JobReconcilerTest.createContextWithReadyJobManagerDeployment();
+ Context context =
TestUtils.createContextWithReadyJobManagerDeployment();
TestingFlinkService flinkService = new TestingFlinkService();
JobReconciler reconciler = new JobReconciler(null, flinkService,
operatorConfiguration);
FlinkDeployment deployment = TestUtils.buildApplicationCluster();
diff --git a/helm/flink-operator/crds/flinkdeployments.flink.apache.org-v1.yml
b/helm/flink-operator/crds/flinkdeployments.flink.apache.org-v1.yml
index 078c0ed..4ab435f 100644
--- a/helm/flink-operator/crds/flinkdeployments.flink.apache.org-v1.yml
+++ b/helm/flink-operator/crds/flinkdeployments.flink.apache.org-v1.yml
@@ -9097,6 +9097,7 @@ spec:
- DEPLOYED_NOT_READY
- DEPLOYING
- MISSING
+ - ERROR
type: string
reconciliationStatus:
properties: