This is an automated email from the ASF dual-hosted git repository.
pbacsko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/yunikorn-k8shim.git
The following commit(s) were added to refs/heads/master by this push:
new 352e1b75 [YUNIKORN-1977] Added E2E test for deploying pod and verifying user info with non-kube-admin user (#915)
352e1b75 is described below
commit 352e1b75da599510b0cb8387d58681fc69a3a830
Author: rrajesh <[email protected]>
AuthorDate: Fri Oct 25 11:47:45 2024 +0200
[YUNIKORN-1977] Added E2E test for deploying pod and verifying user info with non-kube-admin user (#915)
Closes: #915
Signed-off-by: Peter Bacsko <[email protected]>
---
test/e2e/framework/helpers/k8s/k8s_utils.go | 110 +++++++---
test/e2e/user_group_limit/user_group_limit_test.go | 224 ++++++++++++++++++++-
2 files changed, 300 insertions(+), 34 deletions(-)
diff --git a/test/e2e/framework/helpers/k8s/k8s_utils.go b/test/e2e/framework/helpers/k8s/k8s_utils.go
index 77a472f5..ef40becf 100644
--- a/test/e2e/framework/helpers/k8s/k8s_utils.go
+++ b/test/e2e/framework/helpers/k8s/k8s_utils.go
@@ -17,20 +17,19 @@
package k8s
import (
- "bytes"
"context"
"errors"
"fmt"
"net/http"
"net/url"
"os"
- "os/exec"
"path/filepath"
"strings"
"time"
"github.com/onsi/ginkgo/v2"
"github.com/onsi/gomega"
+ clientcmdapi "k8s.io/client-go/tools/clientcmd/api"
"go.uber.org/zap"
appsv1 "k8s.io/api/apps/v1"
@@ -1091,14 +1090,6 @@ func (k *KubeCtl) CreateSecret(secret *v1.Secret, namespace string) (*v1.Secret,
	return k.clientSet.CoreV1().Secrets(namespace).Create(context.TODO(), secret, metav1.CreateOptions{})
}
-func GetSecretObj(yamlPath string) (*v1.Secret, error) {
- o, err := common.Yaml2Obj(yamlPath)
- if err != nil {
- return nil, err
- }
- return o.(*v1.Secret), err
-}
-
func (k *KubeCtl) CreateServiceAccount(accountName string, namespace string) (*v1.ServiceAccount, error) {
	return k.clientSet.CoreV1().ServiceAccounts(namespace).Create(context.TODO(), &v1.ServiceAccount{
ObjectMeta: metav1.ObjectMeta{Name: accountName},
@@ -1383,21 +1374,6 @@ func (k *KubeCtl) isNumJobPodsInDesiredState(jobName string, namespace string, n
}
}
-func ApplyYamlWithKubectl(path, namespace string) error {
- cmd := exec.Command("kubectl", "apply", "-f", path, "-n", namespace)
- var stderr bytes.Buffer
- cmd.Stderr = &stderr
- // if err != nil, isn't represent yaml format error.
- // it only represent the cmd.Run() fail.
- err := cmd.Run()
- // if yaml format error, errStr will show the detail
- errStr := stderr.String()
- if err != nil && errStr != "" {
- return fmt.Errorf("apply fail with %s", errStr)
- }
- return nil
-}
-
func (k *KubeCtl) GetNodes() (*v1.NodeList, error) {
	return k.clientSet.CoreV1().Nodes().List(context.TODO(), metav1.ListOptions{})
}
@@ -1779,3 +1755,87 @@ func (k *KubeCtl) DeleteStorageClass(scName string) error {
}
return nil
}
+
+func (k *KubeCtl) GetSecrets(namespace string) (*v1.SecretList, error) {
+	return k.clientSet.CoreV1().Secrets(namespace).List(context.TODO(), metav1.ListOptions{})
+}
+
+// GetSecretValue retrieves the value for a specific key from a Kubernetes secret.
+func (k *KubeCtl) GetSecretValue(namespace, secretName, key string) (string, error) {
+ err := k.WaitForSecret(namespace, secretName, 5*time.Second)
+ if err != nil {
+ return "", err
+ }
+ secret, err := k.GetSecret(namespace, secretName)
+ if err != nil {
+ return "", err
+ }
+ // Check if the key exists in the secret
+ value, ok := secret.Data[key]
+ if !ok {
+		return "", fmt.Errorf("key %s not found in secret %s", key, secretName)
+ }
+ return string(value), nil
+}
+
+func (k *KubeCtl) GetSecret(namespace, secretName string) (*v1.Secret, error) {
+	secret, err := k.clientSet.CoreV1().Secrets(namespace).Get(context.TODO(), secretName, metav1.GetOptions{})
+ if err != nil {
+ return nil, err
+ }
+ return secret, nil
+}
+
+func (k *KubeCtl) WaitForSecret(namespace, secretName string, timeout time.Duration) error {
+ var cond wait.ConditionFunc // nolint:gosimple
+ cond = func() (done bool, err error) {
+ secret, err := k.GetSecret(namespace, secretName)
+ if err != nil {
+ return false, err
+ }
+ if secret != nil {
+ return true, nil
+ }
+ return false, nil
+ }
+	return wait.PollUntilContextTimeout(context.TODO(), time.Second, timeout, false, cond.WithContext())
+}
+
+func WriteConfigToFile(config *rest.Config, kubeconfigPath string) error {
+ // Build the kubeconfig API object from the rest.Config
+ kubeConfig := &clientcmdapi.Config{
+ Clusters: map[string]*clientcmdapi.Cluster{
+ "default-cluster": {
+ Server: config.Host,
+ CertificateAuthorityData: config.CAData,
+ InsecureSkipTLSVerify: config.Insecure,
+ },
+ },
+ AuthInfos: map[string]*clientcmdapi.AuthInfo{
+ "default-auth": {
+ Token: config.BearerToken,
+ },
+ },
+ Contexts: map[string]*clientcmdapi.Context{
+ "default-context": {
+ Cluster: "default-cluster",
+ AuthInfo: "default-auth",
+ },
+ },
+ CurrentContext: "default-context",
+ }
+
+ // Ensure the directory where the file is being written exists
+ err := os.MkdirAll(filepath.Dir(kubeconfigPath), os.ModePerm)
+ if err != nil {
+		return fmt.Errorf("failed to create directory for kubeconfig file: %v", err)
+ }
+
+ // Write the kubeconfig to the specified file
+ err = clientcmd.WriteToFile(*kubeConfig, kubeconfigPath)
+ if err != nil {
+ return fmt.Errorf("failed to write kubeconfig to file: %v", err)
+ }
+
+ return nil
+}
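
A minimal sketch of how the helpers added above might be combined from a test, assuming an initialized k8s.KubeCtl value and a service-account token secret; the buildUserKubeconfig wrapper, its parameters and the output path are illustrative only and not part of this patch:

package example

import (
	"os"
	"path/filepath"

	"k8s.io/client-go/rest"

	"github.com/apache/yunikorn-k8shim/test/e2e/framework/helpers/k8s"
)

// buildUserKubeconfig (hypothetical) reads the service-account token, switches a
// copy of the admin rest.Config to token-only authentication and persists it with
// the new WriteConfigToFile helper, returning the path of the generated kubeconfig.
func buildUserKubeconfig(kClient k8s.KubeCtl, namespace, secretName string) (string, error) {
	token, err := kClient.GetSecretValue(namespace, secretName, "token")
	if err != nil {
		return "", err
	}
	adminCfg, err := kClient.GetKubeConfig()
	if err != nil {
		return "", err
	}
	userCfg := rest.CopyConfig(adminCfg)
	// Drop the client-certificate credentials so only the bearer token is used.
	userCfg.CAFile, userCfg.CertFile, userCfg.KeyFile = "", "", ""
	userCfg.BearerToken = token
	path := filepath.Join(os.TempDir(), "user-kubeconfig")
	return path, k8s.WriteConfigToFile(userCfg, path)
}
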
diff --git a/test/e2e/user_group_limit/user_group_limit_test.go b/test/e2e/user_group_limit/user_group_limit_test.go
index 7eb3d016..45ba70c1 100644
--- a/test/e2e/user_group_limit/user_group_limit_test.go
+++ b/test/e2e/user_group_limit/user_group_limit_test.go
@@ -19,31 +19,45 @@
package user_group_limit_test
import (
+ "context"
"encoding/json"
"fmt"
"net/url"
+ "os"
+ "path/filepath"
"runtime"
+ "strings"
"time"
"github.com/apache/yunikorn-core/pkg/common/configs"
"github.com/apache/yunikorn-core/pkg/common/resources"
"github.com/apache/yunikorn-core/pkg/webservice/dao"
+
amCommon "github.com/apache/yunikorn-k8shim/pkg/admission/common"
amconf "github.com/apache/yunikorn-k8shim/pkg/admission/conf"
"github.com/apache/yunikorn-k8shim/pkg/common/constants"
+
tests "github.com/apache/yunikorn-k8shim/test/e2e"
+
"github.com/apache/yunikorn-k8shim/test/e2e/framework/configmanager"
"github.com/apache/yunikorn-k8shim/test/e2e/framework/helpers/common"
"github.com/apache/yunikorn-k8shim/test/e2e/framework/helpers/k8s"
"github.com/apache/yunikorn-k8shim/test/e2e/framework/helpers/yunikorn"
+
siCommon "github.com/apache/yunikorn-scheduler-interface/lib/go/common"
"github.com/apache/yunikorn-scheduler-interface/lib/go/si"
+ "k8s.io/client-go/rest"
+ "k8s.io/client-go/tools/clientcmd"
+
"github.com/onsi/ginkgo/v2"
"github.com/onsi/gomega"
v1 "k8s.io/api/core/v1"
+ rbacv1 "k8s.io/api/rbac/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+
+ "k8s.io/client-go/kubernetes"
)
type TestType int
@@ -103,8 +117,11 @@ var _ = ginkgo.BeforeEach(func() {
var _ = ginkgo.AfterSuite(func() {
ginkgo.By("Check Yunikorn's health")
checks, err := yunikorn.GetFailedHealthChecks()
- Ω(err).NotTo(gomega.HaveOccurred())
+ Ω(err).NotTo(HaveOccurred())
Ω(checks).To(gomega.Equal(""), checks)
+ ginkgo.By("Tearing down namespace: " + dev)
+ err = kClient.TearDownNamespace(dev)
+ Ω(err).NotTo(HaveOccurred())
})
var _ = ginkgo.Describe("UserGroupLimit", func() {
@@ -909,40 +926,229 @@ var _ = ginkgo.Describe("UserGroupLimit", func() {
	checkUsageWildcardGroups(groupTestType, group2, sandboxQueue1, []*v1.Pod{group2Sandbox1Pod1, group2Sandbox1Pod2, group2Sandbox1Pod3})
})
+ ginkgo.It("Verify User info for the non kube admin user", func() {
+ var clientset *kubernetes.Clientset
+ var namespace = "default"
+ var serviceAccountName = "test-user-sa"
+ var podName = "test-pod"
+ var secretName = "test-user-sa-token" // #nosec G101
+
+ ginkgo.By("Update config")
+		// The wait wrapper still can't fully guarantee that the config in AdmissionController has been updated.
+ admissionCustomConfig = map[string]string{
+ "log.core.scheduler.ugm.level": "debug",
+ amconf.AMAccessControlBypassAuth: constants.False,
+ }
+		yunikorn.WaitForAdmissionControllerRefreshConfAfterAction(func() {
+			yunikorn.UpdateCustomConfigMapWrapperWithMap(oldConfigMap, "", admissionCustomConfig, func(sc *configs.SchedulerConfig) error {
+				// remove placement rules so we can control queue
+				sc.Partitions[0].PlacementRules = nil
+				err := common.AddQueue(sc, constants.DefaultPartition, constants.RootQueue, configs.QueueConfig{
+					Name: "default",
+					Limits: []configs.Limit{
+						{
+							Limit:           "user entry",
+							Users:           []string{user1},
+							MaxApplications: 1,
+							MaxResources: map[string]string{
+								siCommon.Memory: fmt.Sprintf("%dM", mediumMem),
+							},
+						},
+						{
+							Limit:           "user2 entry",
+							Users:           []string{user2},
+							MaxApplications: 2,
+							MaxResources: map[string]string{
+								siCommon.Memory: fmt.Sprintf("%dM", largeMem),
+							},
+						},
+					}})
+				if err != nil {
+					return err
+				}
+				return common.AddQueue(sc, constants.DefaultPartition, constants.RootQueue, configs.QueueConfig{Name: "sandbox2"})
+ })
+ })
+ defer func() {
+ // cleanup
+ ginkgo.By("Cleaning up resources...")
+			err := clientset.CoreV1().Pods(namespace).Delete(context.TODO(), podName, metav1.DeleteOptions{})
+			gomega.Ω(err).NotTo(HaveOccurred())
+			err = clientset.CoreV1().ServiceAccounts(namespace).Delete(context.TODO(), serviceAccountName, metav1.DeleteOptions{})
+			gomega.Ω(err).NotTo(HaveOccurred())
+			err = kClient.DeleteClusterRole("pod-creator-role")
+			gomega.Ω(err).NotTo(HaveOccurred())
+			err = kClient.DeleteClusterRoleBindings("pod-creator-role-binding")
+ gomega.Ω(err).NotTo(HaveOccurred())
+ }()
+ // Create Service Account
+ ginkgo.By("Creating Service Account...")
+		sa, err := kClient.CreateServiceAccount(serviceAccountName, namespace)
+ gomega.Ω(err).NotTo(HaveOccurred())
+ // Create a ClusterRole with necessary permissions
+ ginkgo.By("Creating ClusterRole...")
+ clusterRole := &rbacv1.ClusterRole{
+ ObjectMeta: metav1.ObjectMeta{
+ Name: "pod-creator-role",
+ },
+ Rules: []rbacv1.PolicyRule{
+ {
+ APIGroups: []string{""},
+					Resources: []string{"pods", "serviceaccounts", "test-user-sa"},
+					Verbs:     []string{"create", "get", "list", "watch", "delete"},
+ },
+ },
+ }
+ _, err = kClient.CreateClusterRole(clusterRole)
+ gomega.Ω(err).NotTo(HaveOccurred())
+		// Create a ClusterRoleBinding to bind the ClusterRole to the service account
+ ginkgo.By("Creating ClusterRoleBinding...")
+ clusterRoleBinding := &rbacv1.ClusterRoleBinding{
+ ObjectMeta: metav1.ObjectMeta{
+ Name: "pod-creator-role-binding",
+ },
+ RoleRef: rbacv1.RoleRef{
+ APIGroup: "rbac.authorization.k8s.io",
+ Kind: "ClusterRole",
+ Name: "pod-creator-role",
+ },
+ Subjects: []rbacv1.Subject{
+ {
+ Kind: "ServiceAccount",
+ Name: sa.Name,
+ Namespace: namespace,
+ },
+ },
+ }
+		_, err = kClient.CreateClusterRoleBinding(clusterRoleBinding.ObjectMeta.Name, clusterRoleBinding.RoleRef.Name, clusterRoleBinding.Subjects[0].Namespace, clusterRoleBinding.Subjects[0].Name)
+ gomega.Ω(err).NotTo(HaveOccurred())
+ // Create a Secret for the Service Account
+ ginkgo.By("Creating Secret for the Service Account...")
+		// create an object of v1.Secret
+ secret := &v1.Secret{
+ ObjectMeta: metav1.ObjectMeta{
+ Name: secretName,
+ Annotations: map[string]string{
+					"kubernetes.io/service-account.name": serviceAccountName,
+ },
+ },
+ Type: v1.SecretTypeServiceAccountToken,
+ }
+ _, err = kClient.CreateSecret(secret, namespace)
+ gomega.Ω(err).NotTo(HaveOccurred())
+ // Get the token value from the Secret
+ ginkgo.By("Getting the token value from the Secret...")
+		userTokenValue, err := kClient.GetSecretValue(namespace, secretName, "token")
+ gomega.Ω(err).NotTo(HaveOccurred())
+		// copy the existing kubeconfig rather than hardcoding one
+ config, err := kClient.GetKubeConfig()
+ gomega.Ω(err).NotTo(HaveOccurred())
+ config.BearerToken = userTokenValue
+ newConf := rest.CopyConfig(config) // copy existing config
+ // Use token-based authentication instead of client certificates
+ newConf.CAFile = ""
+ newConf.CertFile = ""
+ newConf.KeyFile = ""
+ newConf.BearerToken = userTokenValue
+		kubeconfigPath := filepath.Join(os.TempDir(), "test-user-config")
+ err = k8s.WriteConfigToFile(newConf, kubeconfigPath)
+ gomega.Ω(err).NotTo(HaveOccurred())
+ config, err = clientcmd.BuildConfigFromFlags("", kubeconfigPath)
+ gomega.Ω(err).NotTo(HaveOccurred())
+ clientset, err = kubernetes.NewForConfig(config)
+ gomega.Ω(err).NotTo(HaveOccurred())
+ ginkgo.By("Creating Pod...")
+ pod := &v1.Pod{
+ ObjectMeta: metav1.ObjectMeta{
+ Name: podName,
+ Annotations: map[string]string{
+					"created-by": fmt.Sprintf("system:serviceaccount:%s:%s", namespace, serviceAccountName),
+					"user-token": userTokenValue, // Log the token in the annotation
+				},
+				Labels: map[string]string{"applicationId": "test-app"},
+ },
+ Spec: v1.PodSpec{
+ ServiceAccountName: serviceAccountName,
+ Containers: []v1.Container{
+ {
+ Name: "nginx",
+ Image: "nginx",
+						Ports: []v1.ContainerPort{{ContainerPort: 80}},
+ },
+ },
+ },
+ }
+		_, err = clientset.CoreV1().Pods(namespace).Create(context.TODO(), pod, metav1.CreateOptions{})
+		gomega.Ω(err).NotTo(HaveOccurred())
+		createdPod, err := clientset.CoreV1().Pods(namespace).Get(context.TODO(), podName, metav1.GetOptions{})
+ gomega.Ω(err).NotTo(HaveOccurred())
+ ginkgo.By("Verifying User Info...")
+ userInfo, err := GetUserInfoFromPodAnnotation(createdPod)
+ gomega.Ω(err).NotTo(HaveOccurred())
+		// user info should contain the substring "system:serviceaccount:default:test-user-sa"
+		gomega.Ω(strings.Contains(fmt.Sprintf("%v", userInfo), "system:serviceaccount:default:test-user-sa")).To(gomega.BeTrue())
+ queueName2 := "root_22"
+		yunikorn.UpdateCustomConfigMapWrapper(oldConfigMap, "", func(sc *configs.SchedulerConfig) error {
+ // remove placement rules so we can control queue
+ sc.Partitions[0].PlacementRules = nil
+ var err error
+			if err = common.AddQueue(sc, "default", "root", configs.QueueConfig{
+				Name:       queueName2,
+				Resources:  configs.Resources{Guaranteed: map[string]string{"memory": fmt.Sprintf("%dM", 200)}},
+				Properties: map[string]string{"preemption.delay": "1s"},
+ }); err != nil {
+ return err
+ }
+ return nil
+ })
+ })
ginkgo.AfterEach(func() {
tests.DumpClusterInfoIfSpecFailed(suiteName, []string{dev})
ginkgo.By("Tearing down namespace: " + dev)
err := kClient.TearDownNamespace(dev)
- Ω(err).NotTo(gomega.HaveOccurred())
+ Ω(err).NotTo(HaveOccurred())
// reset config
ginkgo.By("Restoring YuniKorn configuration")
yunikorn.RestoreConfigMapWrapper(oldConfigMap)
})
})
+func GetUserInfoFromPodAnnotation(pod *v1.Pod) (*si.UserGroupInformation, error) {
+ userInfo, ok := pod.Annotations[amCommon.UserInfoAnnotation]
+ if !ok {
+ return nil, fmt.Errorf("user info not found in pod annotation")
+ }
+ var userInfoObj si.UserGroupInformation
+ err := json.Unmarshal([]byte(userInfo), &userInfoObj)
+ if err != nil {
+		return nil, fmt.Errorf("failed to unmarshal user info from pod annotation")
+ }
+ return &userInfoObj, nil
+}
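
For completeness, a small sketch of how the annotation parser above could be reused by other assertions in this test package; it assumes the test file's existing imports, and the verifySubmitter name and expectedPrincipal parameter are illustrative, not part of this patch:

// verifySubmitter (hypothetical) parses the YuniKorn user-info annotation on a pod
// and checks that the expected principal (e.g. a service-account name) appears in it.
func verifySubmitter(pod *v1.Pod, expectedPrincipal string) error {
	info, err := GetUserInfoFromPodAnnotation(pod)
	if err != nil {
		return err
	}
	if !strings.Contains(fmt.Sprintf("%v", info), expectedPrincipal) {
		return fmt.Errorf("expected submitter %q in user info, got %v", expectedPrincipal, info)
	}
	return nil
}
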
+
func deploySleepPod(usergroup *si.UserGroupInformation, queuePath string, expectedRunning bool, reason string) *v1.Pod {
usergroupJsonBytes, err := json.Marshal(usergroup)
- Ω(err).NotTo(gomega.HaveOccurred())
+ Ω(err).NotTo(HaveOccurred())
	sleepPodConfig := k8s.SleepPodConfig{NS: dev, Mem: smallMem, Labels: map[string]string{constants.LabelQueueName: queuePath}}
sleepPodObj, err := k8s.InitSleepPod(sleepPodConfig)
- Ω(err).NotTo(gomega.HaveOccurred())
+ Ω(err).NotTo(HaveOccurred())
	sleepPodObj.Annotations[amCommon.UserInfoAnnotation] = string(usergroupJsonBytes)
	ginkgo.By(fmt.Sprintf("%s deploys the sleep pod %s to queue %s", usergroup, sleepPodObj.Name, queuePath))
sleepPod, err := kClient.CreatePod(sleepPodObj, dev)
- gomega.Ω(err).NotTo(gomega.HaveOccurred())
+ gomega.Ω(err).NotTo(HaveOccurred())
if expectedRunning {
		ginkgo.By(fmt.Sprintf("The sleep pod %s can be scheduled %s", sleepPod.Name, reason))
		err = kClient.WaitForPodRunning(dev, sleepPod.Name, 60*time.Second)
- gomega.Ω(err).NotTo(gomega.HaveOccurred())
+ gomega.Ω(err).NotTo(HaveOccurred())
} else {
		ginkgo.By(fmt.Sprintf("The sleep pod %s can't be scheduled %s", sleepPod.Name, reason))
		// Since Pending is the initial state of PodPhase, sleep for 5 seconds, then check whether the pod is still in Pending state.
time.Sleep(5 * time.Second)
		err = kClient.WaitForPodPending(sleepPod.Namespace, sleepPod.Name, 60*time.Second)
- gomega.Ω(err).NotTo(gomega.HaveOccurred())
+ gomega.Ω(err).NotTo(HaveOccurred())
}
return sleepPod
}
@@ -952,14 +1158,14 @@ func checkUsage(testType TestType, name string, queuePath string, expectedRunnin
if testType == userTestType {
		ginkgo.By(fmt.Sprintf("Check user resource usage for %s in queue %s", name, queuePath))
		userUsageDAOInfo, err := restClient.GetUserUsage(constants.DefaultPartition, name)
- Ω(err).NotTo(gomega.HaveOccurred())
+ Ω(err).NotTo(HaveOccurred())
Ω(userUsageDAOInfo).NotTo(gomega.BeNil())
rootQueueResourceUsageDAO = userUsageDAOInfo.Queues
} else if testType == groupTestType {
		ginkgo.By(fmt.Sprintf("Check group resource usage for %s in queue %s", name, queuePath))
		groupUsageDAOInfo, err := restClient.GetGroupUsage(constants.DefaultPartition, name)
- Ω(err).NotTo(gomega.HaveOccurred())
+ Ω(err).NotTo(HaveOccurred())
Ω(groupUsageDAOInfo).NotTo(gomega.BeNil())
rootQueueResourceUsageDAO = groupUsageDAOInfo.Queues
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]