This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/spark-kubernetes-operator.git
The following commit(s) were added to refs/heads/main by this push:
new 013ab62 [SPARK-55623] Add granular restart control with consecutive failure tracking
013ab62 is described below
commit 013ab620c75d3f03554dec1485b6a7320e898386
Author: Zhou JIANG <[email protected]>
AuthorDate: Fri Mar 6 08:46:06 2026 -0800
[SPARK-55623] Add granular restart control with consecutive failure tracking
### What changes were proposed in this pull request?
This PR implements granular restart control for Spark applications, with
support for tracking consecutive failures and consecutive scheduling failures.
### Why are the changes needed?
Consecutive failure tracking enables operators to allow more total restarts
for apps with occasional transient failures, while stopping quickly on
persistent failures. It also applies special handling for scheduling failures
to mitigate API server stress.
This is an enhancement on top of the existing configuration for maximum
restarts and backoff interval.
### Does this PR introduce any user-facing change?
Yes. New optional configuration fields are available in `RestartConfig`:
```yaml
restartConfig:
restartPolicy: Always
maxRestartAttempts: 5 # existing field
restartBackoffMillis: 30000 # existing field
restartCounterResetMillis: 3600000 # existing field
# New: consecutive failure limits
maxRestartOnFailure: 3
restartBackoffMillisForFailure: 60000
maxRestartOnSchedulingFailure: 2
restartBackoffMillisForSchedulingFailure: 300000
```
This should be backwards compatible. All new fields are optional. When
failure-specific limits are not set, the operator uses `maxRestartAttempts` as
before.
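For illustration, a minimal sketch of the fallback behavior (the builder and
`getEffectiveRestartBackoffMillis` shown here are added by this patch; the
values are arbitrary, and the statements are shown outside a method for
brevity):
```java
import org.apache.spark.k8s.operator.spec.RestartConfig;
import org.apache.spark.k8s.operator.status.ApplicationStateSummary;

RestartConfig config =
    RestartConfig.builder()
        .restartBackoffMillis(30000L)           // default backoff
        .restartBackoffMillisForFailure(60000L) // override for failures
        .build();

// Failure states use the failure-specific override:
config.getEffectiveRestartBackoffMillis(ApplicationStateSummary.Failed);    // 60000
// Non-failure states fall back to the default backoff:
config.getEffectiveRestartBackoffMillis(ApplicationStateSummary.Succeeded); // 30000
```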
### How was this patch tested?
Unit tests were added to validate the limit evaluation flow, both with and
without the new fields.
### Was this patch authored or co-authored using generative AI tooling?
No
Closes #514 from jiangzho/retry.
Authored-by: Zhou JIANG <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
---
.../sparkapplications.spark.apache.org-v1.yaml | 16 +
docs/spark_custom_resources.md | 107 ++++++-
.../org/apache/spark/k8s/operator/Constants.java | 15 +
.../spark/k8s/operator/spec/RestartConfig.java | 21 ++
...mptSummary.java => ApplicationAttemptInfo.java} | 43 +--
.../operator/status/ApplicationAttemptSummary.java | 18 +-
.../k8s/operator/status/ApplicationStatus.java | 69 ++++-
.../{AttemptInfo.java => BaseAttemptInfo.java} | 20 +-
.../k8s/operator/status/BaseAttemptSummary.java | 4 +-
.../k8s/operator/status/ClusterAttemptSummary.java | 8 +-
.../spark/k8s/operator/spec/RestartConfigTest.java | 71 +++++
.../k8s/operator/status/ApplicationStatusTest.java | 336 +++++++++++++++++++++
.../reconciler/reconcilesteps/AppCleanUpStep.java | 7 +-
.../reconciler/reconcilesteps/AppInitStep.java | 6 +-
.../k8s/operator/SparkAppSubmissionWorkerTest.java | 4 +-
15 files changed, 683 insertions(+), 62 deletions(-)
diff --git
a/build-tools/helm/spark-kubernetes-operator/crds/sparkapplications.spark.apache.org-v1.yaml
b/build-tools/helm/spark-kubernetes-operator/crds/sparkapplications.spark.apache.org-v1.yaml
index 7ba701f..6d2fca7 100644
---
a/build-tools/helm/spark-kubernetes-operator/crds/sparkapplications.spark.apache.org-v1.yaml
+++
b/build-tools/helm/spark-kubernetes-operator/crds/sparkapplications.spark.apache.org-v1.yaml
@@ -17424,8 +17424,16 @@ spec:
properties:
maxRestartAttempts:
type: integer
+ maxRestartOnFailure:
+ type: integer
+ maxRestartOnSchedulingFailure:
+ type: integer
restartBackoffMillis:
type: integer
+ restartBackoffMillisForFailure:
+ type: integer
+ restartBackoffMillisForSchedulingFailure:
+ type: integer
restartCounterResetMillis:
default: -1
type: integer
@@ -23958,10 +23966,14 @@ spec:
properties:
attemptInfo:
properties:
+ failureRestartCounter:
+ type: integer
id:
type: integer
restartCounter:
type: integer
+ schedulingFailureRestartCounter:
+ type: integer
type: object
stateTransitionHistory:
additionalProperties:
@@ -25220,10 +25232,14 @@ spec:
properties:
attemptInfo:
properties:
+ failureRestartCounter:
+ type: integer
id:
type: integer
restartCounter:
type: integer
+ schedulingFailureRestartCounter:
+ type: integer
type: object
stateTransitionHistory:
additionalProperties:
diff --git a/docs/spark_custom_resources.md b/docs/spark_custom_resources.md
index dbf5c0e..b26f713 100644
--- a/docs/spark_custom_resources.md
+++ b/docs/spark_custom_resources.md
@@ -234,14 +234,111 @@ restartConfig:
restartBackoffMillis: 30000
```
+### Granular Restart Control
+
+For more fine-grained control over restart behavior, you can configure different retry limits
+and backoff times for specific failure types. This allows you to handle different failure
+scenarios with appropriate strategies.
+
+The operator maintains multiple counters to track different types of restarts:
+- General restart counter: tracks all restarts
+- Consecutive failure counter: tracks consecutive failures of any kind (scheduling failures
+  increment this counter as well)
+- Consecutive scheduling failure counter: tracks consecutive scheduling failures only
+
+#### Restart Behavior Control
+
+- Consecutive failure tracking: the failure-specific counters track consecutive failures
+  of the app, distinguishing between persistent failures (requiring intervention) and
+  transient issues (safe to retry).
+  - For example, with `restartPolicy=Always`, `maxRestartAttempts=5` and `maxRestartOnFailure=2`:
+    - The app stops after 3 consecutive failures, and allows at most 5 restarts overall.
+    - In other words, the sequence F -> F -> F would stop.
+    - The sequence F -> S -> F -> S -> F would continue with the 5th restart, as the succeeded
+      attempts reset the failure counter.
+- Granular control over `SchedulingFailure`: similarly, it is possible to control the maximum
+  restarts and backoff interval for consecutive `SchedulingFailure` attempts, since these are
+  often associated with API server rejections, exceeded quotas, or resource constraints.
+
+#### Restart Limit Evaluation
+
+When an attempt ends, limits are checked in order:
+  1. The general limit (`maxRestartAttempts`) is checked for every restart
+  2. For failures, the most specific applicable limit is also checked:
+     - Scheduling failures (`SchedulingFailure`) → `maxRestartOnSchedulingFailure` (if set)
+     - Other failures → `maxRestartOnFailure` (if set)
+  3. The application stops if any applicable limit is exceeded
+
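+A simplified sketch of this evaluation order (illustrative pseudocode only; the actual
+implementation lives in `ApplicationStatus`):
+
+```java
+// Limits come from RestartConfig; counters come from the next attempt's info.
+boolean exceeded = restartCounter > maxRestartAttempts;                        // 1. general limit
+if (!exceeded && isSchedulingFailure && maxRestartOnSchedulingFailure != null) {
+  exceeded = schedulingFailureRestartCounter > maxRestartOnSchedulingFailure;  // 2. scheduling
+} else if (!exceeded && isFailure && maxRestartOnFailure != null) {
+  exceeded = failureRestartCounter > maxRestartOnFailure;                      // 2. other failures
+}
+// 3. if 'exceeded' is true, the application stops instead of restarting
+```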
+
+#### Configuration Fields
+
+```yaml
+restartConfig:
+ restartPolicy: Always
+ # Default restart configuration (applies to all restarts)
+ maxRestartAttempts: 5
+ restartBackoffMillis: 30000 # 30 seconds
+
+  # Override for consecutive general failures (application crashes, driver failures, etc.)
+ # This counter resets to 0 on success
+ maxRestartOnFailure: 3
+ restartBackoffMillisForFailure: 60000 # 1 minute
+
+ # Override for consecutive scheduling failures
+ maxRestartOnSchedulingFailure: 1
+ restartBackoffMillisForSchedulingFailure: 300000 # 5 minutes
+```
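+
+The counters evolve as follows when an attempt ends. This trace is illustrative (it mirrors
+the `mixedFailureTypesResetIndependently` unit test) and assumes no time-based counter reset:
+
+```java
+// (restartCounter, failureRestartCounter, schedulingFailureRestartCounter)
+// start:              (0, 0, 0)
+// SchedulingFailure:  (1, 1, 1)  // a scheduling failure increments both failure counters
+// Failed:             (2, 2, 0)  // a non-scheduling failure resets the scheduling counter
+// Succeeded:          (3, 0, 0)  // success resets both consecutive-failure counters
+```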
+
+#### Example Use Cases
+
+Tolerate transient failures but stop on persistent issues:
+
+```yaml
+restartConfig:
+ restartPolicy: Always
+ maxRestartAttempts: 100 # Allow many total attempts
+ restartBackoffMillis: 30000
+ # But stop after 3 consecutive failures (indicates persistent problem)
+ maxRestartOnFailure: 3
+ restartBackoffMillisForFailure: 60000
+```
+
+Mitigate API server stress during scheduling failures:
+
+```yaml
+restartConfig:
+ restartPolicy: Always
+ maxRestartAttempts: 50
+ restartBackoffMillis: 30000
+ # Stop quickly on scheduling failures to avoid overwhelming API server
+ maxRestartOnSchedulingFailure: 2
+ restartBackoffMillisForSchedulingFailure: 600000 # 10 minutes
+```
+
+
+| Field | Type | Default Value | Description |
+|-------|------|---------------|-------------|
+| .spec.applicationTolerations.restartConfig.restartPolicy | string | Never | Restart policy: `Never`, `Always`, `OnFailure`, or `OnInfrastructureFailure` |
+| .spec.applicationTolerations.restartConfig.maxRestartAttempts | integer | 3 | Maximum number of restart attempts for all scenarios (always checked) |
+| .spec.applicationTolerations.restartConfig.restartBackoffMillis | integer | 30000 | Default backoff time in milliseconds between restart attempts |
+| .spec.applicationTolerations.restartConfig.maxRestartOnFailure | integer | null | Maximum consecutive failures before stopping. Resets to 0 on success. If null, uses `maxRestartAttempts` |
+| .spec.applicationTolerations.restartConfig.restartBackoffMillisForFailure | integer | null | Backoff time for application failures. If null, uses `restartBackoffMillis` |
+| .spec.applicationTolerations.restartConfig.maxRestartOnSchedulingFailure | integer | null | Maximum consecutive scheduling failures before stopping. Scheduling failures occur when the API server rejects requests (e.g., quota exceeded, resource constraints). Resets to 0 on success. If null, falls back to `maxRestartOnFailure` |
+| .spec.applicationTolerations.restartConfig.restartBackoffMillisForSchedulingFailure | integer | null | Backoff time for scheduling failures. If null, falls back to `restartBackoffMillisForFailure` |
+
### Restart Counter reset
-The restartCounterResetMillis field controls automatic restart counter resets for long-running
+The `restartCounterResetMillis` field controls automatic restart counter resets for long-running
 application attempts. When set to a non-negative value (in milliseconds), the operator will reset
-the restart counter if an application attempt runs successfully for at least the specified duration
-before failing. This feature enables user to allow maximal x attempts if an app fails really
-fast (which could indicate some underlying issue other than the app itself) while allowing
-indefinite restarts when the app can survive given threshold.
+all restart counters (including the general counter and both failure counters) if an application
+attempt runs successfully for at least the specified duration before ending.
+
+Time-based reset takes precedence over all limit checks. If an attempt runs longer than
+`restartCounterResetMillis`, the operator will always restart with reset counters, regardless
+of how many times the application has previously failed.
+
+This feature enables applications to recover from early instability: you can limit fast-failing
+restarts (which often indicate configuration or infrastructure issues) while allowing indefinite
+restarts for applications that demonstrate stable operation for extended periods.
For example, setting
diff --git
a/spark-operator-api/src/main/java/org/apache/spark/k8s/operator/Constants.java
b/spark-operator-api/src/main/java/org/apache/spark/k8s/operator/Constants.java
index 8ce7ae7..7109761 100644
---
a/spark-operator-api/src/main/java/org/apache/spark/k8s/operator/Constants.java
+++
b/spark-operator-api/src/main/java/org/apache/spark/k8s/operator/Constants.java
@@ -157,6 +157,21 @@ public class Constants {
public static final String EXCEED_MAX_RETRY_ATTEMPT_MESSAGE =
"The maximum number of restart attempts (%d) has been exceeded.";
+  /**
+   * Message indicating that the maximum number of restart attempts has been exceeded on
+   * failures.
+   */
+  public static final String EXCEED_MAX_RETRY_ATTEMPT_ON_FAILURE_MESSAGE =
+      "The maximum number of restart attempts on consecutive failures (%d) has been exceeded.";
+
+  /**
+   * Message indicating that the maximum number of restart attempts has been exceeded on
+   * scheduling failures.
+   */
+  public static final String EXCEED_MAX_RETRY_ATTEMPT_ON_SCHEDULING_FAILURE_MESSAGE =
+      "The maximum number of restart attempts on consecutive scheduling failures (%d) "
+          + "has been exceeded.";
+
  /** Message indicating a failure to request the driver from the scheduler backend. */
public static final String SCHEDULE_FAILURE_MESSAGE =
"Failed to request driver from scheduler backend.";
diff --git
a/spark-operator-api/src/main/java/org/apache/spark/k8s/operator/spec/RestartConfig.java
b/spark-operator-api/src/main/java/org/apache/spark/k8s/operator/spec/RestartConfig.java
index 64b9837..51fb5fb 100644
---
a/spark-operator-api/src/main/java/org/apache/spark/k8s/operator/spec/RestartConfig.java
+++
b/spark-operator-api/src/main/java/org/apache/spark/k8s/operator/spec/RestartConfig.java
@@ -27,6 +27,8 @@ import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
+import org.apache.spark.k8s.operator.status.ApplicationStateSummary;
+
/** Restart configuration for a Spark application. */
@Data
@NoArgsConstructor
@@ -42,4 +44,23 @@ public class RestartConfig {
@Default("-1")
@Builder.Default
protected Long restartCounterResetMillis = -1L;
+ @Builder.Default protected Long maxRestartOnFailure = null;
+ @Builder.Default protected Long restartBackoffMillisForFailure = null;
+ @Builder.Default protected Long maxRestartOnSchedulingFailure = null;
+  @Builder.Default protected Long restartBackoffMillisForSchedulingFailure = null;
+ /**
+   * Returns the effective restart backoff time in milliseconds based on the current application
+   * state.
+ */
+ public long getEffectiveRestartBackoffMillis(
+ ApplicationStateSummary stateSummary) {
+ if (ApplicationStateSummary.SchedulingFailure == stateSummary
+ && restartBackoffMillisForSchedulingFailure != null) {
+ return restartBackoffMillisForSchedulingFailure;
+ }
+ if (stateSummary.isFailure() && restartBackoffMillisForFailure != null) {
+ return restartBackoffMillisForFailure;
+ }
+ return restartBackoffMillis;
+ }
}
diff --git
a/spark-operator-api/src/main/java/org/apache/spark/k8s/operator/status/ApplicationAttemptSummary.java
b/spark-operator-api/src/main/java/org/apache/spark/k8s/operator/status/ApplicationAttemptInfo.java
similarity index 54%
copy from
spark-operator-api/src/main/java/org/apache/spark/k8s/operator/status/ApplicationAttemptSummary.java
copy to
spark-operator-api/src/main/java/org/apache/spark/k8s/operator/status/ApplicationAttemptInfo.java
index 2d3c6a7..1577e16 100644
---
a/spark-operator-api/src/main/java/org/apache/spark/k8s/operator/status/ApplicationAttemptSummary.java
+++
b/spark-operator-api/src/main/java/org/apache/spark/k8s/operator/status/ApplicationAttemptInfo.java
@@ -19,36 +19,45 @@
package org.apache.spark.k8s.operator.status;
-import java.util.SortedMap;
-
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonInclude;
import lombok.EqualsAndHashCode;
import lombok.Getter;
+import lombok.Setter;
import lombok.ToString;
-/** Summary of a Spark application attempt. */
+/**
+ * Information about an attempt.
+ *
+ * <p>Maintains counters for different restart limit checks:
+ * <ul>
+ *   <li><b>failureRestartCounter</b>: Consecutive failure count, checked against
+ *       maxRestartOnFailure</li>
+ *   <li><b>schedulingFailureRestartCounter</b>: Consecutive scheduling failure count, checked
+ *       against maxRestartOnSchedulingFailure</li>
+ * </ul>
+ *
+ */
+@Setter
@Getter
@EqualsAndHashCode(callSuper = true)
@ToString(callSuper = true)
@JsonInclude(JsonInclude.Include.NON_NULL)
@JsonIgnoreProperties(ignoreUnknown = true)
-public class ApplicationAttemptSummary extends BaseAttemptSummary {
- // The state transition history for given attempt
- // This is used when state history trimming is enabled
- protected final SortedMap<Long, ApplicationState> stateTransitionHistory;
-
- public ApplicationAttemptSummary(
-      AttemptInfo attemptInfo, SortedMap<Long, ApplicationState> stateTransitionHistory) {
- super(attemptInfo);
- this.stateTransitionHistory = stateTransitionHistory;
- }
+public class ApplicationAttemptInfo extends BaseAttemptInfo {
+ protected long failureRestartCounter;
+ protected long schedulingFailureRestartCounter;
- public ApplicationAttemptSummary() {
- this(new AttemptInfo(), null);
+ public ApplicationAttemptInfo() {
+ super();
+ failureRestartCounter = 0L;
+ schedulingFailureRestartCounter = 0L;
}
- public ApplicationAttemptSummary(AttemptInfo attemptInfo) {
- this(attemptInfo, null);
+  public ApplicationAttemptInfo(long id, long restartCounter, long failureRestartCounter,
+ long schedulingFailureRestartCounter) {
+ super(id, restartCounter);
+ this.failureRestartCounter = failureRestartCounter;
+ this.schedulingFailureRestartCounter = schedulingFailureRestartCounter;
}
}
diff --git
a/spark-operator-api/src/main/java/org/apache/spark/k8s/operator/status/ApplicationAttemptSummary.java
b/spark-operator-api/src/main/java/org/apache/spark/k8s/operator/status/ApplicationAttemptSummary.java
index 2d3c6a7..3c680a1 100644
---
a/spark-operator-api/src/main/java/org/apache/spark/k8s/operator/status/ApplicationAttemptSummary.java
+++
b/spark-operator-api/src/main/java/org/apache/spark/k8s/operator/status/ApplicationAttemptSummary.java
@@ -28,27 +28,31 @@ import lombok.Getter;
import lombok.ToString;
/** Summary of a Spark application attempt. */
-@Getter
@EqualsAndHashCode(callSuper = true)
@ToString(callSuper = true)
@JsonInclude(JsonInclude.Include.NON_NULL)
@JsonIgnoreProperties(ignoreUnknown = true)
-public class ApplicationAttemptSummary extends BaseAttemptSummary {
+public class ApplicationAttemptSummary extends BaseAttemptSummary<ApplicationAttemptInfo> {
   // The state transition history for given attempt
   // This is used when state history trimming is enabled
-  protected final SortedMap<Long, ApplicationState> stateTransitionHistory;
+  @Getter protected final SortedMap<Long, ApplicationState> stateTransitionHistory;
-  public ApplicationAttemptSummary(
-      AttemptInfo attemptInfo, SortedMap<Long, ApplicationState> stateTransitionHistory) {
+  public ApplicationAttemptSummary(ApplicationAttemptInfo attemptInfo,
+      SortedMap<Long, ApplicationState> stateTransitionHistory) {
super(attemptInfo);
this.stateTransitionHistory = stateTransitionHistory;
}
public ApplicationAttemptSummary() {
- this(new AttemptInfo(), null);
+ this(new ApplicationAttemptInfo(), null);
}
- public ApplicationAttemptSummary(AttemptInfo attemptInfo) {
+ public ApplicationAttemptSummary(ApplicationAttemptInfo attemptInfo) {
this(attemptInfo, null);
}
+
+ @Override
+ public ApplicationAttemptInfo getAttemptInfo() {
+ return attemptInfo == null ? new ApplicationAttemptInfo() : attemptInfo;
+ }
}
diff --git
a/spark-operator-api/src/main/java/org/apache/spark/k8s/operator/status/ApplicationStatus.java
b/spark-operator-api/src/main/java/org/apache/spark/k8s/operator/status/ApplicationStatus.java
index 17901cc..5ee6830 100644
---
a/spark-operator-api/src/main/java/org/apache/spark/k8s/operator/status/ApplicationStatus.java
+++
b/spark-operator-api/src/main/java/org/apache/spark/k8s/operator/status/ApplicationStatus.java
@@ -19,8 +19,6 @@
package org.apache.spark.k8s.operator.status;
-import static org.apache.spark.k8s.operator.Constants.EXCEED_MAX_RETRY_ATTEMPT_MESSAGE;
-
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
@@ -33,6 +31,7 @@ import com.fasterxml.jackson.annotation.JsonInclude;
import lombok.EqualsAndHashCode;
import lombok.ToString;
+import org.apache.spark.k8s.operator.Constants;
import org.apache.spark.k8s.operator.spec.ResourceRetainPolicy;
import org.apache.spark.k8s.operator.spec.RestartConfig;
import org.apache.spark.k8s.operator.spec.RestartPolicy;
@@ -127,12 +126,39 @@ public class ApplicationStatus
>= 0;
}
-    long effectiveAttemptId =
-        resetRestartCounter ? 0L : currentAttemptSummary.getAttemptInfo().getRestartCounter();
+    ApplicationStateSummary currentStateSummary = currentState.getCurrentStateSummary();
+    ApplicationAttemptInfo nextAttemptInfo = getAttemptInfo(resetRestartCounter,
+        currentAttemptSummary.getAttemptInfo(), currentStateSummary);
+
+    boolean exceededLimit =
+        nextAttemptInfo.getRestartCounter() > restartConfig.getMaxRestartAttempts();
+ String stateMessage = "";
- if (effectiveAttemptId >= restartConfig.getMaxRestartAttempts()) {
- String stateMessage =
-          String.format(EXCEED_MAX_RETRY_ATTEMPT_MESSAGE, restartConfig.getMaxRestartAttempts());
+ if (exceededLimit) {
+ stateMessage =
+ String.format(
+ Constants.EXCEED_MAX_RETRY_ATTEMPT_MESSAGE,
+ restartConfig.getMaxRestartAttempts());
+ } else if (restartConfig.getMaxRestartOnSchedulingFailure() != null
+ && ApplicationStateSummary.SchedulingFailure == currentStateSummary) {
+ exceededLimit =
+ nextAttemptInfo.getSchedulingFailureRestartCounter()
+ > restartConfig.getMaxRestartOnSchedulingFailure();
+ stateMessage =
+ String.format(
+ Constants.EXCEED_MAX_RETRY_ATTEMPT_ON_SCHEDULING_FAILURE_MESSAGE,
+ restartConfig.getMaxRestartOnSchedulingFailure());
+ } else if (restartConfig.getMaxRestartOnFailure() != null
+ && currentStateSummary.isFailure()) {
+      exceededLimit =
+          nextAttemptInfo.getFailureRestartCounter() > restartConfig.getMaxRestartOnFailure();
+ stateMessage =
+ String.format(
+ Constants.EXCEED_MAX_RETRY_ATTEMPT_ON_FAILURE_MESSAGE,
+ restartConfig.getMaxRestartOnFailure());
+ }
+
+ if (exceededLimit) {
if (stateMessageOverride != null && !stateMessageOverride.isEmpty()) {
stateMessage += stateMessageOverride;
}
@@ -153,9 +179,6 @@ public class ApplicationStatus
currentAttemptSummary);
}
-    AttemptInfo nextAttemptInfo =
-        currentAttemptSummary.getAttemptInfo().createNextAttemptInfo(resetRestartCounter);
-
    ApplicationAttemptSummary nextAttemptSummary = new ApplicationAttemptSummary(nextAttemptInfo);
ApplicationState state =
new ApplicationState(ApplicationStateSummary.ScheduledToRestart,
stateMessageOverride);
@@ -180,6 +203,32 @@ public class ApplicationStatus
}
}
+  private ApplicationAttemptInfo getAttemptInfo(boolean resetRestartCounter,
+      ApplicationAttemptInfo currentAttemptInfo,
+      ApplicationStateSummary currentStateSummary) {
+    long newRestartCounter = resetRestartCounter ? 1L : currentAttemptInfo.getRestartCounter() + 1;
+ long newFailureCounter;
+ long newSchedulingFailureCounter;
+ if (resetRestartCounter) {
+ newFailureCounter = 0L;
+ newSchedulingFailureCounter = 0L;
+    } else if (ApplicationStateSummary.SchedulingFailure == currentStateSummary) {
+      newSchedulingFailureCounter = currentAttemptInfo.getSchedulingFailureRestartCounter() + 1;
+ newFailureCounter = currentAttemptInfo.getFailureRestartCounter() + 1;
+ } else if (currentStateSummary.isFailure()) {
+ newFailureCounter = currentAttemptInfo.getFailureRestartCounter() + 1;
+ newSchedulingFailureCounter = 0L;
+ } else {
+ newFailureCounter = 0L;
+ newSchedulingFailureCounter = 0L;
+ }
+ return new ApplicationAttemptInfo(
+ currentAttemptInfo.getId() + 1L,
+ newRestartCounter,
+ newFailureCounter,
+ newSchedulingFailureCounter);
+ }
+
/**
* Finds the first state of the current application attempt.
*
diff --git
a/spark-operator-api/src/main/java/org/apache/spark/k8s/operator/status/AttemptInfo.java
b/spark-operator-api/src/main/java/org/apache/spark/k8s/operator/status/BaseAttemptInfo.java
similarity index 73%
rename from
spark-operator-api/src/main/java/org/apache/spark/k8s/operator/status/AttemptInfo.java
rename to
spark-operator-api/src/main/java/org/apache/spark/k8s/operator/status/BaseAttemptInfo.java
index 7c28ed9..6f415da 100644
---
a/spark-operator-api/src/main/java/org/apache/spark/k8s/operator/status/AttemptInfo.java
+++
b/spark-operator-api/src/main/java/org/apache/spark/k8s/operator/status/BaseAttemptInfo.java
@@ -29,7 +29,10 @@ import lombok.NoArgsConstructor;
import lombok.Setter;
import lombok.ToString;
-/** Information about an attempt. */
+/**
+ * Basic information about an attempt.
+ */
+@Getter
@NoArgsConstructor
@AllArgsConstructor
@Builder
@@ -37,16 +40,7 @@ import lombok.ToString;
@ToString
@JsonInclude(JsonInclude.Include.NON_NULL)
@JsonIgnoreProperties(ignoreUnknown = true)
-public class AttemptInfo {
- @Getter @Builder.Default protected final long id = 0L;
- @Getter @Setter protected long restartCounter;
-
- /**
- * Creates a new AttemptInfo object representing the next attempt.
- *
- * @return A new AttemptInfo with an incremented ID.
- */
- public AttemptInfo createNextAttemptInfo(boolean resetRestartCounter) {
-    return new AttemptInfo(id + 1L, resetRestartCounter ? 1L : restartCounter + 1);
- }
+public class BaseAttemptInfo {
+ @Builder.Default protected final long id = 0L;
+ @Builder.Default @Setter protected long restartCounter = 0L;
}
diff --git
a/spark-operator-api/src/main/java/org/apache/spark/k8s/operator/status/BaseAttemptSummary.java
b/spark-operator-api/src/main/java/org/apache/spark/k8s/operator/status/BaseAttemptSummary.java
index ad7673e..85e07e8 100644
---
a/spark-operator-api/src/main/java/org/apache/spark/k8s/operator/status/BaseAttemptSummary.java
+++
b/spark-operator-api/src/main/java/org/apache/spark/k8s/operator/status/BaseAttemptSummary.java
@@ -33,6 +33,6 @@ import lombok.ToString;
@RequiredArgsConstructor
@JsonInclude(JsonInclude.Include.NON_NULL)
@JsonIgnoreProperties(ignoreUnknown = true)
-public class BaseAttemptSummary {
- protected final AttemptInfo attemptInfo;
+public class BaseAttemptSummary<I extends BaseAttemptInfo> {
+ protected final I attemptInfo;
}
diff --git
a/spark-operator-api/src/main/java/org/apache/spark/k8s/operator/status/ClusterAttemptSummary.java
b/spark-operator-api/src/main/java/org/apache/spark/k8s/operator/status/ClusterAttemptSummary.java
index 4a79c88..d2b46c5 100644
---
a/spark-operator-api/src/main/java/org/apache/spark/k8s/operator/status/ClusterAttemptSummary.java
+++
b/spark-operator-api/src/main/java/org/apache/spark/k8s/operator/status/ClusterAttemptSummary.java
@@ -33,20 +33,20 @@ import lombok.ToString;
@ToString(callSuper = true)
@JsonInclude(JsonInclude.Include.NON_NULL)
@JsonIgnoreProperties(ignoreUnknown = true)
-public class ClusterAttemptSummary extends BaseAttemptSummary {
+public class ClusterAttemptSummary extends BaseAttemptSummary<BaseAttemptInfo> {
protected final SortedMap<Long, ClusterState> stateTransitionHistory;
public ClusterAttemptSummary(
-      AttemptInfo attemptInfo, SortedMap<Long, ClusterState> stateTransitionHistory) {
+      BaseAttemptInfo attemptInfo, SortedMap<Long, ClusterState> stateTransitionHistory) {
super(attemptInfo);
this.stateTransitionHistory = stateTransitionHistory;
}
public ClusterAttemptSummary() {
- this(new AttemptInfo(), null);
+ this(new BaseAttemptInfo(), null);
}
- public ClusterAttemptSummary(AttemptInfo attemptInfo) {
+ public ClusterAttemptSummary(BaseAttemptInfo attemptInfo) {
this(attemptInfo, null);
}
}
diff --git
a/spark-operator-api/src/test/java/org/apache/spark/k8s/operator/spec/RestartConfigTest.java
b/spark-operator-api/src/test/java/org/apache/spark/k8s/operator/spec/RestartConfigTest.java
new file mode 100644
index 0000000..5e61fe9
--- /dev/null
+++
b/spark-operator-api/src/test/java/org/apache/spark/k8s/operator/spec/RestartConfigTest.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.k8s.operator.spec;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+import org.junit.jupiter.api.Test;
+
+import org.apache.spark.k8s.operator.status.ApplicationStateSummary;
+
+class RestartConfigTest {
+
+ @Test
+ void effectiveRestartBackoffMillisUsesDefaultWhenFailureOverrideIsNull() {
+ RestartConfig config =
+ RestartConfig.builder()
+ .restartBackoffMillis(30000L)
+ .restartBackoffMillisForFailure(null) // No override
+ .build();
+
+    assertEquals(
+        30000L, config.getEffectiveRestartBackoffMillis(ApplicationStateSummary.Failed));
+    assertEquals(
+        30000L, config.getEffectiveRestartBackoffMillis(ApplicationStateSummary.Succeeded));
+  }
+
+ @Test
+ void effectiveRestartBackoffMillisUsesFailureOverrideWhenSet() {
+ RestartConfig config =
+ RestartConfig.builder()
+ .restartBackoffMillis(10000L) // 10 seconds default
+ .restartBackoffMillisForFailure(60000L) // 1 minute for failures
+ .build();
+
+    assertEquals(
+        60000L, config.getEffectiveRestartBackoffMillis(ApplicationStateSummary.Failed));
+    assertEquals(
+        10000L, config.getEffectiveRestartBackoffMillis(ApplicationStateSummary.Succeeded));
+  }
+
+ @Test
+ void effectiveRestartBackoffMillisHandlesZeroFailureOverride() {
+ RestartConfig config =
+ RestartConfig.builder()
+ .restartBackoffMillis(30000L)
+ .restartBackoffMillisForFailure(0L) // No delay for failures
+ .build();
+
+    assertEquals(
+        0L, config.getEffectiveRestartBackoffMillis(ApplicationStateSummary.Failed));
+    assertEquals(
+        30000L, config.getEffectiveRestartBackoffMillis(ApplicationStateSummary.Succeeded));
+ }
+}
diff --git
a/spark-operator-api/src/test/java/org/apache/spark/k8s/operator/status/ApplicationStatusTest.java
b/spark-operator-api/src/test/java/org/apache/spark/k8s/operator/status/ApplicationStatusTest.java
index ef5cc8a..d53fcca 100644
---
a/spark-operator-api/src/test/java/org/apache/spark/k8s/operator/status/ApplicationStatusTest.java
+++
b/spark-operator-api/src/test/java/org/apache/spark/k8s/operator/status/ApplicationStatusTest.java
@@ -393,4 +393,340 @@ class ApplicationStatusTest {
submittedState.setLastTransitionTime(submittedTime.toString());
return status;
}
+
+ @Test
+ void terminateOrRestartUsesFailureOverrideForFailedState() {
+ RestartConfig config =
+ RestartConfig.builder()
+ .restartPolicy(RestartPolicy.Always)
+ .maxRestartAttempts(10L)
+ .maxRestartOnFailure(0L)
+ .build();
+
+ ApplicationStatus status =
+ new ApplicationStatus()
+ .appendNewState(new
ApplicationState(ApplicationStateSummary.DriverRequested, ""))
+ .appendNewState(new
ApplicationState(ApplicationStateSummary.RunningHealthy, ""))
+ .appendNewState(new
ApplicationState(ApplicationStateSummary.Failed, "error"));
+
+ ApplicationStatus restarted =
+ status.terminateOrRestart(config, ResourceRetainPolicy.Never, null,
false);
+
+ assertEquals(
+ ApplicationStateSummary.ResourceReleased,
+ restarted.getCurrentState().getCurrentStateSummary());
+ assertEquals(0L,
restarted.getCurrentAttemptSummary().getAttemptInfo().getId());
+ }
+
+ @Test
+ void terminateOrRestartUsesGeneralMaxAttemptsForNonFailureState() {
+ RestartConfig config =
+ RestartConfig.builder()
+ .restartPolicy(RestartPolicy.Always)
+ .maxRestartAttempts(10L)
+ .maxRestartOnFailure(0L)
+ .build();
+
+ ApplicationStatus status =
+ new ApplicationStatus()
+ .appendNewState(new
ApplicationState(ApplicationStateSummary.DriverRequested, ""))
+ .appendNewState(new
ApplicationState(ApplicationStateSummary.RunningHealthy, ""))
+ .appendNewState(new ApplicationState(Succeeded, "completed"));
+
+ ApplicationStatus restarted =
+ status.terminateOrRestart(config, ResourceRetainPolicy.Never, null,
false);
+
+ assertEquals(
+ ApplicationStateSummary.ScheduledToRestart,
+ restarted.getCurrentState().getCurrentStateSummary());
+ assertEquals(1L,
restarted.getCurrentAttemptSummary().getAttemptInfo().getId());
+ }
+
+ @Test
+ void terminateOrRestartSchedulingFailureUsesSchedulingOverride() {
+ RestartConfig config =
+ RestartConfig.builder()
+ .restartPolicy(RestartPolicy.Always)
+ .maxRestartAttempts(10L)
+ .maxRestartOnFailure(10L)
+ .maxRestartOnSchedulingFailure(0L)
+ .build();
+
+ ApplicationStatus status =
+ new ApplicationStatus()
+ .appendNewState(new
ApplicationState(ApplicationStateSummary.DriverRequested, ""))
+ .appendNewState(
+ new
ApplicationState(ApplicationStateSummary.SchedulingFailure, "quota exceeded"));
+
+ ApplicationStatus restarted =
+ status.terminateOrRestart(config, ResourceRetainPolicy.Never, null,
false);
+
+ assertEquals(
+ ApplicationStateSummary.ResourceReleased,
+ restarted.getCurrentState().getCurrentStateSummary());
+ assertEquals(0L,
restarted.getCurrentAttemptSummary().getAttemptInfo().getId());
+ }
+
+ @Test
+ void terminateOrRestartSchedulingFailureFallsBackToGeneralFailureOverride() {
+ RestartConfig config =
+ RestartConfig.builder()
+ .restartPolicy(RestartPolicy.Always)
+ .maxRestartAttempts(10L)
+ .maxRestartOnFailure(0L)
+ .build();
+
+ ApplicationStatus status =
+ new ApplicationStatus()
+ .appendNewState(new
ApplicationState(ApplicationStateSummary.DriverRequested, ""))
+ .appendNewState(
+ new ApplicationState(
+ ApplicationStateSummary.SchedulingFailure, "resources
unavailable"));
+
+ ApplicationStatus restarted =
+ status.terminateOrRestart(config, ResourceRetainPolicy.Never, null,
false);
+
+ assertEquals(
+ ApplicationStateSummary.ResourceReleased,
+ restarted.getCurrentState().getCurrentStateSummary());
+ assertEquals(0L,
restarted.getCurrentAttemptSummary().getAttemptInfo().getId());
+ }
+
+ @Test
+ void consecutiveFailureCounterResetsOnSuccess() {
+ RestartConfig config =
+ RestartConfig.builder()
+ .restartPolicy(RestartPolicy.Always)
+ .maxRestartAttempts(10L)
+ .maxRestartOnFailure(2L) // Only 2 consecutive failures allowed
+ .build();
+
+ ApplicationStatus status = new ApplicationStatus();
+ // simulate a F -> S -> F -> F -> F scenario
+ // Attempt 1: Fails
+ status =
+ status
+ .appendNewState(new
ApplicationState(ApplicationStateSummary.DriverRequested, ""))
+ .appendNewState(new
ApplicationState(ApplicationStateSummary.Failed, "error1"));
+
+ status = status.terminateOrRestart(config, ResourceRetainPolicy.Never,
null, false);
+ assertEquals(
+ ApplicationStateSummary.ScheduledToRestart,
+ status.getCurrentState().getCurrentStateSummary());
+ assertEquals(1L,
status.getCurrentAttemptSummary().getAttemptInfo().getRestartCounter());
+ assertEquals(
+ 1L,
status.getCurrentAttemptSummary().getAttemptInfo().getFailureRestartCounter());
+
+ // Attempt 2: Succeeds - should reset failure counter
+ status =
+ status
+ .appendNewState(new
ApplicationState(ApplicationStateSummary.DriverRequested, ""))
+ .appendNewState(new ApplicationState(Succeeded, "success1"));
+
+ status = status.terminateOrRestart(config, ResourceRetainPolicy.Never,
null, false);
+ assertEquals(
+ ApplicationStateSummary.ScheduledToRestart,
+ status.getCurrentState().getCurrentStateSummary());
+ assertEquals(2L,
status.getCurrentAttemptSummary().getAttemptInfo().getRestartCounter());
+ // Failure counter should be reset to 0
+ assertEquals(
+ 0L,
status.getCurrentAttemptSummary().getAttemptInfo().getFailureRestartCounter());
+
+ // Attempt 3: Fails again - failure counter restarts from 1
+ status =
+ status
+ .appendNewState(new
ApplicationState(ApplicationStateSummary.DriverRequested, ""))
+ .appendNewState(new
ApplicationState(ApplicationStateSummary.Failed, "error2"));
+
+ status = status.terminateOrRestart(config, ResourceRetainPolicy.Never,
null, false);
+ assertEquals(
+ ApplicationStateSummary.ScheduledToRestart,
+ status.getCurrentState().getCurrentStateSummary());
+ assertEquals(3L,
status.getCurrentAttemptSummary().getAttemptInfo().getRestartCounter());
+ // Failure counter should be 1 (not 2, because success reset it)
+ assertEquals(
+ 1L,
status.getCurrentAttemptSummary().getAttemptInfo().getFailureRestartCounter());
+
+ // Attempt 4: Fails again - 2 consecutive failures now
+ status =
+ status
+ .appendNewState(new
ApplicationState(ApplicationStateSummary.DriverRequested, ""))
+ .appendNewState(new
ApplicationState(ApplicationStateSummary.Failed, "error3"));
+
+ status = status.terminateOrRestart(config, ResourceRetainPolicy.Never,
null, false);
+ assertEquals(
+ ApplicationStateSummary.ScheduledToRestart,
+ status.getCurrentState().getCurrentStateSummary());
+ assertEquals(4L,
status.getCurrentAttemptSummary().getAttemptInfo().getRestartCounter());
+ assertEquals(
+ 2L,
status.getCurrentAttemptSummary().getAttemptInfo().getFailureRestartCounter());
+
+ // Attempt 5: Fails for third consecutive time - should exceed limit
+ status =
+ status
+ .appendNewState(new
ApplicationState(ApplicationStateSummary.DriverRequested, ""))
+ .appendNewState(new
ApplicationState(ApplicationStateSummary.Failed, "error4"));
+
+ status = status.terminateOrRestart(config, ResourceRetainPolicy.Never,
null, false);
+    // Should stop: consecutive failure counter (3) > maxRestartOnFailure (2)
+ assertEquals(
+ ApplicationStateSummary.ResourceReleased,
+ status.getCurrentState().getCurrentStateSummary());
+ }
+
+ @Test
+ void schedulingFailureIncrementsBothConsecutiveCounters() {
+ RestartConfig config =
+ RestartConfig.builder()
+ .restartPolicy(RestartPolicy.Always)
+ .maxRestartAttempts(10L)
+ .maxRestartOnFailure(2L) // Only 2 consecutive general failures
+ .maxRestartOnSchedulingFailure(null) // No specific override
+ .build();
+
+ ApplicationStatus status = new ApplicationStatus();
+
+ // Attempt 1: Scheduling failure
+ status =
+ status
+ .appendNewState(new
ApplicationState(ApplicationStateSummary.DriverRequested, ""))
+ .appendNewState(
+ new
ApplicationState(ApplicationStateSummary.SchedulingFailure, "quota1"));
+
+ status = status.terminateOrRestart(config, ResourceRetainPolicy.Never,
null, false);
+ assertEquals(1L,
status.getCurrentAttemptSummary().getAttemptInfo().getFailureRestartCounter());
+ assertEquals(
+ 1L,
+
status.getCurrentAttemptSummary().getAttemptInfo().getSchedulingFailureRestartCounter());
+
+ // Attempt 2: Another scheduling failure
+ status =
+ status
+ .appendNewState(new
ApplicationState(ApplicationStateSummary.DriverRequested, ""))
+ .appendNewState(
+ new
ApplicationState(ApplicationStateSummary.SchedulingFailure, "quota2"));
+
+ status = status.terminateOrRestart(config, ResourceRetainPolicy.Never,
null, false);
+ assertEquals(2L,
status.getCurrentAttemptSummary().getAttemptInfo().getFailureRestartCounter());
+ assertEquals(
+ 2L,
+
status.getCurrentAttemptSummary().getAttemptInfo().getSchedulingFailureRestartCounter());
+
+    // Attempt 3: Third consecutive scheduling failure - should exceed general failure limit
+ status =
+ status
+ .appendNewState(new
ApplicationState(ApplicationStateSummary.DriverRequested, ""))
+ .appendNewState(
+ new
ApplicationState(ApplicationStateSummary.SchedulingFailure, "quota3"));
+
+ status = status.terminateOrRestart(config, ResourceRetainPolicy.Never,
null, false);
+    // Should stop: consecutive failure counter (3) > maxRestartOnFailure (2)
+ assertEquals(
+ ApplicationStateSummary.ResourceReleased,
+ status.getCurrentState().getCurrentStateSummary());
+ }
+
+ @Test
+ void generalLimitEnforcedEvenForFailures() {
+ RestartConfig config =
+ RestartConfig.builder()
+ .restartPolicy(RestartPolicy.Always)
+ .maxRestartAttempts(2L) // General limit
+ .maxRestartOnFailure(10L)
+ .build();
+
+ ApplicationStatus status = new ApplicationStatus();
+
+ // Attempt 1: Succeeds
+ status =
+ status
+ .appendNewState(new
ApplicationState(ApplicationStateSummary.DriverRequested, ""))
+ .appendNewState(new ApplicationState(Succeeded, "success1"));
+
+ status = status.terminateOrRestart(config, ResourceRetainPolicy.Never,
null, false);
+ assertEquals(
+ ApplicationStateSummary.ScheduledToRestart,
+ status.getCurrentState().getCurrentStateSummary());
+ assertEquals(1L,
status.getCurrentAttemptSummary().getAttemptInfo().getRestartCounter());
+
+ // Attempt 2: Succeeds
+ status =
+ status
+ .appendNewState(new
ApplicationState(ApplicationStateSummary.DriverRequested, ""))
+ .appendNewState(new ApplicationState(Succeeded, "success2"));
+
+ status = status.terminateOrRestart(config, ResourceRetainPolicy.Never,
null, false);
+ assertEquals(
+ ApplicationStateSummary.ScheduledToRestart,
+ status.getCurrentState().getCurrentStateSummary());
+ assertEquals(2L,
status.getCurrentAttemptSummary().getAttemptInfo().getRestartCounter());
+
+    // Attempt 3: Fails - should stop due to general limit even though failure limit is 10
+ status =
+ status
+ .appendNewState(new
ApplicationState(ApplicationStateSummary.DriverRequested, ""))
+ .appendNewState(new
ApplicationState(ApplicationStateSummary.Failed, "error"));
+
+ status = status.terminateOrRestart(config, ResourceRetainPolicy.Never,
null, false);
+ // Should stop because general counter >= general limit
+ assertEquals(
+ ApplicationStateSummary.ResourceReleased,
+ status.getCurrentState().getCurrentStateSummary());
+ assertTrue(
+ status
+ .getCurrentState()
+ .getMessage()
+ .contains("The maximum number of restart attempts (2) has been
exceeded."));
+ }
+
+ @Test
+ void mixedFailureTypesResetIndependently() {
+ RestartConfig config =
+ RestartConfig.builder()
+ .restartPolicy(RestartPolicy.Always)
+ .maxRestartAttempts(10L)
+ .maxRestartOnFailure(3L)
+ .maxRestartOnSchedulingFailure(2L)
+ .build();
+
+ ApplicationStatus status = new ApplicationStatus();
+
+ // Attempt 1: Scheduling failure
+ status =
+ status
+ .appendNewState(new
ApplicationState(ApplicationStateSummary.DriverRequested, ""))
+ .appendNewState(
+ new
ApplicationState(ApplicationStateSummary.SchedulingFailure, "quota"));
+
+ status = status.terminateOrRestart(config, ResourceRetainPolicy.Never,
null, false);
+ assertEquals(1L,
status.getCurrentAttemptSummary().getAttemptInfo().getFailureRestartCounter());
+ assertEquals(
+ 1L,
+
status.getCurrentAttemptSummary().getAttemptInfo().getSchedulingFailureRestartCounter());
+
+ // Attempt 2: Regular failure (not scheduling)
+ status =
+ status
+ .appendNewState(new
ApplicationState(ApplicationStateSummary.DriverRequested, ""))
+ .appendNewState(new
ApplicationState(ApplicationStateSummary.Failed, "error"));
+
+ status = status.terminateOrRestart(config, ResourceRetainPolicy.Never,
null, false);
+ assertEquals(2L,
status.getCurrentAttemptSummary().getAttemptInfo().getFailureRestartCounter());
+ // Scheduling failure counter reset
+ assertEquals(
+ 0L,
+
status.getCurrentAttemptSummary().getAttemptInfo().getSchedulingFailureRestartCounter());
+
+ // Attempt 3: Success - resets both counters
+ status =
+ status
+ .appendNewState(new
ApplicationState(ApplicationStateSummary.DriverRequested, ""))
+ .appendNewState(new ApplicationState(Succeeded, "success"));
+
+ status = status.terminateOrRestart(config, ResourceRetainPolicy.Never,
null, false);
+ assertEquals(0L,
status.getCurrentAttemptSummary().getAttemptInfo().getFailureRestartCounter());
+ assertEquals(
+ 0L,
+
status.getCurrentAttemptSummary().getAttemptInfo().getSchedulingFailureRestartCounter());
+ }
}
diff --git
a/spark-operator/src/main/java/org/apache/spark/k8s/operator/reconciler/reconcilesteps/AppCleanUpStep.java
b/spark-operator/src/main/java/org/apache/spark/k8s/operator/reconciler/reconcilesteps/AppCleanUpStep.java
index 4bfa76b..ca88511 100644
---
a/spark-operator/src/main/java/org/apache/spark/k8s/operator/reconciler/reconcilesteps/AppCleanUpStep.java
+++
b/spark-operator/src/main/java/org/apache/spark/k8s/operator/reconciler/reconcilesteps/AppCleanUpStep.java
@@ -174,6 +174,7 @@ public class AppCleanUpStep extends AppReconcileStep {
return appendStateAndRequeueAfter(
context, statusRecorder, state,
Duration.ofMillis(requeueAfterMillis));
} else {
+
updatedStatus =
currentStatus.terminateOrRestart(
tolerations.getRestartConfig(),
@@ -184,7 +185,11 @@ public class AppCleanUpStep extends AppReconcileStep {
tolerations.getApplicationTimeoutConfig().getTerminationRequeuePeriodMillis();
if (ApplicationStateSummary.ScheduledToRestart ==
updatedStatus.getCurrentState().getCurrentStateSummary()) {
-        requeueAfterMillis = tolerations.getRestartConfig().getRestartBackoffMillis();
+        // Check if current state is a failure before restarting
+        ApplicationStateSummary currentStateSummary =
+            currentStatus.getCurrentState().getCurrentStateSummary();
+        requeueAfterMillis =
+            tolerations.getRestartConfig().getEffectiveRestartBackoffMillis(currentStateSummary);
}
return updateStatusAndRequeueAfter(
context, statusRecorder, updatedStatus,
Duration.ofMillis(requeueAfterMillis));
diff --git
a/spark-operator/src/main/java/org/apache/spark/k8s/operator/reconciler/reconcilesteps/AppInitStep.java
b/spark-operator/src/main/java/org/apache/spark/k8s/operator/reconciler/reconcilesteps/AppInitStep.java
index cc5c36a..2cde405 100644
---
a/spark-operator/src/main/java/org/apache/spark/k8s/operator/reconciler/reconcilesteps/AppInitStep.java
+++
b/spark-operator/src/main/java/org/apache/spark/k8s/operator/reconciler/reconcilesteps/AppInitStep.java
@@ -38,6 +38,7 @@ import org.apache.spark.k8s.operator.SparkApplication;
import org.apache.spark.k8s.operator.context.SparkAppContext;
import org.apache.spark.k8s.operator.decorators.DriverResourceDecorator;
import org.apache.spark.k8s.operator.reconciler.ReconcileProgress;
+import org.apache.spark.k8s.operator.status.ApplicationAttemptSummary;
import org.apache.spark.k8s.operator.status.ApplicationState;
import org.apache.spark.k8s.operator.status.ApplicationStateSummary;
import org.apache.spark.k8s.operator.status.ApplicationStatus;
@@ -64,12 +65,15 @@ public class AppInitStep extends AppReconcileStep {
SparkApplication app = context.getResource();
if (app.getStatus().getPreviousAttemptSummary() != null) {
      Instant lastTransitionTime = Instant.parse(currentState.getLastTransitionTime());
+      ApplicationAttemptSummary attemptSummary = app.getStatus().getPreviousAttemptSummary();
+ ApplicationState lastState = attemptSummary.getStateTransitionHistory()
+ .get(attemptSummary.getStateTransitionHistory().lastKey());
Instant restartTime =
lastTransitionTime.plusMillis(
app.getSpec()
.getApplicationTolerations()
.getRestartConfig()
-                .getRestartBackoffMillis());
+                .getEffectiveRestartBackoffMillis(lastState.getCurrentStateSummary()));
Instant now = Instant.now();
if (restartTime.isAfter(now)) {
        return ReconcileProgress.completeAndRequeueAfter(Duration.between(now, restartTime));
diff --git
a/spark-submission-worker/src/test/java/org/apache/spark/k8s/operator/SparkAppSubmissionWorkerTest.java
b/spark-submission-worker/src/test/java/org/apache/spark/k8s/operator/SparkAppSubmissionWorkerTest.java
index b1a350c..af71b4c 100644
---
a/spark-submission-worker/src/test/java/org/apache/spark/k8s/operator/SparkAppSubmissionWorkerTest.java
+++
b/spark-submission-worker/src/test/java/org/apache/spark/k8s/operator/SparkAppSubmissionWorkerTest.java
@@ -46,9 +46,9 @@ import org.apache.spark.k8s.operator.spec.ApplicationSpec;
import org.apache.spark.k8s.operator.spec.ApplicationTolerations;
import org.apache.spark.k8s.operator.spec.ResourceRetainPolicy;
import org.apache.spark.k8s.operator.spec.RuntimeVersions;
+import org.apache.spark.k8s.operator.status.ApplicationAttemptInfo;
import org.apache.spark.k8s.operator.status.ApplicationAttemptSummary;
import org.apache.spark.k8s.operator.status.ApplicationStatus;
-import org.apache.spark.k8s.operator.status.AttemptInfo;
@SuppressWarnings("PMD.UnusedLocalVariable")
class SparkAppSubmissionWorkerTest {
@@ -221,7 +221,7 @@ class SparkAppSubmissionWorkerTest {
"Multiple invoke of generateSparkAppId shall give same result.");
ApplicationAttemptSummary mockAttempt =
mock(ApplicationAttemptSummary.class);
- AttemptInfo mockAttemptInfo = mock(AttemptInfo.class);
+    ApplicationAttemptInfo mockAttemptInfo = mock(ApplicationAttemptInfo.class);
when(mockAttempt.getAttemptInfo()).thenReturn(mockAttemptInfo);
when(mockAttemptInfo.getId()).thenReturn(2L);
when(mockStatus1.getCurrentAttemptSummary()).thenReturn(mockAttempt);