This is an automated email from the ASF dual-hosted git repository. pcongiusti pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/camel-k.git
commit dcb7f36cff7f3cbc56b2b9ed7dc94df31600a65a Author: Pasquale Congiusti <[email protected]> AuthorDate: Fri Sep 13 07:40:55 2024 +0200 fix(ctrl): resume from unknown If an IntegrationPlatform is deleted, when the Integration requires monitoring it fails --- e2e/advanced/operator_restart_test.go | 81 +++++++++++++++++ e2e/advanced/operator_resume_unknown_test.go | 93 ++++++++++++++++++++ e2e/support/test_support.go | 29 ++++++- pkg/controller/integration/monitor.go | 8 +- pkg/controller/integration/monitor_test.go | 120 ++++++++++++++++++++++++++ pkg/controller/integration/monitor_unknown.go | 6 ++ 6 files changed, 335 insertions(+), 2 deletions(-) diff --git a/e2e/advanced/operator_restart_test.go b/e2e/advanced/operator_restart_test.go new file mode 100644 index 000000000..0f84972d5 --- /dev/null +++ b/e2e/advanced/operator_restart_test.go @@ -0,0 +1,81 @@ +//go:build integration +// +build integration + +// To enable compilation of this file in Goland, go to "Settings -> Go -> Vendoring & Build Tags -> Custom Tags" and add "integration" + +/* +Licensed to the Apache Software Foundation (ASF) under one or more +contributor license agreements. See the NOTICE file distributed with +this work for additional information regarding copyright ownership. +The ASF licenses this file to You under the Apache License, Version 2.0 +(the "License"); you may not use this file except in compliance with +the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package advanced + +import ( + "context" + "testing" + "time" + + . "github.com/onsi/gomega" + corev1 "k8s.io/api/core/v1" + + . "github.com/apache/camel-k/v2/e2e/support" + v1 "github.com/apache/camel-k/v2/pkg/apis/camel/v1" +) + +func TestOperatorRestart(t *testing.T) { + t.Parallel() + + WithNewTestNamespace(t, func(ctx context.Context, g *WithT, ns string) { + name := RandomizedSuffixName("yaml") + + t.Run("Operator started", func(t *testing.T) { + InstallOperator(t, ctx, g, ns) + g.Eventually(OperatorPod(t, ctx, ns)).Should(Not(BeNil())) + g.Eventually(PlatformPhase(t, ctx, ns), TestTimeoutShort).Should(Equal(v1.IntegrationPlatformPhaseReady)) + g.Expect(KamelRun(t, ctx, ns, "files/yaml.yaml", "--name", name).Execute()).To(Succeed()) + g.Eventually(IntegrationPhase(t, ctx, ns, name), TestTimeoutMedium).Should(Equal(v1.IntegrationPhaseRunning)) + g.Eventually(IntegrationConditionStatus(t, ctx, ns, name, v1.IntegrationConditionReady), TestTimeoutShort).Should(Equal(corev1.ConditionTrue)) + g.Eventually(IntegrationPodPhase(t, ctx, ns, name), TestTimeoutShort).Should(Equal(corev1.PodRunning)) + g.Eventually(IntegrationLogs(t, ctx, ns, name), TestTimeoutShort).Should(ContainSubstring("Magicstring!")) + g.Eventually(Kit(t, ctx, ns, IntegrationKit(t, ctx, ns, name)())).Should(Not(BeNil())) + g.Eventually(Integration(t, ctx, ns, name)).Should(Not(BeNil())) + }) + + t.Run("Operator uninstalled", func(t *testing.T) { + UninstallOperator(t, ctx, g, ns, "../../") + g.Eventually(OperatorPod(t, ctx, ns)).Should(BeNil()) + g.Eventually(Platform(t, ctx, ns)).Should(BeNil()) + g.Eventually(IntegrationPhase(t, ctx, ns, name), TestTimeoutShort).Should(Equal(v1.IntegrationPhaseRunning)) + g.Eventually(IntegrationConditionStatus(t, ctx, ns, name, v1.IntegrationConditionReady), TestTimeoutShort).Should(Equal(corev1.ConditionTrue)) + g.Eventually(IntegrationPodPhase(t, ctx, ns, name), TestTimeoutShort).Should(Equal(corev1.PodRunning)) + g.Eventually(IntegrationLogs(t, ctx, ns, name), TestTimeoutShort).Should(ContainSubstring("Magicstring!")) + g.Eventually(Kit(t, ctx, ns, IntegrationKit(t, ctx, ns, name)())).Should(Not(BeNil())) + g.Eventually(Integration(t, ctx, ns, name)).Should(Not(BeNil())) + }) + + t.Run("Operator reinstalled", func(t *testing.T) { + InstallOperator(t, ctx, g, ns) + g.Eventually(OperatorPod(t, ctx, ns)).Should(Not(BeNil())) + g.Eventually(PlatformPhase(t, ctx, ns), TestTimeoutShort).Should(Equal(v1.IntegrationPlatformPhaseReady)) + g.Consistently(OperatorLogs(t, ctx, ns), 1*time.Minute, 3*time.Second).Should(Not(ContainSubstring("error"))) + g.Eventually(IntegrationPhase(t, ctx, ns, name), TestTimeoutShort).Should(Equal(v1.IntegrationPhaseRunning)) + g.Eventually(IntegrationConditionStatus(t, ctx, ns, name, v1.IntegrationConditionReady), TestTimeoutShort).Should(Equal(corev1.ConditionTrue)) + g.Eventually(IntegrationPodPhase(t, ctx, ns, name), TestTimeoutShort).Should(Equal(corev1.PodRunning)) + g.Eventually(IntegrationLogs(t, ctx, ns, name), TestTimeoutShort).Should(ContainSubstring("Magicstring!")) + g.Eventually(Kit(t, ctx, ns, IntegrationKit(t, ctx, ns, name)())).Should(Not(BeNil())) + g.Eventually(Integration(t, ctx, ns, name)).Should(Not(BeNil())) + }) + }) +} diff --git a/e2e/advanced/operator_resume_unknown_test.go b/e2e/advanced/operator_resume_unknown_test.go new file mode 100644 index 000000000..61c4af663 --- /dev/null +++ b/e2e/advanced/operator_resume_unknown_test.go @@ -0,0 +1,93 @@ +//go:build integration +// +build integration + +// To enable compilation of this file in Goland, go to "Settings -> Go -> Vendoring & Build Tags -> Custom Tags" and add "integration" + +/* +Licensed to the Apache Software Foundation (ASF) under one or more +contributor license agreements. See the NOTICE file distributed with +this work for additional information regarding copyright ownership. +The ASF licenses this file to You under the Apache License, Version 2.0 +(the "License"); you may not use this file except in compliance with +the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package advanced + +import ( + "context" + "os" + "testing" + "time" + + . "github.com/onsi/gomega" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + . "github.com/apache/camel-k/v2/e2e/support" + v1 "github.com/apache/camel-k/v2/pkg/apis/camel/v1" +) + +func TestOperatorResumeFromUnknown(t *testing.T) { + t.Parallel() + + WithNewTestNamespace(t, func(ctx context.Context, g *WithT, ns string) { + name := RandomizedSuffixName("yaml") + containerRegistry, ok := os.LookupEnv("KAMEL_INSTALL_REGISTRY") + g.Expect(ok).To(BeTrue(), "You need to provide the registry as KAMEL_INSTALL_REGISTRY env var") + + InstallOperator(t, ctx, g, ns) + g.Eventually(OperatorPod(t, ctx, ns)).Should(Not(BeNil())) + g.Eventually(PlatformPhase(t, ctx, ns), TestTimeoutShort).Should(Equal(v1.IntegrationPlatformPhaseReady)) + g.Expect(KamelRun(t, ctx, ns, "files/yaml.yaml", "--name", name).Execute()).To(Succeed()) + g.Eventually(IntegrationPhase(t, ctx, ns, name), TestTimeoutMedium).Should(Equal(v1.IntegrationPhaseRunning)) + g.Eventually(IntegrationConditionStatus(t, ctx, ns, name, v1.IntegrationConditionReady), TestTimeoutShort).Should(Equal(corev1.ConditionTrue)) + g.Eventually(IntegrationConditionStatus(t, ctx, ns, name, v1.IntegrationConditionPlatformAvailable), TestTimeoutShort).Should(Equal(corev1.ConditionTrue)) + g.Eventually(IntegrationPodPhase(t, ctx, ns, name), TestTimeoutShort).Should(Equal(corev1.PodRunning)) + g.Eventually(IntegrationLogs(t, ctx, ns, name), TestTimeoutShort).Should(ContainSubstring("Magicstring!")) + + // Delete the IntegrationPlatform: as soon as there is a "monitoring" operation, the Integration + // should go in Unknown status as it cannot create traits, therefore, it would fail + g.Expect(DeletePlatform(t, ctx, ns)()).To(BeTrue()) + g.Consistently(IntegrationPhase(t, ctx, ns, name), 1*time.Minute, 5*time.Second).Should(Equal(v1.IntegrationPhaseRunning)) + g.Eventually(IntegrationConditionStatus(t, ctx, ns, name, v1.IntegrationConditionReady), TestTimeoutShort).Should(Equal(corev1.ConditionTrue)) + // Asking for a scale operation triggers a monitoring action + g.Expect(ScaleIntegration(t, ctx, ns, name, 2)).To(Succeed()) + g.Eventually(IntegrationPhase(t, ctx, ns, name), TestTimeoutMedium).Should(Equal(v1.IntegrationPhaseUnknown)) + g.Eventually(IntegrationConditionStatus(t, ctx, ns, name, v1.IntegrationConditionReady), TestTimeoutShort).Should(Equal(corev1.ConditionTrue)) + g.Eventually(IntegrationConditionStatus(t, ctx, ns, name, v1.IntegrationConditionPlatformAvailable), TestTimeoutShort).Should(Equal(corev1.ConditionFalse)) + // Fix the platform (create a new one) + platform := &v1.IntegrationPlatform{ + TypeMeta: metav1.TypeMeta{ + APIVersion: v1.SchemeGroupVersion.String(), + Kind: v1.IntegrationPlatformKind, + }, + ObjectMeta: metav1.ObjectMeta{ + Namespace: ns, + Name: "camel-k", + }, + Spec: v1.IntegrationPlatformSpec{ + Build: v1.IntegrationPlatformBuildSpec{ + Registry: v1.RegistrySpec{ + Address: containerRegistry, + }, + }, + }, + } + g.Expect(CreateIntegrationPlatform(t, ctx, platform)).To(Succeed()) + g.Eventually(PlatformPhase(t, ctx, ns), TestTimeoutShort).Should(Equal(v1.IntegrationPlatformPhaseReady)) + // The monitoring should now start correctly + g.Eventually(IntegrationPhase(t, ctx, ns, name), TestTimeoutMedium).Should(Equal(v1.IntegrationPhaseRunning)) + g.Eventually(IntegrationConditionStatus(t, ctx, ns, name, v1.IntegrationConditionReady), TestTimeoutShort).Should(Equal(corev1.ConditionTrue)) + g.Eventually(IntegrationConditionStatus(t, ctx, ns, name, v1.IntegrationConditionPlatformAvailable), TestTimeoutShort).Should(Equal(corev1.ConditionTrue)) + g.Eventually(IntegrationPods(t, ctx, ns, name), TestTimeoutShort).Should(HaveLen(2)) + }) +} diff --git a/e2e/support/test_support.go b/e2e/support/test_support.go index 565e21a49..a1fceebaf 100644 --- a/e2e/support/test_support.go +++ b/e2e/support/test_support.go @@ -439,6 +439,32 @@ func IntegrationLogs(t *testing.T, ctx context.Context, ns, name string) func() } } +func OperatorLogs(t *testing.T, ctx context.Context, ns string) func() string { + return func() string { + pod := OperatorPod(t, ctx, ns)() + if pod == nil { + return "" + } + + options := corev1.PodLogOptions{ + TailLines: ptr.To(int64(200)), + } + + for _, container := range pod.Status.ContainerStatuses { + if !container.Ready || container.State.Waiting != nil { + // avoid logs watch fail due to container creating state + return "" + } + } + + if len(pod.Spec.Containers) > 1 { + options.Container = pod.Spec.Containers[0].Name + } + + return Logs(t, ctx, ns, pod.Name, options)() + } +} + // TailedLogs Retrieve the Logs from the Pod defined by its name in the given namespace ns. The number of lines numLines from the end of the logs to show. func TailedLogs(t *testing.T, ctx context.Context, ns, name string, numLines int64) func() string { return func() string { @@ -2085,8 +2111,9 @@ func DeletePlatform(t *testing.T, ctx context.Context, ns string) func() bool { } if err := TestClient(t).Delete(ctx, pl); err != nil { log.Error(err, "Got error while deleting the platform") + return false } - return false + return true } } diff --git a/pkg/controller/integration/monitor.go b/pkg/controller/integration/monitor.go index 5c395a86d..8759af19e 100644 --- a/pkg/controller/integration/monitor.go +++ b/pkg/controller/integration/monitor.go @@ -140,8 +140,14 @@ func (action *monitorAction) Handle(ctx context.Context, integration *v1.Integra // If the platform is not in ready status (it may happen when a new IntegrationPlatform is created), then, we may not be able to // properly apply all the traits. We must set the phase in an unknown status which should be periodically reconciled in order to make sure that // we eventually return in a ready phase (likely once the platform is done) - if environment.Platform != nil && environment.Platform.Status.Phase != v1.IntegrationPlatformPhaseReady { + if environment.Platform == nil || environment.Platform.Status.Phase != v1.IntegrationPlatformPhaseReady { integration.Status.Phase = v1.IntegrationPhaseUnknown + integration.Status.SetCondition( + v1.IntegrationConditionPlatformAvailable, + corev1.ConditionFalse, + "PlatformMissing", + "IntegrationPlatform is missing or not yet ready. If the problem persist, make sure to fix the IntegrationPlatform error or create a new one.", + ) return integration, nil } action.checkTraitAnnotationsDeprecatedNotice(integration) diff --git a/pkg/controller/integration/monitor_test.go b/pkg/controller/integration/monitor_test.go index dbd56507a..1f430037d 100644 --- a/pkg/controller/integration/monitor_test.go +++ b/pkg/controller/integration/monitor_test.go @@ -244,11 +244,129 @@ func TestMonitorIntegrationWhilePlatformRecreating(t *testing.T) { handledIt, err := a.Handle(context.TODO(), it) require.NoError(t, err) assert.Equal(t, v1.IntegrationPhaseUnknown, handledIt.Status.Phase) + condition := handledIt.Status.GetCondition(v1.IntegrationConditionPlatformAvailable) + assert.Equal(t, corev1.ConditionFalse, condition.Status) + assert.Equal(t, "PlatformMissing", condition.Reason) + assert.Equal(t, "IntegrationPlatform is missing or not yet ready. If the problem persist, "+ + "make sure to fix the IntegrationPlatform error or create a new one.", condition.Message) +} + +func TestMonitorIntegrationPlatformNil(t *testing.T) { + catalog := &v1.CamelCatalog{ + TypeMeta: metav1.TypeMeta{ + APIVersion: v1.SchemeGroupVersion.String(), + Kind: v1.CamelCatalogKind, + }, + ObjectMeta: metav1.ObjectMeta{ + Namespace: "ns", + Name: "camel-k-catalog", + }, + Spec: v1.CamelCatalogSpec{ + Runtime: v1.RuntimeSpec{ + Provider: v1.RuntimeProviderQuarkus, + Version: defaults.DefaultRuntimeVersion, + }, + }, + } + kit := &v1.IntegrationKit{ + TypeMeta: metav1.TypeMeta{ + APIVersion: v1.SchemeGroupVersion.String(), + Kind: v1.IntegrationKitKind, + }, + ObjectMeta: metav1.ObjectMeta{ + Namespace: "ns", + Name: "my-kit", + }, + Status: v1.IntegrationKitStatus{ + Phase: v1.IntegrationKitPhaseReady, + }, + } + it := &v1.Integration{ + TypeMeta: metav1.TypeMeta{ + APIVersion: v1.SchemeGroupVersion.String(), + Kind: v1.IntegrationKind, + }, + ObjectMeta: metav1.ObjectMeta{ + Namespace: "ns", + Name: "my-it", + }, + Status: v1.IntegrationStatus{ + RuntimeVersion: defaults.DefaultRuntimeVersion, + Phase: v1.IntegrationPhaseRunning, + IntegrationKit: &corev1.ObjectReference{ + Name: kit.Name, + Namespace: kit.Namespace, + Kind: kit.Kind, + APIVersion: kit.APIVersion, + }, + Conditions: []v1.IntegrationCondition{ + { + Type: v1.IntegrationConditionDeploymentAvailable, + Status: corev1.ConditionTrue, + }, + { + Type: v1.IntegrationConditionReady, + Status: corev1.ConditionTrue, + }, + }, + }, + } + hash, _ := digest.ComputeForIntegration(it, nil, nil) + it.Status.Digest = hash + pod := &corev1.Pod{ + TypeMeta: metav1.TypeMeta{ + APIVersion: appsv1.SchemeGroupVersion.String(), + Kind: "Pod", + }, + ObjectMeta: metav1.ObjectMeta{ + Namespace: "ns", + Name: "my-pod", + Labels: map[string]string{ + v1.IntegrationLabel: "my-it", + }, + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "my-cnt", + Image: "my-img", + }, + }, + }, + Status: corev1.PodStatus{ + Phase: corev1.PodRunning, + Conditions: []corev1.PodCondition{ + { + Type: corev1.PodReady, + Status: corev1.ConditionTrue, + }, + }, + }, + } + c, err := test.NewFakeClient(catalog, it, kit, pod) + require.NoError(t, err) + + a := monitorAction{} + a.InjectLogger(log.Log) + a.InjectClient(c) + assert.Equal(t, "monitor", a.Name()) + assert.True(t, a.CanHandle(it)) + handledIt, err := a.Handle(context.TODO(), it) + require.NoError(t, err) + assert.Equal(t, v1.IntegrationPhaseUnknown, handledIt.Status.Phase) + condition := handledIt.Status.GetCondition(v1.IntegrationConditionPlatformAvailable) + assert.Equal(t, corev1.ConditionFalse, condition.Status) + assert.Equal(t, "PlatformMissing", condition.Reason) + assert.Equal(t, "IntegrationPlatform is missing or not yet ready. If the problem persist, "+ + "make sure to fix the IntegrationPlatform error or create a new one.", condition.Message) } func TestMonitorIntegrationRecoverFromUnknown(t *testing.T) { c, it, err := nominalEnvironment() it.Status.Phase = v1.IntegrationPhaseUnknown + it.Status.SetCondition( + v1.IntegrationConditionPlatformAvailable, corev1.ConditionFalse, "PlatformMissing", + "IntegrationPlatform is missing or not yet ready. If the problem persist, make sure to fix the IntegrationPlatform error or create a new one.") require.NoError(t, err) a := monitorUnknownAction{} @@ -259,6 +377,8 @@ func TestMonitorIntegrationRecoverFromUnknown(t *testing.T) { handledIt, err := a.Handle(context.TODO(), it) require.NoError(t, err) assert.Equal(t, v1.IntegrationPhaseRunning, handledIt.Status.Phase) + condition := handledIt.Status.GetCondition(v1.IntegrationConditionPlatformAvailable) + assert.Equal(t, corev1.ConditionTrue, condition.Status) } func nominalEnvironment() (client.Client, *v1.Integration, error) { diff --git a/pkg/controller/integration/monitor_unknown.go b/pkg/controller/integration/monitor_unknown.go index e9e1cbc30..98c966ace 100644 --- a/pkg/controller/integration/monitor_unknown.go +++ b/pkg/controller/integration/monitor_unknown.go @@ -54,6 +54,12 @@ func (action *monitorUnknownAction) Handle(ctx context.Context, integration *v1. // We're good to monitor this again if environment.Platform != nil && environment.Platform.Status.Phase == v1.IntegrationPlatformPhaseReady { integration.Status.Phase = v1.IntegrationPhaseRunning + integration.Status.SetCondition( + v1.IntegrationConditionPlatformAvailable, + corev1.ConditionTrue, + v1.IntegrationConditionPlatformAvailableReason, + environment.Platform.Namespace+"/"+environment.Platform.Name, + ) return integration, nil }
