This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/spark-kubernetes-operator.git
The following commit(s) were added to refs/heads/main by this push:
new e80dead [SPARK-53706] App reconcile steps should properly handle
exceptions in status update
e80dead is described below
commit e80deada70ba5357d7f5c55c5e8583c3941dc3a5
Author: Zhou JIANG <[email protected]>
AuthorDate: Thu Sep 25 14:47:15 2025 -0700
[SPARK-53706] App reconcile steps should properly handle exceptions in
status update
### What changes were proposed in this pull request?
This PR adds exception handling to the status recorder when persisting status.
If an exception is encountered while updating the app status, the reconciler
finishes the current reconcile loop and requeues a new one instead of blindly
throwing the exception.
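A minimal sketch of the pattern described above, under simplifying assumptions: `ApiConflictException` stands in for fabric8's `KubernetesClientException`, a `Runnable` stands in for the version-locked patch call, and `ProgressSketch` only loosely mirrors `ReconcileProgress`. All of these names are hypothetical and this is not the operator's actual code. The recorder reports success as a boolean, and the step maps a failed write to an immediate requeue instead of propagating the exception.

```java
// Hypothetical stand-ins for the operator's types; simplified for illustration only.
enum ProgressSketch {
  COMPLETE_AND_IMMEDIATE_REQUEUE,
  COMPLETE_AND_DEFAULT_REQUEUE
}

// Stand-in for fabric8's KubernetesClientException (e.g. an HTTP 409 conflict).
class ApiConflictException extends RuntimeException {
  ApiConflictException(String message) {
    super(message);
  }
}

class RecorderSketch {
  private final Runnable patchCall; // simulates the version-locked status patch

  RecorderSketch(Runnable patchCall) {
    this.patchCall = patchCall;
  }

  /** Returns true if the status was persisted, false if the API call failed. */
  boolean persistStatus(String newStatus) {
    try {
      patchCall.run();
      return true;
    } catch (ApiConflictException e) {
      System.err.println("Error while persisting status to " + newStatus + ": " + e.getMessage());
      return false;
    }
  }
}

class StepSketch {
  /** Returns the given progress on success; otherwise ends this loop with an immediate requeue. */
  static ProgressSketch attemptStatusUpdate(
      RecorderSketch recorder, String updatedStatus, ProgressSketch progressOnSuccess) {
    if (recorder.persistStatus(updatedStatus)) {
      return progressOnSuccess;
    }
    // The next reconcile re-reads the latest status from the cache and retries.
    return ProgressSketch.COMPLETE_AND_IMMEDIATE_REQUEUE;
  }

  public static void main(String[] args) {
    RecorderSketch healthy = new RecorderSketch(() -> {});
    RecorderSketch conflicting =
        new RecorderSketch(() -> {
          throw new ApiConflictException("409 Conflict");
        });
    System.out.println(
        attemptStatusUpdate(healthy, "DriverRequested", ProgressSketch.COMPLETE_AND_DEFAULT_REQUEUE));
    System.out.println(
        attemptStatusUpdate(conflicting, "DriverRequested", ProgressSketch.COMPLETE_AND_DEFAULT_REQUEUE));
  }
}
```

Returning a boolean keeps the recorder free of reconcile-flow decisions: each step still chooses its own progress for the success path, while the failure path is uniform.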
### Why are the changes needed?
SparkAppReconciler does not handle exceptions when updating the app status.
If the status update fails because of a conflict, these failed reconcile loops
may end up retrying endlessly.
For example, we observed exceptions like the following while an app was starting:
```
Caused by: io.fabric8.kubernetes.client.KubernetesClientException: Failure executing: PUT at: https://kind-control-plane.vsl:6443/apis/spark.apache.org/v1/namespaces/default/sparkapplications/spark-example-retain-duration/status. Message: Operation cannot be fulfilled on sparkapplications.spark.apache.org "spark-example-retain-duration": the object has been modified; please apply your changes to the latest version and try again. Received status: Status(apiVersion=v1, code=409, details [...]
    at io.fabric8.kubernetes.client.dsl.internal.OperationSupport.requestFailure(OperationSupport.java:642) ~[spark-kubernetes-operator-0.5.0-SNAPSHOT-all.jar:?]
    at io.fabric8.kubernetes.client.dsl.internal.OperationSupport.requestFailure(OperationSupport.java:622) ~[spark-kubernetes-operator-0.5.0-SNAPSHOT-all.jar:?]
    at io.fabric8.kubernetes.client.dsl.internal.OperationSupport.assertResponseCode(OperationSupport.java:582) ~[spark-kubernetes-operator-0.5.0-SNAPSHOT-all.jar:?]
    at io.fabric8.kubernetes.client.dsl.internal.OperationSupport.lambda$handleResponse$0(OperationSupport.java:549) ~[spark-kubernetes-operator-0.5.0-SNAPSHOT-all.jar:?]
    at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:646) ~[?:?]
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510) ~[?:?]
    at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2147) ~[?:?]
    at io.fabric8.kubernetes.client.http.StandardHttpClient.lambda$completeOrCancel$10(StandardHttpClient.java:141) ~[spark-kubernetes-operator-0.5.0-SNAPSHOT-all.jar:?]
```
Why does this happen? A reconcile can be triggered again by a driver pod status
update while another reconcile is still in progress. Without proper exception
handling, this keeps recurring, consuming unnecessary CPU time and confusing
users who read the logs.
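The 409 in the trace above is Kubernetes' optimistic concurrency control at work: a write that carries a stale `resourceVersion` is rejected with "the object has been modified". The toy in-memory store below is a hypothetical illustration of that rule (it is not how the API server is implemented), showing why two overlapping reconciles that read the same snapshot cannot both persist a status, and why re-reading before retrying resolves the conflict.

```java
import java.util.HashMap;
import java.util.Map;

// Toy store illustrating resourceVersion-based optimistic locking (illustration only).
final class VersionedStore {
  static final class Conflict extends RuntimeException {
    Conflict(String message) {
      super(message);
    }
  }

  record Snapshot(long resourceVersion, String status) {}

  private final Map<String, Snapshot> objects = new HashMap<>();

  void create(String name, String status) {
    objects.put(name, new Snapshot(1, status));
  }

  Snapshot get(String name) {
    return objects.get(name);
  }

  /** Succeeds only if the caller's resourceVersion matches the stored one. */
  Snapshot updateStatus(String name, long expectedVersion, String newStatus) {
    Snapshot current = objects.get(name);
    if (current.resourceVersion() != expectedVersion) {
      throw new Conflict("409: the object has been modified; please apply your changes"
          + " to the latest version and try again");
    }
    Snapshot next = new Snapshot(current.resourceVersion() + 1, newStatus);
    objects.put(name, next);
    return next;
  }
}

class ConflictDemo {
  public static void main(String[] args) {
    VersionedStore store = new VersionedStore();
    store.create("spark-example", "Submitted");

    // Two overlapping reconciles read the same snapshot (e.g. one triggered by a driver pod event).
    VersionedStore.Snapshot seenByA = store.get("spark-example");
    VersionedStore.Snapshot seenByB = store.get("spark-example");

    store.updateStatus("spark-example", seenByA.resourceVersion(), "DriverRequested");
    try {
      store.updateStatus("spark-example", seenByB.resourceVersion(), "DriverRequested");
    } catch (VersionedStore.Conflict e) {
      System.out.println("stale update rejected: " + e.getMessage());
    }

    // A retry based on a fresh read succeeds; resourceVersion is now 3.
    VersionedStore.Snapshot fresh = store.get("spark-example");
    System.out.println(store.updateStatus("spark-example", fresh.resourceVersion(), "DriverStarted"));
  }
}
```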
Another corner case fixed by this patch: the current `AppInitStep` marks the
app as `SchedulingFailure` even if the resources were requested as expected and
only the status update failed. This patch handles exceptions from resource
creation and from the status update separately.
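The separation can be sketched as follows. This is a simplified, hypothetical model of the control flow, not the real `AppInitStep`: resource creation is a lambda, the status write returns a boolean, and states are plain strings. Only a failure while creating resources is recorded as `SchedulingFailure`; a failed status write after successful creation just ends the loop with an immediate requeue.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of the reworked AppInitStep control flow.
class InitFlowSketch {
  enum Progress { DEFAULT_REQUEUE, IMMEDIATE_REQUEUE }

  /** Requests driver resources; may throw if the API server rejects them. */
  interface ResourceRequester {
    void requestDriverResources() throws Exception;
  }

  /** Persists a new state; returns false if the status write failed. */
  interface StatusWriter {
    boolean persist(String state);
  }

  static Progress reconcile(ResourceRequester requester, StatusWriter writer) {
    try {
      requester.requestDriverResources();
    } catch (Exception e) {
      // Only a resource-creation failure is recorded as SchedulingFailure.
      // Whether or not that write succeeds, this loop ends with an immediate requeue.
      writer.persist("SchedulingFailure");
      return Progress.IMMEDIATE_REQUEUE;
    }
    // Resources exist now; a failed status write is not reported as a scheduling failure.
    return writer.persist("DriverRequested") ? Progress.DEFAULT_REQUEUE : Progress.IMMEDIATE_REQUEUE;
  }

  public static void main(String[] args) {
    List<String> attempted = new ArrayList<>();
    // Resources are created fine, but the status write fails (e.g. on a 409 conflict).
    Progress progress = reconcile(() -> {}, state -> {
      attempted.add(state);
      return false;
    });
    System.out.println(progress + " " + attempted); // IMMEDIATE_REQUEUE [DriverRequested]
  }
}
```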
To put it more plainly: if an exception is thrown while updating the app status
(which typically happens at the end of each reconcile), the operator should
properly finish the current reconcile loop and start a new one. The app status
is fetched from the cache at the beginning of each reconcile, and our reconcile
steps are already designed to be idempotent.
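Retrying until the status is persisted only works because the steps are idempotent. The hypothetical sketch below scripts the status write to fail once and then succeed (similar in spirit to the new unit test's `thenReturn(false, true)` stubbing) and checks that the second loop persists the status without creating resources twice. Idempotency here comes from a create-if-absent check, which is an assumption of the sketch; the operator's own resource creation may achieve it differently.

```java
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;

// Hypothetical sketch of an idempotent init step retried after a failed status write.
class IdempotentInitSketch {
  private final Set<String> cluster = new HashSet<>(); // fake cluster: names of existing resources
  private final Iterator<Boolean> statusWriteOutcomes; // scripted results of status writes
  int createCalls = 0;
  String status = "Submitted"; // status as the next reconcile would read it from the cache

  IdempotentInitSketch(List<Boolean> statusWriteOutcomes) {
    this.statusWriteOutcomes = statusWriteOutcomes.iterator();
  }

  // Create-if-absent (assumed here) keeps a repeated reconcile from duplicating resources.
  private void createIfAbsent(String name) {
    if (cluster.add(name)) { // Set.add returns false when the name is already present
      createCalls++;
    }
  }

  /** Runs one reconcile loop; returns true if it ended with an immediate requeue. */
  boolean reconcileOnce() {
    if (!status.equals("Submitted")) {
      return false; // init step already completed
    }
    createIfAbsent("driver-pod");
    createIfAbsent("resource-configmap");
    if (statusWriteOutcomes.next()) {
      status = "DriverRequested";
      return false;
    }
    return true; // write failed: status stays "Submitted", so the next loop retries
  }

  public static void main(String[] args) {
    IdempotentInitSketch step = new IdempotentInitSketch(List.of(false, true));
    System.out.println(step.reconcileOnce()); // true
    System.out.println(step.reconcileOnce()); // false
    System.out.println(step.status + ", resources created: " + step.createCalls);
    // DriverRequested, resources created: 2
  }
}
```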
### Does this PR introduce any user-facing change?
No
### How was this patch tested?
Tested from two perspectives:
1. CI, including a new unit test that validates update idempotency in the init
step
2. Log inspection: with this patch, exceptions like the one above are no longer
thrown while running apps
### Was this patch authored or co-authored using generative AI tooling?
No
Closes #341 from jiangzho/status_patch.
Authored-by: Zhou JIANG <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
---
.../reconciler/reconcilesteps/AppInitStep.java | 25 +++++------
.../reconcilesteps/AppReconcileStep.java | 51 ++++++++++++++++++----
.../reconciler/reconcilesteps/AppRunningStep.java | 9 ++--
.../reconcilesteps/AppUnknownStateStep.java | 7 ++-
.../reconciler/reconcilesteps/AppValidateStep.java | 10 +++--
.../k8s/operator/utils/SparkAppStatusRecorder.java | 10 +++--
.../spark/k8s/operator/utils/StatusRecorder.java | 13 ++++--
.../reconcilesteps/AppCleanUpStepTest.java | 7 +++
.../reconciler/reconcilesteps/AppInitStepTest.java | 47 +++++++++++++++++++-
9 files changed, 140 insertions(+), 39 deletions(-)
diff --git a/spark-operator/src/main/java/org/apache/spark/k8s/operator/reconciler/reconcilesteps/AppInitStep.java b/spark-operator/src/main/java/org/apache/spark/k8s/operator/reconciler/reconcilesteps/AppInitStep.java
index f8cbfce..cc5c36a 100644
--- a/spark-operator/src/main/java/org/apache/spark/k8s/operator/reconciler/reconcilesteps/AppInitStep.java
+++ b/spark-operator/src/main/java/org/apache/spark/k8s/operator/reconciler/reconcilesteps/AppInitStep.java
@@ -103,30 +103,29 @@ public class AppInitStep extends AppReconcileStep {
}
}
}
- ApplicationStatus updatedStatus =
- context
- .getResource()
- .getStatus()
- .appendNewState(
- new ApplicationState(
- ApplicationStateSummary.DriverRequested, Constants.DRIVER_REQUESTED_MESSAGE));
- statusRecorder.persistStatus(context, updatedStatus);
- return completeAndDefaultRequeue();
} catch (Exception e) {
if (log.isErrorEnabled()) {
log.error("Failed to request driver resource.", e);
}
String errorMessage =
Constants.SCHEDULE_FAILURE_MESSAGE + " StackTrace: " +
buildGeneralErrorMessage(e);
- statusRecorder.persistStatus(
- context,
+ ApplicationStatus updatedStatus =
context
.getResource()
.getStatus()
.appendNewState(
- new ApplicationState(ApplicationStateSummary.SchedulingFailure, errorMessage)));
- return completeAndImmediateRequeue();
+ new ApplicationState(ApplicationStateSummary.SchedulingFailure, errorMessage));
+ return attemptStatusUpdate(
+ context, statusRecorder, updatedStatus, completeAndImmediateRequeue());
}
+ ApplicationStatus updatedStatus =
+ context
+ .getResource()
+ .getStatus()
+ .appendNewState(
+ new ApplicationState(
+ ApplicationStateSummary.DriverRequested, Constants.DRIVER_REQUESTED_MESSAGE));
+ return attemptStatusUpdate(context, statusRecorder, updatedStatus, completeAndDefaultRequeue());
}
/**
diff --git a/spark-operator/src/main/java/org/apache/spark/k8s/operator/reconciler/reconcilesteps/AppReconcileStep.java b/spark-operator/src/main/java/org/apache/spark/k8s/operator/reconciler/reconcilesteps/AppReconcileStep.java
index 3502708..ba56176 100644
--- a/spark-operator/src/main/java/org/apache/spark/k8s/operator/reconciler/reconcilesteps/AppReconcileStep.java
+++ b/spark-operator/src/main/java/org/apache/spark/k8s/operator/reconciler/reconcilesteps/AppReconcileStep.java
@@ -28,6 +28,7 @@ import java.util.List;
import java.util.Optional;
import io.fabric8.kubernetes.api.model.Pod;
+import lombok.extern.log4j.Log4j2;
import org.apache.spark.k8s.operator.SparkApplication;
import org.apache.spark.k8s.operator.context.SparkAppContext;
@@ -38,6 +39,7 @@ import org.apache.spark.k8s.operator.status.ApplicationStatus;
import org.apache.spark.k8s.operator.utils.SparkAppStatusRecorder;
/** Basic reconcile step for application. */
+@Log4j2
public abstract class AppReconcileStep {
/**
* Reconciles a specific step for a Spark application.
@@ -77,18 +79,45 @@ public abstract class AppReconcileStep {
for (ApplicationState state : stateUpdates) {
currentStatus = currentStatus.appendNewState(state);
}
- statusRecorder.persistStatus(context, currentStatus);
- return completeAndImmediateRequeue();
+ return attemptStatusUpdate(
+ context, statusRecorder, currentStatus, completeAndImmediateRequeue());
}
} else {
ApplicationStatus updatedStatus =
currentStatus.appendNewState(driverUnexpectedRemoved());
- statusRecorder.persistStatus(context, updatedStatus);
+ return attemptStatusUpdate(
+ context, statusRecorder, updatedStatus, completeAndImmediateRequeue());
+ }
+ }
+
+ /**
+ * Updates the application status - if the status is successfully persisted, proceed with the
+ * given progress. Otherwise, completes current reconcile loop immediately and requeue. Latest
+ * application status would be fetched from cache in next reconcile attempt.
+ *
+ * @param context The SparkAppContext for the application.
+ * @param statusRecorder The SparkAppStatusRecorder for recording status updates.
+ * @param updatedStatus The updated ApplicationStatus.
+ * @param progressUponSuccessStatusUpdate The ReconcileProgress if the status update has been
+ * persisted successfully.
+ * @return The ReconcileProgress for next steps.
+ */
+ protected ReconcileProgress attemptStatusUpdate(
+ final SparkAppContext context,
+ final SparkAppStatusRecorder statusRecorder,
+ final ApplicationStatus updatedStatus,
+ final ReconcileProgress progressUponSuccessStatusUpdate) {
+
+ if (statusRecorder.persistStatus(context, updatedStatus)) {
+ return progressUponSuccessStatusUpdate;
+ } else {
+ log.warn("Failed to persist status, will retry status update in next reconcile attempt");
return completeAndImmediateRequeue();
}
}
/**
- * Updates the application status and re-queues the reconciliation after a specified duration.
+ * Updates the application status and re-queues the reconciliation after a specified duration. If
+ * the status update fails, trigger an immediate requeue.
*
* @param context The SparkAppContext for the application.
* @param statusRecorder The SparkAppStatusRecorder for recording status updates.
@@ -101,13 +130,16 @@ public abstract class AppReconcileStep {
SparkAppStatusRecorder statusRecorder,
ApplicationStatus updatedStatus,
Duration requeueAfter) {
- statusRecorder.persistStatus(context, updatedStatus);
- return ReconcileProgress.completeAndRequeueAfter(requeueAfter);
+ return attemptStatusUpdate(
+ context,
+ statusRecorder,
+ updatedStatus,
+ ReconcileProgress.completeAndRequeueAfter(requeueAfter));
}
/**
* Appends a new state to the application status, persists it, and re-queues the reconciliation
- * after a specified duration.
+ * after a specified duration. If the status update fails, trigger an immediate requeue.
*
* @param context The SparkAppContext for the application.
* @param statusRecorder The SparkAppStatusRecorder for recording status updates.
@@ -120,7 +152,10 @@ public abstract class AppReconcileStep {
SparkAppStatusRecorder statusRecorder,
ApplicationState newState,
Duration requeueAfter) {
- statusRecorder.appendNewStateAndPersist(context, newState);
+ if (!statusRecorder.appendNewStateAndPersist(context, newState)) {
+ log.warn("Status is not persisted successfully, will retry in next reconcile attempt");
+ return completeAndImmediateRequeue();
+ }
return ReconcileProgress.completeAndRequeueAfter(requeueAfter);
}
diff --git a/spark-operator/src/main/java/org/apache/spark/k8s/operator/reconciler/reconcilesteps/AppRunningStep.java b/spark-operator/src/main/java/org/apache/spark/k8s/operator/reconciler/reconcilesteps/AppRunningStep.java
index 55d3544..897c46e 100644
--- a/spark-operator/src/main/java/org/apache/spark/k8s/operator/reconciler/reconcilesteps/AppRunningStep.java
+++ b/spark-operator/src/main/java/org/apache/spark/k8s/operator/reconciler/reconcilesteps/AppRunningStep.java
@@ -33,6 +33,7 @@ import org.apache.spark.k8s.operator.reconciler.observers.AppDriverRunningObserv
import org.apache.spark.k8s.operator.spec.ExecutorInstanceConfig;
import org.apache.spark.k8s.operator.status.ApplicationState;
import org.apache.spark.k8s.operator.status.ApplicationStateSummary;
+import org.apache.spark.k8s.operator.status.ApplicationStatus;
import org.apache.spark.k8s.operator.utils.PodUtils;
import org.apache.spark.k8s.operator.utils.SparkAppStatusRecorder;
@@ -87,13 +88,13 @@ public class AppRunningStep extends AppReconcileStep {
return observeDriver(
context, statusRecorder, Collections.singletonList(new AppDriverRunningObserver()));
} else {
- statusRecorder.persistStatus(
- context,
+ ApplicationStatus updatedStatus =
context
.getResource()
.getStatus()
- .appendNewState(new ApplicationState(proposedStateSummary, stateMessage)));
- return completeAndDefaultRequeue();
+ .appendNewState(new ApplicationState(proposedStateSummary, stateMessage));
+ return attemptStatusUpdate(
+ context, statusRecorder, updatedStatus, completeAndDefaultRequeue());
}
}
}
diff --git a/spark-operator/src/main/java/org/apache/spark/k8s/operator/reconciler/reconcilesteps/AppUnknownStateStep.java b/spark-operator/src/main/java/org/apache/spark/k8s/operator/reconciler/reconcilesteps/AppUnknownStateStep.java
index 09cf195..d103efd 100644
--- a/spark-operator/src/main/java/org/apache/spark/k8s/operator/reconciler/reconcilesteps/AppUnknownStateStep.java
+++ b/spark-operator/src/main/java/org/apache/spark/k8s/operator/reconciler/reconcilesteps/AppUnknownStateStep.java
@@ -46,7 +46,10 @@ public class AppUnknownStateStep extends AppReconcileStep {
new ApplicationState(ApplicationStateSummary.Failed, Constants.UNKNOWN_STATE_MESSAGE);
Optional<Pod> driver = context.getDriverPod();
driver.ifPresent(pod -> state.setLastObservedDriverStatus(pod.getStatus()));
- statusRecorder.persistStatus(context, context.getResource().getStatus().appendNewState(state));
- return ReconcileProgress.completeAndImmediateRequeue();
+ return attemptStatusUpdate(
+ context,
+ statusRecorder,
+ context.getResource().getStatus().appendNewState(state),
+ ReconcileProgress.completeAndImmediateRequeue());
}
}
diff --git a/spark-operator/src/main/java/org/apache/spark/k8s/operator/reconciler/reconcilesteps/AppValidateStep.java b/spark-operator/src/main/java/org/apache/spark/k8s/operator/reconciler/reconcilesteps/AppValidateStep.java
index f3f5e17..174328d 100644
--- a/spark-operator/src/main/java/org/apache/spark/k8s/operator/reconciler/reconcilesteps/AppValidateStep.java
+++ b/spark-operator/src/main/java/org/apache/spark/k8s/operator/reconciler/reconcilesteps/AppValidateStep.java
@@ -48,14 +48,16 @@ public class AppValidateStep extends AppReconcileStep {
SparkAppContext context, SparkAppStatusRecorder statusRecorder) {
if (!isValidApplicationStatus(context.getResource())) {
log.warn("Spark application found with empty status. Resetting to initial state.");
- statusRecorder.persistStatus(context, new ApplicationStatus());
+ return attemptStatusUpdate(context, statusRecorder, new ApplicationStatus(), proceed());
}
if (ClientMode.equals(context.getResource().getSpec())) {
ApplicationState failure =
new ApplicationState(ApplicationStateSummary.Failed, "Client mode is not supported yet.");
- statusRecorder.persistStatus(
- context, context.getResource().getStatus().appendNewState(failure));
- return completeAndImmediateRequeue();
+ return attemptStatusUpdate(
+ context,
+ statusRecorder,
+ context.getResource().getStatus().appendNewState(failure),
+ completeAndImmediateRequeue());
}
return proceed();
}
diff --git a/spark-operator/src/main/java/org/apache/spark/k8s/operator/utils/SparkAppStatusRecorder.java b/spark-operator/src/main/java/org/apache/spark/k8s/operator/utils/SparkAppStatusRecorder.java
index 2123aaf..16ddfe1 100644
--- a/spark-operator/src/main/java/org/apache/spark/k8s/operator/utils/SparkAppStatusRecorder.java
+++ b/spark-operator/src/main/java/org/apache/spark/k8s/operator/utils/SparkAppStatusRecorder.java
@@ -51,11 +51,15 @@ public class SparkAppStatusRecorder
*
* @param context The SparkAppContext for the application.
* @param newState The new ApplicationState to append.
+ * @return true if the status is successfully patched
*/
- public void appendNewStateAndPersist(SparkAppContext context, ApplicationState newState) {
+ public boolean appendNewStateAndPersist(SparkAppContext context, ApplicationState newState) {
ApplicationStatus appStatus = context.getResource().getStatus();
- recorderSource.recordStatusUpdateLatency(appStatus, newState);
ApplicationStatus updatedStatus = appStatus.appendNewState(newState);
- persistStatus(context, updatedStatus);
+ boolean statusPersisted = persistStatus(context, updatedStatus);
+ if (statusPersisted) {
+ recorderSource.recordStatusUpdateLatency(appStatus, newState);
+ }
+ return statusPersisted;
}
}
diff --git a/spark-operator/src/main/java/org/apache/spark/k8s/operator/utils/StatusRecorder.java b/spark-operator/src/main/java/org/apache/spark/k8s/operator/utils/StatusRecorder.java
index f3f8f75..b41cf99 100644
--- a/spark-operator/src/main/java/org/apache/spark/k8s/operator/utils/StatusRecorder.java
+++ b/spark-operator/src/main/java/org/apache/spark/k8s/operator/utils/StatusRecorder.java
@@ -127,10 +127,17 @@ public class StatusRecorder<
*
* @param context The BaseContext containing the resource and client.
* @param newStatus The new status to persist.
+ * @return true if the status is successfully patched.
*/
- public void persistStatus(BaseContext<CR> context, STATUS newStatus) {
- context.getResource().setStatus(newStatus);
- patchAndStatusWithVersionLocked(context.getResource(), context.getClient());
+ public boolean persistStatus(BaseContext<CR> context, STATUS newStatus) {
+ try {
+ context.getResource().setStatus(newStatus);
+ patchAndStatusWithVersionLocked(context.getResource(), context.getClient());
+ return true;
+ } catch (KubernetesClientException e) {
+ log.error("Error while persisting status to {}", newStatus, e);
+ return false;
+ }
}
/**
diff --git a/spark-operator/src/test/java/org/apache/spark/k8s/operator/reconciler/reconcilesteps/AppCleanUpStepTest.java b/spark-operator/src/test/java/org/apache/spark/k8s/operator/reconciler/reconcilesteps/AppCleanUpStepTest.java
index 1fbf563..1ff19fd 100644
--- a/spark-operator/src/test/java/org/apache/spark/k8s/operator/reconciler/reconcilesteps/AppCleanUpStepTest.java
+++ b/spark-operator/src/test/java/org/apache/spark/k8s/operator/reconciler/reconcilesteps/AppCleanUpStepTest.java
@@ -21,6 +21,7 @@ package org.apache.spark.k8s.operator.reconciler.reconcilesteps;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
@@ -171,6 +172,8 @@ class AppCleanUpStepTest {
when(mockAppContext.getDriverPod()).thenReturn(Optional.of(driverPod));
when(mockAppContext.getDriverPreResourcesSpec()).thenReturn(Collections.emptyList());
when(mockAppContext.getDriverResourcesSpec()).thenReturn(Collections.emptyList());
+ when(mockRecorder.persistStatus(eq(mockAppContext), any())).thenReturn(true);
+ when(mockRecorder.appendNewStateAndPersist(eq(mockAppContext), any())).thenReturn(true);
try (MockedStatic<ReconcilerUtils> utils =
Mockito.mockStatic(ReconcilerUtils.class)) {
ReconcileProgress progress =
cleanUpWithReason.reconcile(mockAppContext, mockRecorder);
@@ -259,6 +262,8 @@ class AppCleanUpStepTest {
when(mockAppContext.getDriverPod()).thenReturn(Optional.of(driverPod));
when(mockAppContext.getDriverPreResourcesSpec()).thenReturn(Collections.emptyList());
when(mockAppContext.getDriverResourcesSpec()).thenReturn(Collections.emptyList());
+ when(mockRecorder.persistStatus(eq(mockAppContext), any())).thenReturn(true);
+ when(mockRecorder.appendNewStateAndPersist(eq(mockAppContext), any())).thenReturn(true);
try (MockedStatic<ReconcilerUtils> utils =
Mockito.mockStatic(ReconcilerUtils.class)) {
ReconcileProgress progress = cleanUpWithReason.reconcile(mockAppContext,
mockRecorder);
@@ -317,6 +322,8 @@ class AppCleanUpStepTest {
when(mockAppContext2.getDriverPreResourcesSpec())
.thenReturn(Collections.singletonList(resource1));
when(mockAppContext2.getDriverResourcesSpec()).thenReturn(Collections.singletonList(resource2));
+ when(mockRecorder.persistStatus(any(), any())).thenReturn(true);
+ when(mockRecorder.appendNewStateAndPersist(any(), any())).thenReturn(true);
try (MockedStatic<ReconcilerUtils> utils =
Mockito.mockStatic(ReconcilerUtils.class)) {
ReconcileProgress progress1 =
cleanUpWithReason.reconcile(mockAppContext1, mockRecorder);
diff --git a/spark-operator/src/test/java/org/apache/spark/k8s/operator/reconciler/reconcilesteps/AppInitStepTest.java b/spark-operator/src/test/java/org/apache/spark/k8s/operator/reconciler/reconcilesteps/AppInitStepTest.java
index 6495b71..0131025 100644
--- a/spark-operator/src/test/java/org/apache/spark/k8s/operator/reconciler/reconcilesteps/AppInitStepTest.java
+++ b/spark-operator/src/test/java/org/apache/spark/k8s/operator/reconciler/reconcilesteps/AppInitStepTest.java
@@ -19,6 +19,7 @@
package org.apache.spark.k8s.operator.reconciler.reconcilesteps;
+import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyList;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
@@ -49,6 +50,7 @@ import org.mockito.ArgumentCaptor;
import org.apache.spark.k8s.operator.SparkApplication;
import org.apache.spark.k8s.operator.context.SparkAppContext;
+import org.apache.spark.k8s.operator.reconciler.ReconcileProgress;
import org.apache.spark.k8s.operator.utils.SparkAppStatusRecorder;
@EnableKubernetesMockClient(crud = true)
@@ -106,7 +108,10 @@ class AppInitStepTest {
when(mocksparkAppContext.getDriverResourcesSpec())
.thenReturn(Collections.singletonList(resourceConfigMapSpec));
when(mocksparkAppContext.getClient()).thenReturn(kubernetesClient);
- appInitStep.reconcile(mocksparkAppContext, recorder);
+ when(recorder.appendNewStateAndPersist(any(), any())).thenReturn(true);
+ when(recorder.persistStatus(any(), any())).thenReturn(true);
+ ReconcileProgress reconcileProgress = appInitStep.reconcile(mocksparkAppContext, recorder);
+ Assertions.assertEquals(ReconcileProgress.completeAndDefaultRequeue(), reconcileProgress);
Pod createdPod =
kubernetesClient.pods().inNamespace("default").withName("driver-pod").get();
ConfigMap createCM =
kubernetesClient.configMaps().inNamespace("default").withName("resource-configmap").get();
@@ -135,6 +140,8 @@ class AppInitStepTest {
.thenReturn(Collections.singletonList(preResourceConfigMapSpec));
when(mocksparkAppContext.getDriverPodSpec()).thenReturn(driverPodSpec);
when(mocksparkAppContext.getDriverResourcesSpec()).thenReturn(Collections.emptyList());
+ when(recorder.appendNewStateAndPersist(any(), any())).thenReturn(true);
+ when(recorder.persistStatus(any(), any())).thenReturn(true);
KubernetesClient mockClient = mock(KubernetesClient.class);
when(mocksparkAppContext.getClient()).thenReturn(mockClient);
@@ -166,8 +173,9 @@ class AppInitStepTest {
when(mockClient.resourceList(anyList())).thenReturn(mockList);
when(mockList.forceConflicts()).thenReturn(mockServerSideApplicable);
- appInitStep.reconcile(mocksparkAppContext, recorder);
+ ReconcileProgress reconcileProgress = appInitStep.reconcile(mocksparkAppContext, recorder);
+ Assertions.assertEquals(ReconcileProgress.completeAndDefaultRequeue(), reconcileProgress);
ArgumentCaptor<List<ConfigMap>> argument =
ArgumentCaptor.forClass(List.class);
verify(mockClient).resourceList(argument.capture());
Assertions.assertEquals(1, argument.getValue().size());
@@ -184,4 +192,39 @@ class AppInitStepTest {
decoratedConfigMap.getMetadata().getOwnerReferences().get(0).getKind());
Assertions.assertTrue(decoratedConfigMap.getMetadata().getManagedFields().isEmpty());
}
+
+ @Test
+ void appInitStepShouldBeIdempotentWhenStatusUpdateFails() {
+ AppInitStep appInitStep = new AppInitStep();
+ SparkAppContext mocksparkAppContext = mock(SparkAppContext.class);
+ SparkAppStatusRecorder recorder = mock(SparkAppStatusRecorder.class);
+ SparkApplication application = new SparkApplication();
+ application.setMetadata(applicationMetadata);
+ when(mocksparkAppContext.getResource()).thenReturn(application);
+ when(mocksparkAppContext.getDriverPreResourcesSpec()).thenReturn(Collections.emptyList());
+ when(mocksparkAppContext.getDriverPodSpec()).thenReturn(driverPodSpec);
+ when(mocksparkAppContext.getDriverResourcesSpec())
+ .thenReturn(Collections.singletonList(resourceConfigMapSpec));
+ when(mocksparkAppContext.getClient()).thenReturn(kubernetesClient);
+ when(recorder.appendNewStateAndPersist(any(), any())).thenReturn(false, true);
+ when(recorder.persistStatus(any(), any())).thenReturn(false, true);
+
+ // If the first reconcile manages to create everything but fails to update status
+ ReconcileProgress reconcileProgress1 = appInitStep.reconcile(mocksparkAppContext, recorder);
+ Assertions.assertEquals(ReconcileProgress.completeAndImmediateRequeue(), reconcileProgress1);
+ Pod createdPod = kubernetesClient.pods().inNamespace("default").withName("driver-pod").get();
+ ConfigMap createCM =
+ kubernetesClient.configMaps().inNamespace("default").withName("resource-configmap").get();
+ Assertions.assertNotNull(createCM);
+ Assertions.assertNotNull(createdPod);
+
+ // The second reconcile shall update the status without re-creating everything
+ ReconcileProgress reconcileProgress2 = appInitStep.reconcile(mocksparkAppContext, recorder);
+ Assertions.assertEquals(ReconcileProgress.completeAndDefaultRequeue(), reconcileProgress2);
+ createdPod = kubernetesClient.pods().inNamespace("default").withName("driver-pod").get();
+ createCM =
+ kubernetesClient.configMaps().inNamespace("default").withName("resource-configmap").get();
+ Assertions.assertNotNull(createCM);
+ Assertions.assertNotNull(createdPod);
+ }
}