This is an automated email from the ASF dual-hosted git repository.
gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
The following commit(s) were added to refs/heads/main by this push:
new aa3d0e2 [FLINK-26473] Simplify error checking, fix benign NPE
aa3d0e2 is described below
commit aa3d0e2791f00f48114bf13d132dd6c3813ab643
Author: Thomas Weise <[email protected]>
AuthorDate: Wed Mar 16 14:41:59 2022 -0700
[FLINK-26473] Simplify error checking, fix benign NPE
---
.../kubernetes/operator/observer/BaseObserver.java | 33 +++++++++++-----------
1 file changed, 16 insertions(+), 17 deletions(-)
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/BaseObserver.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/BaseObserver.java
index a694f6d..43140ed 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/BaseObserver.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/BaseObserver.java
@@ -92,24 +92,12 @@ public abstract class BaseObserver implements Observer {
flinkApp.getMetadata().getNamespace(),
status);
- List<DeploymentCondition> conditions = status.getConditions();
- for (DeploymentCondition dc : conditions) {
- if ("FailedCreate".equals(dc.getReason())
- && "ReplicaFailure".equals(dc.getType())) {
- // throw only when not already in error status to allow for spec update
- if (!JobManagerDeploymentStatus.ERROR.equals(
- deploymentStatus.getJobManagerDeploymentStatus())) {
- throw new DeploymentFailedException(
- DeploymentFailedException.COMPONENT_JOBMANAGER, dc);
- }
- return;
- }
- }
-
- // checking the pod is expensive; only do it when the deployment isn't ready
try {
+ checkFailedCreate(status);
+ // checking the pod is expensive; only do it when the deployment isn't ready
checkCrashLoopBackoff(flinkApp, effectiveConfig);
} catch (DeploymentFailedException dfe) {
+ // throw only when not already in error status to allow for spec update
if (!JobManagerDeploymentStatus.ERROR.equals(
deploymentStatus.getJobManagerDeploymentStatus())) {
throw dfe;
@@ -132,13 +120,24 @@ public abstract class BaseObserver implements Observer {
: null);
}
+ private void checkFailedCreate(DeploymentStatus status) {
+ List<DeploymentCondition> conditions = status.getConditions();
+ for (DeploymentCondition dc : conditions) {
+ if ("FailedCreate".equals(dc.getReason()) && "ReplicaFailure".equals(dc.getType())) {
+ throw new DeploymentFailedException(
+ DeploymentFailedException.COMPONENT_JOBMANAGER, dc);
+ }
+ }
+ }
+
private void checkCrashLoopBackoff(FlinkDeployment flinkApp, Configuration effectiveConfig) {
PodList jmPods = flinkService.getJmPodList(flinkApp, effectiveConfig);
for (Pod pod : jmPods.getItems()) {
for (ContainerStatus cs : pod.getStatus().getContainerStatuses()) {
ContainerStateWaiting csw = cs.getState().getWaiting();
- if (DeploymentFailedException.REASON_CRASH_LOOP_BACKOFF.equals(csw.getReason())) {
- logger.warn("JobManager pod fails: {} {}", csw.getReason(), csw.getMessage());
+ if (csw != null
+ && DeploymentFailedException.REASON_CRASH_LOOP_BACKOFF.equals(
+ csw.getReason())) {
throw new DeploymentFailedException(
DeploymentFailedException.COMPONENT_JOBMANAGER,
"Warning", csw);
}