This is an automated email from the ASF dual-hosted git repository.
pbacsko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/yunikorn-k8shim.git
The following commit(s) were added to refs/heads/master by this push:
new c7a44ce5 [YUNIKORN-2864] Add e2e test for InPlacePodVerticalScaling
feature (#965)
c7a44ce5 is described below
commit c7a44ce535994f83e0efe2af9ab88be04e5ecfdd
Author: kaichiachen <[email protected]>
AuthorDate: Thu Apr 10 15:44:57 2025 +0200
[YUNIKORN-2864] Add e2e test for InPlacePodVerticalScaling feature (#965)
Closes: #965
Signed-off-by: Peter Bacsko <[email protected]>
---
test/e2e/framework/helpers/k8s/k8s_utils.go | 44 ++++
.../pod_resource_scaling_suite_test.go | 79 ++++++++
.../pod_resource_scaling_test.go | 222 +++++++++++++++++++++
3 files changed, 345 insertions(+)
diff --git a/test/e2e/framework/helpers/k8s/k8s_utils.go
b/test/e2e/framework/helpers/k8s/k8s_utils.go
index 985ef93b..e63f7ad7 100644
--- a/test/e2e/framework/helpers/k8s/k8s_utils.go
+++ b/test/e2e/framework/helpers/k8s/k8s_utils.go
@@ -18,12 +18,14 @@ package k8s
import (
"context"
+ "encoding/json"
"errors"
"fmt"
"net/http"
"net/url"
"os"
"path/filepath"
+ "strconv"
"strings"
"time"
@@ -41,8 +43,10 @@ import (
storagev1 "k8s.io/api/storage/v1"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+ "k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/httpstream"
"k8s.io/apimachinery/pkg/util/wait"
+ "k8s.io/apimachinery/pkg/version"
"k8s.io/cli-runtime/pkg/genericclioptions"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
@@ -152,6 +156,15 @@ func (k *KubeCtl) GetClient() *kubernetes.Clientset {
return k.clientSet
}
+// GetKubernetesVersion returns the version info from the Kubernetes server
+func (k *KubeCtl) GetKubernetesVersion() (*version.Info, error) {
+ k8sVer, err := k.clientSet.Discovery().ServerVersion()
+ if err != nil {
+ return k8sVer, err
+ }
+ return k8sVer, nil
+}
+
func (k *KubeCtl) GetKubeConfig() (*rest.Config, error) {
if k.kubeConfig != nil {
return k.kubeConfig, nil
@@ -221,6 +234,37 @@ func (k *KubeCtl) UpdatePod(pod *v1.Pod, namespace string)
(*v1.Pod, error) {
return k.clientSet.CoreV1().Pods(namespace).Update(context.TODO(), pod,
metav1.UpdateOptions{})
}
+func (k *KubeCtl) PatchPod(pod *v1.Pod, namespace string, patch
[]map[string]interface{}, subresources ...string) (*v1.Pod, error) {
+ patchBytes, err := json.Marshal(patch)
+ if err != nil {
+ return nil, err
+ }
+ return k.clientSet.CoreV1().Pods(namespace).Patch(
+ context.TODO(),
+ pod.Name,
+ types.JSONPatchType,
+ patchBytes,
+ metav1.PatchOptions{},
+ subresources...,
+ )
+}
+
+func (k *KubeCtl) ModifyResourceUsage(pod *v1.Pod, namespace string, newVcore
int64, newMemory int64) (*v1.Pod, error) {
+ patch := []map[string]interface{}{
+ {
+ "op": "replace",
+ "path": "/spec/containers/0/resources/requests/cpu",
+ "value": strconv.FormatInt(newVcore, 10) + "m",
+ },
+ {
+ "op": "replace",
+ "path": "/spec/containers/0/resources/requests/memory",
+ "value": strconv.FormatInt(newMemory, 10) + "Mi",
+ },
+ }
+ return k.PatchPod(pod, namespace, patch, "resize")
+}
+
func (k *KubeCtl) DeletePodAnnotation(pod *v1.Pod, namespace, annotation
string) (*v1.Pod, error) {
annotations := pod.Annotations
delete(annotations, annotation)
diff --git a/test/e2e/pod_resource_scaling/pod_resource_scaling_suite_test.go
b/test/e2e/pod_resource_scaling/pod_resource_scaling_suite_test.go
new file mode 100644
index 00000000..0049eb75
--- /dev/null
+++ b/test/e2e/pod_resource_scaling/pod_resource_scaling_suite_test.go
@@ -0,0 +1,79 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+package pod_resource_scaling
+
+import (
+ "path/filepath"
+ "runtime"
+ "testing"
+
+ "github.com/onsi/ginkgo/v2"
+ "github.com/onsi/ginkgo/v2/reporters"
+ "github.com/onsi/gomega"
+
+ "github.com/apache/yunikorn-k8shim/test/e2e/framework/configmanager"
+ "github.com/apache/yunikorn-k8shim/test/e2e/framework/helpers/common"
+ "github.com/apache/yunikorn-k8shim/test/e2e/framework/helpers/k8s"
+ "github.com/apache/yunikorn-k8shim/test/e2e/framework/helpers/yunikorn"
+)
+
// init parses the shared YuniKorn e2e test flags before Ginkgo takes over,
// so configmanager.YuniKornTestConfig is populated for the whole suite.
func init() {
	configmanager.YuniKornTestConfig.ParseFlags()
}
+
+func TestPodResourceScaling(t *testing.T) {
+ ginkgo.ReportAfterSuite("TestPodResourceScaling", func(report
ginkgo.Report) {
+ err := common.CreateJUnitReportDir()
+ Ω(err).NotTo(gomega.HaveOccurred())
+ err = reporters.GenerateJUnitReportWithConfig(
+ report,
+ filepath.Join(configmanager.YuniKornTestConfig.LogDir,
"TEST-pod_resource_scaling_junit.xml"),
+ reporters.JunitReportConfig{OmitSpecLabels: true},
+ )
+ Ω(err).NotTo(HaveOccurred())
+ })
+ gomega.RegisterFailHandler(ginkgo.Fail)
+ ginkgo.RunSpecs(t, "Pod Resource Scaling Suite")
+}
+
// BeforeSuite wires up the clients shared by all specs: the kubectl
// wrapper, the YuniKorn REST client, and the scheduler ConfigMap snapshot
// (restored in AfterSuite).
var _ = ginkgo.BeforeSuite(func() {
	_, filename, _, _ := runtime.Caller(0)
	suiteName = common.GetSuiteName(filename)

	// Initializing kubectl client
	kClient = k8s.KubeCtl{}
	Ω(kClient.SetClient()).To(gomega.BeNil())

	// Initializing rest client
	restClient = yunikorn.RClient{}
	// NOTE(review): restClient is a struct value, so this NotTo(BeNil())
	// can never fail — presumably a placeholder sanity check; confirm intent.
	Ω(restClient).NotTo(gomega.BeNil())
	yunikorn.EnsureYuniKornConfigsPresent()
	yunikorn.UpdateConfigMapWrapper(oldConfigMap, "")
})
+
// AfterSuite restores the YuniKorn ConfigMap captured by
// UpdateConfigMapWrapper so later suites see the original configuration.
var _ = ginkgo.AfterSuite(func() {
	yunikorn.RestoreConfigMapWrapper(oldConfigMap)
})
+
// Package-local aliases so the specs can use the short Gomega/Ginkgo names.
var Ω = gomega.Ω
var HaveOccurred = gomega.HaveOccurred
var Equal = gomega.Equal
var By = ginkgo.By
var BeforeEach = ginkgo.BeforeEach
var AfterEach = ginkgo.AfterEach
diff --git a/test/e2e/pod_resource_scaling/pod_resource_scaling_test.go
b/test/e2e/pod_resource_scaling/pod_resource_scaling_test.go
new file mode 100644
index 00000000..95d0fd53
--- /dev/null
+++ b/test/e2e/pod_resource_scaling/pod_resource_scaling_test.go
@@ -0,0 +1,222 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+package pod_resource_scaling
+
+import (
+ "fmt"
+ "time"
+
+ "github.com/onsi/ginkgo/v2"
+ v1 "k8s.io/api/core/v1"
+
+ "github.com/apache/yunikorn-k8shim/pkg/common/utils"
+ "github.com/apache/yunikorn-k8shim/test/e2e/framework/helpers/common"
+ "github.com/apache/yunikorn-k8shim/test/e2e/framework/helpers/k8s"
+ "github.com/apache/yunikorn-k8shim/test/e2e/framework/helpers/yunikorn"
+)
+
// Shared state for all specs in this package.
var kClient k8s.KubeCtl         // Kubernetes client wrapper, initialized in BeforeSuite
var restClient yunikorn.RClient // YuniKorn scheduler REST client
var err error                   // scratch error reused by helpers in this file
var ns string                   // per-spec namespace, created in BeforeEach
var oldConfigMap = new(v1.ConfigMap) // scheduler config snapshot, restored in AfterSuite
var suiteName string
+
+var _ = BeforeEach(func() {
+ // Skip if K8s version < 1.32
+ k8sVer, err := kClient.GetKubernetesVersion()
+ Ω(err).NotTo(HaveOccurred())
+ if k8sVer.Major < "1" || (k8sVer.Major == "1" && k8sVer.Minor < "32") {
+ ginkgo.Skip("InPlacePodVerticalScaling requires K8s 1.32+")
+ }
+
+ // Create test namespace
+ ns = "test-" + common.RandSeq(10)
+ _, err = kClient.CreateNamespace(ns, nil)
+ Ω(err).NotTo(HaveOccurred())
+})
+
// AfterEach tears down everything the spec created: all pods in the
// per-spec namespace first, then the namespace itself.
var _ = ginkgo.AfterEach(func() {
	By("Killing all pods")
	err := kClient.DeletePods(ns)
	Ω(err).NotTo(HaveOccurred())
	err = kClient.DeleteNamespace(ns)
	Ω(err).NotTo(HaveOccurred())
})
+
+func verifyYunikornResourceUsage(appID, resourceName string, value int64) {
+ err = utils.WaitForCondition(func() bool {
+ app, err := restClient.GetAppInfo("default", "root."+ns, appID)
+ if err != nil || app == nil {
+ fmt.Println(err)
+ return false
+ }
+
+ if app.Allocations == nil {
+ fmt.Println(app)
+ return false
+ }
+
+ for _, alloc := range app.Allocations {
+ resVal, exists := alloc.ResourcePerAlloc[resourceName]
+ if !exists {
+ return false
+ }
+
+ if resVal == value {
+ return true
+ }
+ }
+
+ return false
+ }, 30*time.Second, 120*time.Second)
+ Ω(err).NotTo(HaveOccurred(), fmt.Sprintf("Pod should be scheduled by
YuniKorn with correct resource(%s) allocation", resourceName))
+}
+
+var _ = ginkgo.Describe("InPlacePodVerticalScaling", func() {
+ ginkgo.It("Pod resources(cpu/memory) resize up", func() {
+ // Create pod with initial resources
+ sleepPodConfigs := k8s.SleepPodConfig{NS: ns, Time: 600, CPU:
100, Mem: 100, QOSClass: v1.PodQOSGuaranteed}
+ pod, err := k8s.InitSleepPod(sleepPodConfigs)
+ Ω(err).NotTo(HaveOccurred())
+
+ // Create pod
+ pod, err = kClient.CreatePod(pod, ns)
+ Ω(err).NotTo(HaveOccurred())
+
+ // Wait for pod running
+ err = kClient.WaitForPodRunning(ns, pod.Name, 60*time.Second)
+ Ω(err).NotTo(HaveOccurred())
+
+ // Check if pod is scheduled by YuniKorn and verify CPU
allocation is 100m
+
verifyYunikornResourceUsage(pod.ObjectMeta.Labels["applicationId"], "vcore",
100)
+
+ // Get initial pod restart count
+ pod, err = kClient.GetPod(pod.Name, ns)
+ Ω(err).NotTo(HaveOccurred())
+ initialRestartCount :=
pod.Status.ContainerStatuses[0].RestartCount
+
+ pod, err = kClient.ModifyResourceUsage(pod, ns, 200, 100)
+ Ω(err).NotTo(HaveOccurred())
+
+
Ω(pod.Status.ContainerStatuses[0].RestartCount).To(Equal(initialRestartCount),
"Container should not have restarted")
+
verifyYunikornResourceUsage(pod.ObjectMeta.Labels["applicationId"], "vcore",
200)
+
+ pod, err = kClient.ModifyResourceUsage(pod, ns, 200, 200)
+ Ω(err).NotTo(HaveOccurred())
+
+
Ω(pod.Status.ContainerStatuses[0].RestartCount).To(Equal(initialRestartCount),
"Container should not have restarted")
+
verifyYunikornResourceUsage(pod.ObjectMeta.Labels["applicationId"], "memory",
200*1024*1024)
+ })
+
+ ginkgo.It("Pod resources(cpu/memory) resize down", func() {
+ // Create pod with initial resources
+ sleepPodConfigs := k8s.SleepPodConfig{NS: ns, Time: 600, CPU:
200, Mem: 200, QOSClass: v1.PodQOSGuaranteed}
+ pod, err := k8s.InitSleepPod(sleepPodConfigs)
+ Ω(err).NotTo(HaveOccurred())
+
+ // Create pod
+ pod, err = kClient.CreatePod(pod, ns)
+ Ω(err).NotTo(HaveOccurred())
+
+ // Wait for pod running
+ err = kClient.WaitForPodRunning(ns, pod.Name, 60*time.Second)
+ Ω(err).NotTo(HaveOccurred())
+
+ // Check if pod is scheduled by YuniKorn and verify CPU
allocation is 100m
+
verifyYunikornResourceUsage(pod.ObjectMeta.Labels["applicationId"], "vcore",
200)
+
+ // Get initial pod state
+ pod, err = kClient.GetPod(pod.Name, ns)
+ Ω(err).NotTo(HaveOccurred())
+ initialStartTime := pod.Status.StartTime
+ initialRestartCount :=
pod.Status.ContainerStatuses[0].RestartCount
+
+ pod, err = kClient.ModifyResourceUsage(pod, ns, 100, 200)
+ Ω(err).NotTo(HaveOccurred())
+
+ // Wait for resource update to be reflected
+ err = utils.WaitForCondition(func() bool {
+ currentPod, err := kClient.GetPod(pod.Name, ns)
+ if err != nil {
+ return false
+ }
+ return
currentPod.Spec.Containers[0].Resources.Requests.Cpu().MilliValue() ==
int64(100)
+ }, 10*time.Second, 120*time.Second)
+ Ω(err).NotTo(HaveOccurred())
+
+ Ω(err).NotTo(HaveOccurred())
+ Ω(pod.Status.StartTime).To(Equal(initialStartTime), "Pod should
not have restarted")
+
Ω(pod.Status.ContainerStatuses[0].RestartCount).To(Equal(initialRestartCount),
"Container should not have restarted")
+
verifyYunikornResourceUsage(pod.ObjectMeta.Labels["applicationId"], "vcore",
100)
+
+ pod, err = kClient.ModifyResourceUsage(pod, ns, 100, 100)
+ Ω(err).NotTo(HaveOccurred()) // Expect an error as memory
cannot be decreased
+
+ Ω(err).NotTo(HaveOccurred())
+ Ω(pod.Status.StartTime).To(Equal(initialStartTime), "Pod should
not have restarted")
+
Ω(pod.Status.ContainerStatuses[0].RestartCount).To(Equal(initialRestartCount),
"Container should not have restarted")
+
verifyYunikornResourceUsage(pod.ObjectMeta.Labels["applicationId"], "memory",
100*1024*1024)
+ })
+
+ ginkgo.It("Pod resources(cpu/memory) resize to excessive values should
fail", func() {
+ // Create pod with initial resources
+ sleepPodConfigs := k8s.SleepPodConfig{NS: ns, Time: 600, CPU:
100, Mem: 100, QOSClass: v1.PodQOSGuaranteed}
+ pod, err := k8s.InitSleepPod(sleepPodConfigs)
+ Ω(err).NotTo(HaveOccurred())
+
+ // Create pod
+ _, err = kClient.CreatePod(pod, ns)
+ Ω(err).NotTo(HaveOccurred())
+
+ // Wait for pod running
+ err = kClient.WaitForPodRunning(ns, pod.Name, 60*time.Second)
+ Ω(err).NotTo(HaveOccurred())
+
+ // Check if pod is scheduled by YuniKorn and verify CPU
allocation is 100m
+
verifyYunikornResourceUsage(pod.ObjectMeta.Labels["applicationId"], "vcore",
100)
+
+ // Get initial pod state
+ pod, err = kClient.GetPod(pod.Name, ns)
+ Ω(err).NotTo(HaveOccurred())
+ initialStartTime := pod.Status.StartTime
+ initialRestartCount :=
pod.Status.ContainerStatuses[0].RestartCount
+
+ // Patch CPU/Memory to an excessive value
+ pod, err = kClient.ModifyResourceUsage(pod, ns, 100000, 100000)
+ Ω(err).NotTo(HaveOccurred())
+
+ // Wait for resource update to be reflected
+ err = utils.WaitForCondition(func() bool {
+ currentPod, err := kClient.GetPod(pod.Name, ns)
+ if err != nil {
+ return false
+ }
+ return currentPod.Status.Resize ==
v1.PodResizeStatusInfeasible
+ }, 10*time.Second, 120*time.Second)
+ Ω(err).NotTo(HaveOccurred())
+
+ Ω(err).NotTo(HaveOccurred())
+ Ω(pod.Status.StartTime).To(Equal(initialStartTime), "Pod should
not have restarted")
+
Ω(pod.Status.ContainerStatuses[0].RestartCount).To(Equal(initialRestartCount),
"Container should not have restarted")
+
+ // Verify pod resource usage is unchanged after set an
excessive value
+
verifyYunikornResourceUsage(pod.ObjectMeta.Labels["applicationId"], "vcore",
100)
+ })
+})
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]