This is an automated email from the ASF dual-hosted git repository.
pcongiusti pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel-k.git
The following commit(s) were added to refs/heads/main by this push:
new a2a74db22 fix(ctrl): monitor Pod ready condition
a2a74db22 is described below
commit a2a74db226fce7f0b67ddc0ef7df441656b0c638
Author: Pasquale Congiusti <[email protected]>
AuthorDate: Thu Jan 4 15:28:30 2024 +0100
fix(ctrl): monitor Pod ready condition
When the user uses a startup probe, the Integration won't turn to running
until the condition is reached
Closes #4977
---
pkg/controller/integration/monitor.go | 161 +++++++----------------
pkg/controller/integration/monitor_cronjob.go | 2 +-
pkg/controller/integration/monitor_deployment.go | 4 +-
pkg/controller/integration/monitor_knative.go | 2 +-
4 files changed, 50 insertions(+), 119 deletions(-)
diff --git a/pkg/controller/integration/monitor.go
b/pkg/controller/integration/monitor.go
index 7c5a6b7c6..9a6208fcb 100644
--- a/pkg/controller/integration/monitor.go
+++ b/pkg/controller/integration/monitor.go
@@ -254,7 +254,7 @@ func getIntegrationSecretsAndConfigmaps(ctx
context.Context, client client.Clien
type controller interface {
checkReadyCondition(ctx context.Context) (bool, error)
getPodSpec() corev1.PodSpec
- updateReadyCondition(readyPods []corev1.Pod) bool
+ updateReadyCondition(readyPods int) bool
}
func (action *monitorAction) newController(env *trait.Environment, integration
*v1.Integration) (controller, error) {
@@ -318,33 +318,32 @@ func (action *monitorAction)
updateIntegrationPhaseAndReadyCondition(
if err != nil {
return err
}
-
- readyPods, unreadyPods := filterPodsByReadyStatus(environment,
runningPods, controller.getPodSpec())
-
if done, err := controller.checkReadyCondition(ctx); done || err != nil
{
- if len(readyPods) > 0 || len(unreadyPods) > 0 {
- // There may be pods that are not ready but still
probable for getting error messages.
- // Ignore returned error from probing as it's expected
when the ctrl obj is not ready.
- _ = action.probeReadiness(ctx, environment,
integration, unreadyPods, readyPods)
- }
+ // There may be pods that are not ready but still probeable for
getting error messages.
+ // Ignore returned error from probing as it's expected when the
ctrl obj is not ready.
+ _, _, _ = action.probeReadiness(ctx, environment, integration,
runningPods)
return err
}
- if done := checkPodStatuses(integration, pendingPods, runningPods);
done {
+ if arePodsFailingStatuses(integration, pendingPods, runningPods) {
return nil
}
- integration.Status.Phase = v1.IntegrationPhaseRunning
-
- if done := controller.updateReadyCondition(readyPods); done {
+ readyPods, probeOk, err := action.probeReadiness(ctx, environment,
integration, runningPods)
+ if err != nil {
+ return err
+ }
+ if !probeOk {
+ integration.Status.Phase = v1.IntegrationPhaseError
return nil
}
- if err := action.probeReadiness(ctx, environment, integration,
unreadyPods, readyPods); err != nil {
- return err
+ if done := controller.updateReadyCondition(readyPods); done {
+ integration.Status.Phase = v1.IntegrationPhaseRunning
+ return nil
}
return nil
}
-func checkPodStatuses(integration *v1.Integration, pendingPods []corev1.Pod,
runningPods []corev1.Pod) bool {
+func arePodsFailingStatuses(integration *v1.Integration, pendingPods
[]corev1.Pod, runningPods []corev1.Pod) bool {
// Check Pods statuses
for _, pod := range pendingPods {
// Check the scheduled condition
@@ -396,114 +395,44 @@ func checkPodStatuses(integration *v1.Integration,
pendingPods []corev1.Pod, run
return false
}
-func filterPodsByReadyStatus(environment *trait.Environment, runningPods
[]corev1.Pod, podSpec corev1.PodSpec) ([]corev1.Pod, []corev1.Pod) {
- var readyPods []corev1.Pod
- var unreadyPods []corev1.Pod
-
- integrationContainerName := environment.GetIntegrationContainerName()
- for _, pod := range runningPods {
- // We compare the Integration PodSpec to that of the Pod in
order to make
- // sure we account for up-to-date version.
- if !comparePodSpec(integrationContainerName, podSpec, pod.Spec)
{
- continue
- }
- ready := kubernetes.GetPodCondition(pod, corev1.PodReady)
- if ready == nil {
- continue
- }
- switch ready.Status {
- case corev1.ConditionTrue:
- // We still account terminating Pods to handle rolling
deployments
- readyPods = append(readyPods, pod)
- case corev1.ConditionFalse:
- if pod.DeletionTimestamp != nil {
- continue
- }
- unreadyPods = append(unreadyPods, pod)
- }
- }
-
- return readyPods, unreadyPods
-}
-
-// comparePodSpec compares given pod spec according to integration specific
information (e.g. digest, container image).
-func comparePodSpec(integrationContainerName string, runningPodSpec
corev1.PodSpec, referencePod corev1.PodSpec) bool {
- runningPodContainer :=
findIntegrationContainer(integrationContainerName, runningPodSpec)
- referencePodContainer :=
findIntegrationContainer(integrationContainerName, referencePod)
-
- if runningPodContainer == nil || referencePodContainer == nil {
- return false
- }
-
- // integration digest must be the same
- if getIntegrationDigest(runningPodContainer.Env) !=
getIntegrationDigest(referencePodContainer.Env) {
- return false
- }
-
- // integration container image must be the same (same integration kit)
- if runningPodContainer.Image != referencePodContainer.Image {
- return false
- }
-
- return true
-}
-
-func getIntegrationDigest(envs []corev1.EnvVar) string {
- for _, env := range envs {
- if env.Name == digest.IntegrationDigestEnvVar {
- return env.Value
- }
- }
-
- return ""
-}
-
-// findIntegrationContainer find if present the integration container in the
pod spec using the integration specifications.
-func findIntegrationContainer(integrationContainerName string, spec
corev1.PodSpec) *corev1.Container {
- for _, c := range spec.Containers {
- if c.Name == integrationContainerName {
- return &c
- }
- }
-
- return nil
-}
-
// probeReadiness calls the readiness probes of the non-ready Pods directly to
retrieve insights from the Camel runtime.
-func (action *monitorAction) probeReadiness(
- ctx context.Context, environment *trait.Environment, integration
*v1.Integration,
- unreadyPods []corev1.Pod, readyPods []corev1.Pod,
-) error {
+// The func returns the number of readyPods, the success of the probe and any
error that may have happened during its execution.
+func (action *monitorAction) probeReadiness(ctx context.Context, environment
*trait.Environment, integration *v1.Integration, pods []corev1.Pod) (int, bool,
error) {
+ // as a default we assume the Integration is Ready
readyCondition := v1.IntegrationCondition{
Type: v1.IntegrationConditionReady,
- Status: corev1.ConditionFalse,
- Pods: make([]v1.PodCondition, len(unreadyPods)),
+ Status: corev1.ConditionTrue,
+ Pods: make([]v1.PodCondition, len(pods)),
}
+ readyPods := 0
+ unreadyPods := 0
+
runtimeReady := true
runtimeFailed := false
+ probeReadinessOk := true
- for i := range unreadyPods {
- pod := &unreadyPods[i]
- if ready := kubernetes.GetPodCondition(*pod, corev1.PodReady);
ready.Reason != "ContainersNotReady" {
- continue
- }
- container := getIntegrationContainer(environment, pod)
- if container == nil {
- return fmt.Errorf("integration container not found in
Pod %s/%s", pod.Namespace, pod.Name)
- }
-
+ for i := range pods {
+ pod := &pods[i]
readyCondition.Pods[i].Name = pod.Name
-
for p := range pod.Status.Conditions {
if pod.Status.Conditions[p].Type == corev1.PodReady {
readyCondition.Pods[i].Condition =
pod.Status.Conditions[p]
break
}
}
+ // If it's in ready status, then we don't care to probe.
+ if ready := kubernetes.GetPodCondition(*pod, corev1.PodReady);
ready.Status == corev1.ConditionTrue {
+ readyPods++
+ continue
+ }
+ unreadyPods++
+ container := getIntegrationContainer(environment, pod)
+ if container == nil {
+ return readyPods, false, fmt.Errorf("integration
container not found in Pod %s/%s", pod.Namespace, pod.Name)
+ }
if probe := container.ReadinessProbe; probe != nil &&
probe.HTTPGet != nil {
body, err := proxyGetHTTPProbe(ctx, action.client,
probe, pod, container)
-
// When invoking the HTTP probe, the kubernetes client
exposes a very
// specific behavior:
//
@@ -559,7 +488,7 @@ func (action *monitorAction) probeReadiness(
health, err := NewHealthCheck(body)
if err != nil {
- return err
+ return readyPods, false, err
}
for _, check := range health.Checks {
if check.Status == v1.HealthCheckStatusUp {
@@ -575,19 +504,21 @@ func (action *monitorAction) probeReadiness(
}
if runtimeFailed {
- integration.Status.Phase = v1.IntegrationPhaseError
+ probeReadinessOk = false
readyCondition.Reason = v1.IntegrationConditionErrorReason
- readyCondition.Message = fmt.Sprintf("%d/%d pods are not
ready", len(unreadyPods), len(unreadyPods)+len(readyPods))
+ readyCondition.Status = corev1.ConditionFalse
+ readyCondition.Message = fmt.Sprintf("%d/%d pods are not
ready", unreadyPods, unreadyPods+readyPods)
+ integration.Status.SetConditions(readyCondition)
}
if !runtimeReady {
- integration.Status.Phase = v1.IntegrationPhaseError
+ probeReadinessOk = false
readyCondition.Reason =
v1.IntegrationConditionRuntimeNotReadyReason
- readyCondition.Message = fmt.Sprintf("%d/%d pods are not
ready", len(unreadyPods), len(unreadyPods)+len(readyPods))
+ readyCondition.Status = corev1.ConditionFalse
+ readyCondition.Message = fmt.Sprintf("%d/%d pods are not
ready", unreadyPods, unreadyPods+readyPods)
+ integration.Status.SetConditions(readyCondition)
}
- integration.Status.SetConditions(readyCondition)
-
- return nil
+ return readyPods, probeReadinessOk, nil
}
func findHighestPriorityReadyKit(kits []v1.IntegrationKit)
(*v1.IntegrationKit, error) {
diff --git a/pkg/controller/integration/monitor_cronjob.go
b/pkg/controller/integration/monitor_cronjob.go
index a2f969d34..1620a66c3 100644
--- a/pkg/controller/integration/monitor_cronjob.go
+++ b/pkg/controller/integration/monitor_cronjob.go
@@ -77,7 +77,7 @@ func (c *cronJobController) getPodSpec() corev1.PodSpec {
return c.obj.Spec.JobTemplate.Spec.Template.Spec
}
-func (c *cronJobController) updateReadyCondition(readyPods []corev1.Pod) bool {
+func (c *cronJobController) updateReadyCondition(readyPods int) bool {
switch {
case c.obj.Status.LastScheduleTime == nil:
c.integration.SetReadyCondition(corev1.ConditionTrue,
diff --git a/pkg/controller/integration/monitor_deployment.go
b/pkg/controller/integration/monitor_deployment.go
index 55b7797c7..e2f823c16 100644
--- a/pkg/controller/integration/monitor_deployment.go
+++ b/pkg/controller/integration/monitor_deployment.go
@@ -59,14 +59,14 @@ func (c *deploymentController) getPodSpec() corev1.PodSpec {
return c.obj.Spec.Template.Spec
}
-func (c *deploymentController) updateReadyCondition(readyPods []corev1.Pod)
bool {
+func (c *deploymentController) updateReadyCondition(readyPods int) bool {
replicas := int32(1)
if r := c.integration.Spec.Replicas; r != nil {
replicas = *r
}
// The Deployment status reports updated and ready replicas separately,
// so that the number of ready replicas also accounts for older
versions.
- readyReplicas := int32(len(readyPods))
+ readyReplicas := int32(readyPods)
switch {
case readyReplicas >= replicas:
// The Integration is considered ready when the number of
replicas
diff --git a/pkg/controller/integration/monitor_knative.go
b/pkg/controller/integration/monitor_knative.go
index 1d62eef2e..06b7dc82b 100644
--- a/pkg/controller/integration/monitor_knative.go
+++ b/pkg/controller/integration/monitor_knative.go
@@ -51,7 +51,7 @@ func (c *knativeServiceController) getPodSpec()
corev1.PodSpec {
return c.obj.Spec.Template.Spec.PodSpec
}
-func (c *knativeServiceController) updateReadyCondition(readyPods
[]corev1.Pod) bool {
+func (c *knativeServiceController) updateReadyCondition(readyPods int) bool {
ready := kubernetes.GetKnativeServiceCondition(*c.obj,
servingv1.ServiceConditionReady)
if ready.IsTrue() {
c.integration.SetReadyCondition(corev1.ConditionTrue,