This is an automated email from the ASF dual-hosted git repository.
ptoth pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/spark-kubernetes-operator.git
The following commit(s) were added to refs/heads/main by this push:
new b89c5cc [SPARK-52915] Support TTL for Spark apps
b89c5cc is described below
commit b89c5cc5dd880fdb65a93a3f14817ce9141227a6
Author: Zhou JIANG <[email protected]>
AuthorDate: Thu Aug 14 10:41:18 2025 +0200
[SPARK-52915] Support TTL for Spark apps
### What changes were proposed in this pull request?
This PR adds support for configuring a TTL for Spark apps after they stop. Together with
`resourceRetainPolicy` and `resourceRetainDurationMillis`, it enhances the garbage collection
mechanism at the custom resource level.
### Why are the changes needed?
Introducing a TTL helps users configure garbage collection for apps more effectively.
### Does this PR introduce _any_ user-facing change?
Yes. A new configurable field, `spec.applicationTolerations.ttlAfterStopMillis`, is added to the
SparkApplication CRD.
### How was this patch tested?
CI, including a new unit test and a revised e2e scenario.
### Was this patch authored or co-authored using generative AI tooling?
No
Closes #290 from jiangzho/resource_ttl.
Authored-by: Zhou JIANG <[email protected]>
Signed-off-by: Peter Toth <[email protected]>
---
.../sparkapplications.spark.apache.org-v1.yaml | 3 +
docs/spark_custom_resources.md | 51 +++++-
.../k8s/operator/spec/ApplicationTolerations.java | 48 +++++-
.../operator/spec/ApplicationTolerationsTest.java | 70 ++++++++
.../reconciler/reconcilesteps/AppCleanUpStep.java | 73 +++++++--
.../reconcilesteps/AppCleanUpStepTest.java | 180 +++++++++++++++++----
.../resource-retain-duration/chainsaw-test.yaml | 10 +-
.../spark-example-retain-duration.yaml | 1 +
8 files changed, 382 insertions(+), 54 deletions(-)
diff --git a/build-tools/helm/spark-kubernetes-operator/crds/sparkapplications.spark.apache.org-v1.yaml b/build-tools/helm/spark-kubernetes-operator/crds/sparkapplications.spark.apache.org-v1.yaml
index 21d3996..6b2ee7b 100644
--- a/build-tools/helm/spark-kubernetes-operator/crds/sparkapplications.spark.apache.org-v1.yaml
+++ b/build-tools/helm/spark-kubernetes-operator/crds/sparkapplications.spark.apache.org-v1.yaml
@@ -17434,6 +17434,9 @@ spec:
- OnInfrastructureFailure
type: string
type: object
+ ttlAfterStopMillis:
+ default: -1
+ type: integer
type: object
configMapSpecs:
items:
diff --git a/docs/spark_custom_resources.md b/docs/spark_custom_resources.md
index ad6f8b9..1f39ba5 100644
--- a/docs/spark_custom_resources.md
+++ b/docs/spark_custom_resources.md
@@ -293,6 +293,8 @@ applicationTolerations:
resourceRetainPolicy: OnFailure
  # Secondary resources would be garbage collected 10 minutes after app termination
resourceRetainDurationMillis: 600000
+  # Garbage collect the SparkApplication custom resource itself 30 minutes after termination
+ ttlAfterStopMillis: 1800000
```
to avoid the operator attempting to delete the driver pod and driver resources if the app fails.
Similarly,
@@ -302,7 +304,54 @@ possible to configure `resourceRetainDurationMillis` to define the maximal retain
these resources. Note that this applies only to operator-created resources (driver pod, SparkConf
configmap, etc.). You may also want to tune `spark.kubernetes.driver.service.deleteOnTermination`
and `spark.kubernetes.executor.deleteOnTermination` to control the behavior of driver-created
-resources.
+resources. `ttlAfterStopMillis` controls the garbage collection behavior at the SparkApplication
+level after it stops. When set to a non-negative value, the Spark operator garbage collects the
+application (and therefore all its associated resources) after the given timeout. If the
+application is configured to restart, `resourceRetainPolicy`, `resourceRetainDurationMillis` and
+`ttlAfterStopMillis` apply only to the last attempt.
+
+For example, if an app with the configuration below:
+
+```yaml
+applicationTolerations:
+ restartConfig:
+ restartPolicy: OnFailure
+ maxRestartAttempts: 1
+ resourceRetainPolicy: Always
+ resourceRetainDurationMillis: 30000
+ ttlAfterStopMillis: 60000
+```
+
+ends up with a status like:
+
+```yaml
+status:
+#... the 1st attempt
+ "5":
+ currentStateSummary: Failed
+ "6":
+ currentStateSummary: ScheduledToRestart
+# ...the 2nd attempt
+ "11":
+ currentStateSummary: Succeeded
+ "12":
+ currentStateSummary: TerminatedWithoutReleaseResources
+```
+
+The retain policy only takes effect after the final state `12`. Secondary resources are always
+released between attempts, i.e. between states `5` and `6`. The TTL is likewise calculated based
+on the last state.
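The TTL deadline in the scenario above is derived from the last transition time of the final state. A minimal sketch of that arithmetic (hypothetical class and method names, not part of the operator API):

```java
import java.time.Duration;
import java.time.Instant;

public class TtlDeadlineSketch {
  /** Millis remaining until the TTL deadline; zero or negative means the app has expired. */
  static long millisUntilExpiry(String lastTransitionTime, long ttlAfterStopMillis, Instant now) {
    // Deadline = last transition time of the final state + TTL, as in the docs above.
    Instant deadline = Instant.parse(lastTransitionTime).plusMillis(ttlAfterStopMillis);
    return Duration.between(now, deadline).toMillis();
  }

  public static void main(String[] args) {
    // App reached its final state at 10:00:00Z with ttlAfterStopMillis = 60000.
    Instant now = Instant.parse("2025-08-14T10:00:30Z");
    System.out.println(millisUntilExpiry("2025-08-14T10:00:00Z", 60_000L, now)); // 30000
  }
}
```

A positive result is how long the operator would requeue before re-checking; a negative result means the resource is garbage collected.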
+
+| Field | Type | Default Value | Description |
+|-------|------|---------------|-------------|
+| .spec.applicationTolerations.resourceRetainPolicy | `Always` / `OnFailure` / `Never` | Never | Configure the operator to delete / retain secondary resources for an app after it terminates. |
+| .spec.applicationTolerations.resourceRetainDurationMillis | integer | -1 | Time to wait in milliseconds before releasing **secondary resources** after termination. Setting a negative value disables the retention duration check for secondary resources after termination. |
+| .spec.applicationTolerations.ttlAfterStopMillis | integer | -1 | Time-to-live in milliseconds for the SparkApplication and **all its associated secondary resources**. If set to a negative value, the application is retained and not garbage collected by the operator. |
+
+Note that `ttlAfterStopMillis` applies to the app as well as its secondary resources. If both
+`resourceRetainDurationMillis` and `ttlAfterStopMillis` are set to non-negative values and the
+latter is smaller, the TTL takes precedence: the operator removes all resources related to the
+app after `ttlAfterStopMillis`.
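The precedence rule can be sketched as follows (hypothetical class name; the branching mirrors the `computeEffectiveRetainDurationMillis` helper introduced by this patch):

```java
public class EffectiveRetainSketch {
  // The smaller non-negative value wins; -1 disables the corresponding check.
  static long effectiveRetainMillis(long retainMillis, long ttlMillis) {
    if (retainMillis < 0 && ttlMillis < 0) return -1L; // neither set: no duration check
    if (retainMillis < 0) return ttlMillis;            // only TTL set
    if (ttlMillis < 0) return retainMillis;            // only retain duration set
    return Math.min(retainMillis, ttlMillis);          // both set: smaller takes precedence
  }

  public static void main(String[] args) {
    System.out.println(effectiveRetainMillis(600_000L, 1_800_000L)); // 600000, retain duration wins
    System.out.println(effectiveRetainMillis(-1L, 1_800_000L));      // 1800000, only TTL set
    System.out.println(effectiveRetainMillis(-1L, -1L));             // -1, neither set
  }
}
```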
## Spark Cluster
diff --git a/spark-operator-api/src/main/java/org/apache/spark/k8s/operator/spec/ApplicationTolerations.java b/spark-operator-api/src/main/java/org/apache/spark/k8s/operator/spec/ApplicationTolerations.java
index 8e47817..20b8122 100644
--- a/spark-operator-api/src/main/java/org/apache/spark/k8s/operator/spec/ApplicationTolerations.java
+++ b/spark-operator-api/src/main/java/org/apache/spark/k8s/operator/spec/ApplicationTolerations.java
@@ -58,6 +58,34 @@ public class ApplicationTolerations {
@Builder.Default
protected Long resourceRetainDurationMillis = -1L;
+  /**
+   * Time-to-live in milliseconds for SparkApplication and all its associated secondary resources
+   * after stop. If set to a negative value, the application could be retained according to the
+   * retain policy. If the application is configured to restart, this would apply to the last
+   * attempt only.
+   */
+ @Default("-1")
+ @Builder.Default
+ protected Long ttlAfterStopMillis = -1L;
+
+  /**
+   * @return The effective retain duration for secondary resources, which would be the smaller
+   *     value of `resourceRetainDurationMillis` and `ttlAfterStopMillis`, if they are set to a
+   *     non-negative value. Returns -1 if neither is set.
+   */
+ public long computeEffectiveRetainDurationMillis() {
+ if (resourceRetainDurationMillis < 0 && ttlAfterStopMillis < 0) {
+ return -1L;
+ }
+ if (resourceRetainDurationMillis < 0) {
+ return ttlAfterStopMillis;
+ }
+ if (ttlAfterStopMillis < 0) {
+ return resourceRetainDurationMillis;
+ }
+ return Math.min(resourceRetainDurationMillis, ttlAfterStopMillis);
+ }
+
/**
* Check whether a terminated application has exceeded the resource retain duration at the provided instant
@@ -68,20 +96,30 @@ public class ApplicationTolerations {
*/
public boolean exceedRetainDurationAtInstant(
ApplicationState lastObservedState, Instant instant) {
- return lastObservedState != null
+ return isRetainDurationEnabled()
+ && lastObservedState != null
&& lastObservedState.getCurrentStateSummary().isTerminated()
- && resourceRetainDurationMillis > 0L
&& Instant.parse(lastObservedState.getLastTransitionTime())
- .plusMillis(resourceRetainDurationMillis)
+ .plusMillis(computeEffectiveRetainDurationMillis())
.isBefore(instant);
}
/**
* Indicates whether the reconciler need to perform retain duration check
*
- * @return true `resourceRetainDurationMillis` is set to non-negative value
+   * @return true if `resourceRetainDurationMillis` or `ttlAfterStopMillis` is set to a
+   *     non-negative value
*/
public boolean isRetainDurationEnabled() {
- return resourceRetainDurationMillis >= 0L;
+ return resourceRetainDurationMillis >= 0L || ttlAfterStopMillis >= 0L;
+ }
+
+ /**
+   * Indicates whether the reconciler needs to perform the TTL check
+   *
+   * @return true if `ttlAfterStopMillis` is set to a non-negative value
+ */
+ public boolean isTTLEnabled() {
+ return ttlAfterStopMillis >= 0L;
}
}
diff --git a/spark-operator-api/src/test/java/org/apache/spark/k8s/operator/spec/ApplicationTolerationsTest.java b/spark-operator-api/src/test/java/org/apache/spark/k8s/operator/spec/ApplicationTolerationsTest.java
new file mode 100644
index 0000000..bcc537f
--- /dev/null
+++ b/spark-operator-api/src/test/java/org/apache/spark/k8s/operator/spec/ApplicationTolerationsTest.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.k8s.operator.spec;
+
+import static org.junit.jupiter.api.Assertions.*;
+
+import org.junit.jupiter.api.Test;
+
+class ApplicationTolerationsTest {
+  private final ApplicationTolerations withRetainDurationOnly =
+      ApplicationTolerations.builder().resourceRetainDurationMillis(10L).build();
+ private final ApplicationTolerations withTTLOnly =
+ ApplicationTolerations.builder().ttlAfterStopMillis(10L).build();
+ private final ApplicationTolerations withNeitherRetainDurationNorTtl =
+ ApplicationTolerations.builder().build();
+ private final ApplicationTolerations withRetainDurationGreaterThanTtl =
+ ApplicationTolerations.builder()
+ .resourceRetainDurationMillis(20L)
+ .ttlAfterStopMillis(10L)
+ .build();
+ private final ApplicationTolerations withRetainDurationShorterThanTtl =
+ ApplicationTolerations.builder()
+ .resourceRetainDurationMillis(10L)
+ .ttlAfterStopMillis(20L)
+ .build();
+
+ @Test
+ void computeEffectiveRetainDurationMillis() {
+    assertEquals(10L, withRetainDurationOnly.computeEffectiveRetainDurationMillis());
+    assertEquals(10L, withTTLOnly.computeEffectiveRetainDurationMillis());
+    assertEquals(-1, withNeitherRetainDurationNorTtl.computeEffectiveRetainDurationMillis());
+    assertEquals(10L, withRetainDurationGreaterThanTtl.computeEffectiveRetainDurationMillis());
+    assertEquals(10L, withRetainDurationShorterThanTtl.computeEffectiveRetainDurationMillis());
+ }
+
+ @Test
+ void isRetainDurationEnabled() {
+ assertTrue(withRetainDurationOnly.isRetainDurationEnabled());
+ assertTrue(withTTLOnly.isRetainDurationEnabled());
+ assertFalse(withNeitherRetainDurationNorTtl.isRetainDurationEnabled());
+ assertTrue(withRetainDurationGreaterThanTtl.isRetainDurationEnabled());
+ assertTrue(withRetainDurationShorterThanTtl.isRetainDurationEnabled());
+ }
+
+ @Test
+ void isTTLEnabled() {
+ assertFalse(withRetainDurationOnly.isTTLEnabled());
+ assertTrue(withTTLOnly.isTTLEnabled());
+ assertFalse(withNeitherRetainDurationNorTtl.isTTLEnabled());
+ assertTrue(withRetainDurationGreaterThanTtl.isTTLEnabled());
+ assertTrue(withRetainDurationShorterThanTtl.isTTLEnabled());
+ }
+}
diff --git a/spark-operator/src/main/java/org/apache/spark/k8s/operator/reconciler/reconcilesteps/AppCleanUpStep.java b/spark-operator/src/main/java/org/apache/spark/k8s/operator/reconciler/reconcilesteps/AppCleanUpStep.java
index c62a506..12b241f 100644
--- a/spark-operator/src/main/java/org/apache/spark/k8s/operator/reconciler/reconcilesteps/AppCleanUpStep.java
+++ b/spark-operator/src/main/java/org/apache/spark/k8s/operator/reconciler/reconcilesteps/AppCleanUpStep.java
@@ -30,6 +30,7 @@ import java.util.function.Supplier;
import io.fabric8.kubernetes.api.model.HasMetadata;
import io.fabric8.kubernetes.api.model.Pod;
+import io.fabric8.kubernetes.client.KubernetesClient;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
@@ -95,7 +96,7 @@ public class AppCleanUpStep extends AppReconcileStep {
    ApplicationTolerations tolerations = application.getSpec().getApplicationTolerations();
if (currentState.getCurrentStateSummary().isTerminated()) {
Optional<ReconcileProgress> terminatedAppProgress =
- checkEarlyExitForTerminatedApp(application, statusRecorder);
+          checkEarlyExitForTerminatedApp(context.getClient(), application, statusRecorder);
if (terminatedAppProgress.isPresent()) {
return terminatedAppProgress.get();
}
@@ -184,14 +185,45 @@ public class AppCleanUpStep extends AppReconcileStep {
}
}
-  protected Optional<ReconcileProgress> checkEarlyExitForTerminatedApp(
+  protected Optional<ReconcileProgress> clearCacheAndFinishReconcileForApplication(
      final SparkApplication application, final SparkAppStatusRecorder statusRecorder) {
+    log.debug("Cleaning up status cache and stopping reconciliation for application.");
+    statusRecorder.removeCachedStatus(application);
+    return Optional.of(ReconcileProgress.completeAndNoRequeue());
+  }
+
+  protected Optional<ReconcileProgress> checkEarlyExitForTerminatedApp(
+      final KubernetesClient client,
+      final SparkApplication application,
+      final SparkAppStatusRecorder statusRecorder) {
ApplicationStatus currentStatus = application.getStatus();
ApplicationState currentState = currentStatus.getCurrentState();
    ApplicationTolerations tolerations = application.getSpec().getApplicationTolerations();
+    Instant now = Instant.now();
    if (ApplicationStateSummary.ResourceReleased.equals(currentState.getCurrentStateSummary())) {
-      statusRecorder.removeCachedStatus(application);
-      return Optional.of(ReconcileProgress.completeAndNoRequeue());
+      // Perform TTL check after removing all secondary resources, if enabled
+      if (isOnDemandCleanup() || !tolerations.isTTLEnabled()) {
+        // all secondary resources have been released, no more reconciliations needed
+        return clearCacheAndFinishReconcileForApplication(application, statusRecorder);
+      } else {
+        ApplicationState lastObservedStateBeforeTermination =
+            getLastObservedStateBeforeTermination(currentStatus);
+        Duration nextCheckDuration =
+            Duration.between(
+                now,
+                Instant.parse(lastObservedStateBeforeTermination.getLastTransitionTime())
+                    .plusMillis(tolerations.getTtlAfterStopMillis()));
+        if (nextCheckDuration.isNegative()) {
+          log.info("Garbage collecting application that exceeded the given TTL.");
+          ReconcilerUtils.deleteResourceIfExists(client, application, true);
+          return clearCacheAndFinishReconcileForApplication(application, statusRecorder);
+        } else {
+          log.info(
+              "Application has not yet expired, reconciliation will resume in {} millis.",
+              nextCheckDuration.toMillis());
+          return Optional.of(ReconcileProgress.completeAndRequeueAfter(nextCheckDuration));
+        }
+      }
    }
if (isOnDemandCleanup()) {
return Optional.empty();
@@ -199,21 +231,24 @@ public class AppCleanUpStep extends AppReconcileStep {
if (ApplicationStateSummary.TerminatedWithoutReleaseResources.equals(
currentState.getCurrentStateSummary())) {
if (tolerations.isRetainDurationEnabled()) {
- Instant now = Instant.now();
if (tolerations.exceedRetainDurationAtInstant(currentState, now)) {
+ log.info("Garbage collecting secondary resources for application");
          onDemandCleanUpReason = SparkAppStatusUtils::appExceededRetainDuration;
return Optional.empty();
} else {
Duration nextCheckDuration =
Duration.between(
- Instant.now(),
+ now,
Instant.parse(currentState.getLastTransitionTime())
-                      .plusMillis(tolerations.getResourceRetainDurationMillis()));
+                      .plusMillis(tolerations.computeEffectiveRetainDurationMillis()));
+          log.info(
+              "Application is within retention, reconciliation will resume in {} millis.",
+              nextCheckDuration.toMillis());
          return Optional.of(ReconcileProgress.completeAndRequeueAfter(nextCheckDuration));
}
} else {
- statusRecorder.removeCachedStatus(application);
- return Optional.of(ReconcileProgress.completeAndNoRequeue());
+ log.info("Retention duration check is not enabled for application.");
+        return clearCacheAndFinishReconcileForApplication(application, statusRecorder);
}
}
return Optional.empty();
@@ -223,17 +258,25 @@ public class AppCleanUpStep extends AppReconcileStep {
return onDemandCleanUpReason != null;
}
- protected boolean isReleasingResourcesForSchedulingFailureAttempt(
- final ApplicationStatus status) {
+ /**
+ * @param status status of the application
+   * @return The last observed state before termination if the app has terminated. If the app has
+   *     not terminated, return the last observed state
+   */
+  protected ApplicationState getLastObservedStateBeforeTermination(final ApplicationStatus status) {
ApplicationState lastObservedState = status.getCurrentState();
- if (ApplicationStateSummary.TerminatedWithoutReleaseResources.equals(
- lastObservedState.getCurrentStateSummary())) {
-      // if the app has already terminated, use the last observed state before termination
+ if (lastObservedState.getCurrentStateSummary().isTerminated()) {
NavigableMap<Long, ApplicationState> navMap =
(NavigableMap<Long, ApplicationState>)
status.getStateTransitionHistory();
Map.Entry<Long, ApplicationState> terminateState = navMap.lastEntry();
-      lastObservedState = navMap.lowerEntry(terminateState.getKey()).getValue();
+      return navMap.lowerEntry(terminateState.getKey()).getValue();
}
+ return lastObservedState;
+ }
+
+ protected boolean isReleasingResourcesForSchedulingFailureAttempt(
+ final ApplicationStatus status) {
+    ApplicationState lastObservedState = getLastObservedStateBeforeTermination(status);
return ApplicationStateSummary.SchedulingFailure.equals(
lastObservedState.getCurrentStateSummary());
}
diff --git a/spark-operator/src/test/java/org/apache/spark/k8s/operator/reconciler/reconcilesteps/AppCleanUpStepTest.java b/spark-operator/src/test/java/org/apache/spark/k8s/operator/reconciler/reconcilesteps/AppCleanUpStepTest.java
index 23ed54b..1fbf563 100644
--- a/spark-operator/src/test/java/org/apache/spark/k8s/operator/reconciler/reconcilesteps/AppCleanUpStepTest.java
+++ b/spark-operator/src/test/java/org/apache/spark/k8s/operator/reconciler/reconcilesteps/AppCleanUpStepTest.java
@@ -81,6 +81,14 @@ class AppCleanUpStepTest {
.resourceRetainDurationMillis(1L)
.build())
.build();
+ private final ApplicationSpec exceedRetainDurationFromTtl =
+ ApplicationSpec.builder()
+ .applicationTolerations(
+ ApplicationTolerations.builder()
+ .resourceRetainPolicy(ResourceRetainPolicy.Always)
+ .ttlAfterStopMillis(1L)
+ .build())
+ .build();
private final ApplicationSpec notExceedRetainDuration =
ApplicationSpec.builder()
.applicationTolerations(
@@ -89,9 +97,23 @@ class AppCleanUpStepTest {
.resourceRetainDurationMillis(24 * 60 * 60 * 1000L)
.build())
.build();
+ private final ApplicationSpec notExceedTtl =
+ ApplicationSpec.builder()
+ .applicationTolerations(
+ ApplicationTolerations.builder()
+ .resourceRetainPolicy(ResourceRetainPolicy.Always)
+ .ttlAfterStopMillis(24 * 60 * 60 * 1000L)
+ .build())
+ .build();
private final List<ApplicationSpec> specs =
-      List.of(alwaysRetain, neverRetain, exceedRetainDuration, notExceedRetainDuration);
+ List.of(
+ alwaysRetain,
+ neverRetain,
+ exceedRetainDuration,
+ exceedRetainDurationFromTtl,
+ notExceedRetainDuration,
+ notExceedTtl);
@Test
void enableForceDelete() {
@@ -191,6 +213,7 @@ class AppCleanUpStepTest {
verify(mockAppContext, times(1)).getResource();
verify(mockApp, times(2)).getSpec();
verify(mockApp, times(2)).getStatus();
+ verify(mockAppContext).getClient();
verify(mockRecorder).removeCachedStatus(mockApp);
verifyNoMoreInteractions(mockAppContext, mockRecorder, mockApp);
}
@@ -214,6 +237,7 @@ class AppCleanUpStepTest {
verify(mockApp, times(2)).getSpec();
verify(mockApp, times(2)).getStatus();
verify(mockRecorder).removeCachedStatus(mockApp);
+ verify(mockAppContext).getClient();
verifyNoMoreInteractions(mockAppContext, mockRecorder, mockApp);
}
@@ -246,7 +270,7 @@ class AppCleanUpStepTest {
verify(mockAppContext, times(1)).getResource();
verify(mockApp, times(3)).getSpec();
verify(mockApp, times(3)).getStatus();
- verify(mockAppContext).getClient();
+ verify(mockAppContext, times(2)).getClient();
verify(mockAppContext).getDriverPod();
ArgumentCaptor<ApplicationState> captor =
ArgumentCaptor.forClass(ApplicationState.class);
verify(mockRecorder).appendNewStateAndPersist(eq(mockAppContext),
captor.capture());
@@ -337,7 +361,7 @@ class AppCleanUpStepTest {
}
@Test
- void checkEarlyExitForResourceReleasedApp() {
+ void checkEarlyExitForResourceReleasedAppWithoutTTL() {
AppCleanUpStep routineCheck = new AppCleanUpStep();
    AppCleanUpStep cleanUpWithReason = new AppCleanUpStep(SparkAppStatusUtils::appCancelled);
ApplicationStatus succeeded =
@@ -350,27 +374,112 @@ class AppCleanUpStepTest {
prepareApplicationStatus(
ApplicationStateSummary.ResourceReleased,
ApplicationStateSummary.RunningHealthy);
List<ApplicationStatus> statusList = List.of(succeeded, failed, cancelled);
+ KubernetesClient mockClient = mock(KubernetesClient.class);
+    List<ApplicationSpec> specList =
+        List.of(alwaysRetain, neverRetain, exceedRetainDuration, notExceedRetainDuration);
- for (ApplicationSpec appSpec : specs) {
+ for (ApplicationSpec appSpec : specList) {
for (ApplicationStatus appStatus : statusList) {
SparkAppStatusRecorder mockRecorder1 =
mock(SparkAppStatusRecorder.class);
SparkAppStatusRecorder mockRecorder2 =
mock(SparkAppStatusRecorder.class);
SparkApplication mockApp = mock(SparkApplication.class);
when(mockApp.getStatus()).thenReturn(appStatus);
when(mockApp.getSpec()).thenReturn(appSpec);
-
Optional<ReconcileProgress> routineCheckProgress =
-            routineCheck.checkEarlyExitForTerminatedApp(mockApp, mockRecorder1);
+            routineCheck.checkEarlyExitForTerminatedApp(mockClient, mockApp, mockRecorder1);
assertTrue(routineCheckProgress.isPresent());
+ ReconcileProgress reconcileProgress = routineCheckProgress.get();
+        Optional<ReconcileProgress> onDemandProgress =
+            cleanUpWithReason.checkEarlyExitForTerminatedApp(mockClient, mockApp, mockRecorder2);
+        Assertions.assertEquals(ReconcileProgress.completeAndNoRequeue(), reconcileProgress);
+ verify(mockRecorder1).removeCachedStatus(mockApp);
+ assertTrue(onDemandProgress.isPresent());
Assertions.assertEquals(
ReconcileProgress.completeAndNoRequeue(),
routineCheckProgress.get());
+ verify(mockRecorder2).removeCachedStatus(mockApp);
+ }
+ }
+ }
+
+ @Test
+ void checkEarlyExitForResourceReleasedAppWithExceededTTL() {
+ AppCleanUpStep routineCheck = new AppCleanUpStep();
+    AppCleanUpStep cleanUpWithReason = new AppCleanUpStep(SparkAppStatusUtils::appCancelled);
+ ApplicationStatus succeeded =
+ prepareApplicationStatus(
+ ApplicationStateSummary.ResourceReleased,
ApplicationStateSummary.Succeeded);
+ ApplicationStatus failed =
+ prepareApplicationStatus(
+ ApplicationStateSummary.ResourceReleased,
ApplicationStateSummary.SchedulingFailure);
+ ApplicationStatus cancelled =
+ prepareApplicationStatus(
+ ApplicationStateSummary.ResourceReleased,
ApplicationStateSummary.Failed);
+ List<ApplicationStatus> statusList = List.of(succeeded, failed, cancelled);
+ KubernetesClient mockClient = mock(KubernetesClient.class);
+ for (ApplicationStatus appStatus : statusList) {
+ SparkAppStatusRecorder mockRecorder1 =
mock(SparkAppStatusRecorder.class);
+ SparkAppStatusRecorder mockRecorder2 =
mock(SparkAppStatusRecorder.class);
+ SparkApplication mockApp = mock(SparkApplication.class);
+ when(mockApp.getStatus()).thenReturn(appStatus);
+ when(mockApp.getSpec()).thenReturn(exceedRetainDurationFromTtl);
+ try (MockedStatic<ReconcilerUtils> utils =
Mockito.mockStatic(ReconcilerUtils.class)) {
+ Optional<ReconcileProgress> routineCheckProgress =
+ routineCheck.checkEarlyExitForTerminatedApp(mockClient, mockApp,
mockRecorder1);
+ assertTrue(routineCheckProgress.isPresent());
+ ReconcileProgress reconcileProgress = routineCheckProgress.get();
+ assertTrue(reconcileProgress.isCompleted());
+ assertFalse(reconcileProgress.isRequeue());
+ utils.verify(() -> ReconcilerUtils.deleteResourceIfExists(mockClient,
mockApp, true));
+ verify(mockRecorder1).removeCachedStatus(mockApp);
+ Optional<ReconcileProgress> onDemandProgress =
+ cleanUpWithReason.checkEarlyExitForTerminatedApp(mockClient,
mockApp, mockRecorder2);
verify(mockRecorder1).removeCachedStatus(mockApp);
+ assertTrue(onDemandProgress.isPresent());
+ ReconcileProgress reconcileProgressOnDemand = onDemandProgress.get();
+ Assertions.assertEquals(
+ ReconcileProgress.completeAndNoRequeue(),
reconcileProgressOnDemand);
+ verify(mockRecorder2).removeCachedStatus(mockApp);
+ }
+ }
+ }
+
+  @Test
+ void checkEarlyExitForResourceReleasedAppWithinTTL() {
+ AppCleanUpStep routineCheck = new AppCleanUpStep();
+    AppCleanUpStep cleanUpWithReason = new AppCleanUpStep(SparkAppStatusUtils::appCancelled);
+ ApplicationStatus succeeded =
+ prepareApplicationStatus(
+ ApplicationStateSummary.ResourceReleased,
ApplicationStateSummary.Succeeded);
+ ApplicationStatus failed =
+ prepareApplicationStatus(
+ ApplicationStateSummary.ResourceReleased,
ApplicationStateSummary.SchedulingFailure);
+ ApplicationStatus cancelled =
+ prepareApplicationStatus(
+ ApplicationStateSummary.ResourceReleased,
ApplicationStateSummary.Failed);
+ List<ApplicationStatus> statusList = List.of(succeeded, failed, cancelled);
+ KubernetesClient mockClient = mock(KubernetesClient.class);
+ for (ApplicationStatus appStatus : statusList) {
+ SparkAppStatusRecorder mockRecorder1 =
mock(SparkAppStatusRecorder.class);
+ SparkAppStatusRecorder mockRecorder2 =
mock(SparkAppStatusRecorder.class);
+ SparkApplication mockApp = mock(SparkApplication.class);
+ when(mockApp.getStatus()).thenReturn(appStatus);
+ when(mockApp.getSpec()).thenReturn(notExceedTtl);
+ try (MockedStatic<ReconcilerUtils> utils =
Mockito.mockStatic(ReconcilerUtils.class)) {
+ Optional<ReconcileProgress> routineCheckProgress =
+ routineCheck.checkEarlyExitForTerminatedApp(mockClient, mockApp,
mockRecorder1);
+ assertTrue(routineCheckProgress.isPresent());
+ ReconcileProgress reconcileProgress = routineCheckProgress.get();
+ assertTrue(reconcileProgress.isCompleted());
+ assertTrue(reconcileProgress.isRequeue());
+ assertTrue(reconcileProgress.getRequeueAfterDuration().toMillis() > 0);
+ utils.verifyNoInteractions();
+ verifyNoMoreInteractions(mockRecorder1);
Optional<ReconcileProgress> onDemandProgress =
- cleanUpWithReason.checkEarlyExitForTerminatedApp(mockApp,
mockRecorder2);
+ cleanUpWithReason.checkEarlyExitForTerminatedApp(mockClient,
mockApp, mockRecorder2);
assertTrue(onDemandProgress.isPresent());
+ ReconcileProgress reconcileProgressOnDemand = onDemandProgress.get();
Assertions.assertEquals(
- ReconcileProgress.completeAndNoRequeue(),
routineCheckProgress.get());
+ ReconcileProgress.completeAndNoRequeue(),
reconcileProgressOnDemand);
verify(mockRecorder2).removeCachedStatus(mockApp);
}
}
@@ -393,6 +502,7 @@ class AppCleanUpStepTest {
ApplicationStateSummary.TerminatedWithoutReleaseResources,
ApplicationStateSummary.RunningHealthy);
List<ApplicationStatus> statusList = List.of(succeeded, failed, cancelled);
+ KubernetesClient mockClient = mock(KubernetesClient.class);
for (ApplicationStatus appStatus : statusList) {
SparkAppStatusRecorder mockRecorder1 =
mock(SparkAppStatusRecorder.class);
@@ -402,13 +512,13 @@ class AppCleanUpStepTest {
when(mockApp.getSpec()).thenReturn(alwaysRetain);
Optional<ReconcileProgress> routineCheckProgress =
- routineCheck.checkEarlyExitForTerminatedApp(mockApp, mockRecorder1);
+ routineCheck.checkEarlyExitForTerminatedApp(mockClient, mockApp,
mockRecorder1);
assertTrue(routineCheckProgress.isPresent());
Assertions.assertEquals(ReconcileProgress.completeAndNoRequeue(),
routineCheckProgress.get());
verify(mockRecorder1).removeCachedStatus(mockApp);
Optional<ReconcileProgress> onDemandProgress =
- cleanUpWithReason.checkEarlyExitForTerminatedApp(mockApp,
mockRecorder2);
+ cleanUpWithReason.checkEarlyExitForTerminatedApp(mockClient,
mockApp, mockRecorder2);
assertFalse(onDemandProgress.isPresent());
verifyNoMoreInteractions(mockRecorder2);
}
@@ -431,23 +541,27 @@ class AppCleanUpStepTest {
ApplicationStateSummary.TerminatedWithoutReleaseResources,
ApplicationStateSummary.RunningHealthy);
List<ApplicationStatus> statusList = List.of(succeeded, failed, cancelled);
+ KubernetesClient mockClient = mock(KubernetesClient.class);
+ List<ApplicationSpec> specs = List.of(exceedRetainDuration,
exceedRetainDurationFromTtl);
- for (ApplicationStatus appStatus : statusList) {
- SparkAppStatusRecorder mockRecorder1 =
mock(SparkAppStatusRecorder.class);
- SparkAppStatusRecorder mockRecorder2 =
mock(SparkAppStatusRecorder.class);
- SparkApplication mockApp = mock(SparkApplication.class);
- when(mockApp.getStatus()).thenReturn(appStatus);
- when(mockApp.getSpec()).thenReturn(exceedRetainDuration);
+ for (ApplicationSpec spec : specs) {
+ for (ApplicationStatus appStatus : statusList) {
+ SparkAppStatusRecorder mockRecorder1 =
mock(SparkAppStatusRecorder.class);
+ SparkAppStatusRecorder mockRecorder2 =
mock(SparkAppStatusRecorder.class);
+ SparkApplication mockApp = mock(SparkApplication.class);
+ when(mockApp.getStatus()).thenReturn(appStatus);
+ when(mockApp.getSpec()).thenReturn(spec);
- Optional<ReconcileProgress> routineCheckProgress =
- routineCheck.checkEarlyExitForTerminatedApp(mockApp, mockRecorder1);
- assertFalse(routineCheckProgress.isPresent());
- verifyNoMoreInteractions(mockRecorder1);
+ Optional<ReconcileProgress> routineCheckProgress =
+ routineCheck.checkEarlyExitForTerminatedApp(mockClient, mockApp,
mockRecorder1);
+ assertFalse(routineCheckProgress.isPresent());
+ verifyNoMoreInteractions(mockRecorder1, mockClient);
- Optional<ReconcileProgress> onDemandProgress =
- cleanUpWithReason.checkEarlyExitForTerminatedApp(mockApp,
mockRecorder2);
- assertFalse(onDemandProgress.isPresent());
- verifyNoMoreInteractions(mockRecorder2);
+ Optional<ReconcileProgress> onDemandProgress =
+ cleanUpWithReason.checkEarlyExitForTerminatedApp(mockClient,
mockApp, mockRecorder2);
+ assertFalse(onDemandProgress.isPresent());
+ verifyNoMoreInteractions(mockRecorder2, mockClient);
+ }
}
}
@@ -468,6 +582,7 @@ class AppCleanUpStepTest {
ApplicationStateSummary.TerminatedWithoutReleaseResources,
ApplicationStateSummary.RunningHealthy);
List<ApplicationStatus> statusList = List.of(succeeded, failed, cancelled);
+ KubernetesClient mockClient = mock(KubernetesClient.class);
for (ApplicationStatus appStatus : statusList) {
SparkAppStatusRecorder mockRecorder1 =
mock(SparkAppStatusRecorder.class);
@@ -477,17 +592,17 @@ class AppCleanUpStepTest {
when(mockApp.getSpec()).thenReturn(notExceedRetainDuration);
Optional<ReconcileProgress> routineCheckProgress =
- routineCheck.checkEarlyExitForTerminatedApp(mockApp, mockRecorder1);
+ routineCheck.checkEarlyExitForTerminatedApp(mockClient, mockApp,
mockRecorder1);
assertTrue(routineCheckProgress.isPresent());
ReconcileProgress reconcileProgress = routineCheckProgress.get();
assertTrue(reconcileProgress.isCompleted());
assertTrue(reconcileProgress.isRequeue());
- verifyNoMoreInteractions(mockRecorder2);
+ verifyNoMoreInteractions(mockRecorder2, mockClient);
Optional<ReconcileProgress> onDemandProgress =
- cleanUpWithReason.checkEarlyExitForTerminatedApp(mockApp,
mockRecorder2);
+ cleanUpWithReason.checkEarlyExitForTerminatedApp(mockClient,
mockApp, mockRecorder2);
assertFalse(onDemandProgress.isPresent());
- verifyNoMoreInteractions(mockRecorder2);
+ verifyNoMoreInteractions(mockRecorder2, mockClient);
}
}
@@ -500,6 +615,7 @@ class AppCleanUpStepTest {
continue;
}
ApplicationStatus status = prepareApplicationStatus(stateSummary);
+ KubernetesClient mockClient = mock(KubernetesClient.class);
for (ApplicationSpec appSpec : specs) {
SparkAppStatusRecorder mockRecorder1 =
mock(SparkAppStatusRecorder.class);
SparkAppStatusRecorder mockRecorder2 =
mock(SparkAppStatusRecorder.class);
@@ -508,14 +624,14 @@ class AppCleanUpStepTest {
when(mockApp.getSpec()).thenReturn(appSpec);
Optional<ReconcileProgress> routineCheckProgress =
- routineCheck.checkEarlyExitForTerminatedApp(mockApp,
mockRecorder1);
+ routineCheck.checkEarlyExitForTerminatedApp(mockClient, mockApp,
mockRecorder1);
assertTrue(routineCheckProgress.isEmpty());
- verifyNoMoreInteractions(mockRecorder1);
+ verifyNoMoreInteractions(mockRecorder1, mockClient);
Optional<ReconcileProgress> onDemandProgress =
- cleanUpWithReason.checkEarlyExitForTerminatedApp(mockApp,
mockRecorder2);
+ cleanUpWithReason.checkEarlyExitForTerminatedApp(mockClient,
mockApp, mockRecorder2);
assertTrue(onDemandProgress.isEmpty());
- verifyNoMoreInteractions(mockRecorder2);
+ verifyNoMoreInteractions(mockRecorder2, mockClient);
}
}
}
diff --git a/tests/e2e/resource-retain-duration/chainsaw-test.yaml b/tests/e2e/resource-retain-duration/chainsaw-test.yaml
index 2c8b558..57c5dc6 100644
--- a/tests/e2e/resource-retain-duration/chainsaw-test.yaml
+++ b/tests/e2e/resource-retain-duration/chainsaw-test.yaml
@@ -41,6 +41,14 @@ spec:
value: default
timeout: 120s
      file: "../assertions/spark-application/spark-state-transition-with-retain-check.yaml"
+ - wait:
+ apiVersion: spark.apache.org/v1
+ kind: SparkApplication
+ namespace: default
+ name: ($SPARK_APPLICATION_NAME)
+ for:
+ deletion: {}
+ timeout: 60s
catch:
- describe:
apiVersion: spark.apache.org/v1
@@ -53,4 +61,4 @@ spec:
value: ($SPARK_APPLICATION_NAME)
timeout: 120s
content: |
- kubectl delete sparkapplication $SPARK_APPLICATION_NAME
+        kubectl delete sparkapplication $SPARK_APPLICATION_NAME --ignore-not-found=true
diff --git a/tests/e2e/resource-retain-duration/spark-example-retain-duration.yaml b/tests/e2e/resource-retain-duration/spark-example-retain-duration.yaml
index 022fdd4..952bfff 100644
--- a/tests/e2e/resource-retain-duration/spark-example-retain-duration.yaml
+++ b/tests/e2e/resource-retain-duration/spark-example-retain-duration.yaml
@@ -26,6 +26,7 @@ spec:
applicationTolerations:
resourceRetainPolicy: Always
resourceRetainDurationMillis: 10000
+ ttlAfterStopMillis: 30000
sparkConf:
spark.executor.instances: "1"
spark.kubernetes.container.image: "apache/spark:4.0.0-java21-scala"
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]