This is an automated email from the ASF dual-hosted git repository.

xianjin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new 5cba4e77 [ISSUE #525][operator] refine svc creations (#530)
5cba4e77 is described below

commit 5cba4e77618ebb0d7dd057ca6e23513c33d8a918
Author: advancedxy <[email protected]>
AuthorDate: Mon Feb 6 15:08:36 2023 +0800

    [ISSUE #525][operator] refine svc creations (#530)
    
    ### What changes were proposed in this pull request?
    1. generate a headless svc for each coordinator and pass these services to shuffle servers
    2. make RPCNodePort/HTTPNodePort optional for coordinators
    3. remove service creation for shuffle servers.
    
    ### Why are the changes needed?
    This fixes #525
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    It has been verified manually; more UTs will be added.
---
 .../uniffle/v1alpha1/remoteshuffleservice_types.go |   2 +
 .../uniffle.apache.org_remoteshuffleservices.yaml  |   2 -
 .../operator/pkg/controller/controller/rss.go      |  11 ++-
 .../pkg/controller/sync/coordinator/coordinator.go |  58 ++++++++++--
 .../sync/coordinator/coordinator_test.go           |  60 ++++++++++++
 .../controller/sync/shuffleserver/shuffleserver.go | 103 +--------------------
 .../operator/pkg/webhook/inspector/rss.go          |   6 +-
 .../operator/pkg/webhook/inspector/rss_test.go     |  61 ++++++++++++
 8 files changed, 187 insertions(+), 116 deletions(-)

diff --git 
a/deploy/kubernetes/operator/api/uniffle/v1alpha1/remoteshuffleservice_types.go 
b/deploy/kubernetes/operator/api/uniffle/v1alpha1/remoteshuffleservice_types.go
index 041dec9b..adf8aab8 100644
--- 
a/deploy/kubernetes/operator/api/uniffle/v1alpha1/remoteshuffleservice_types.go
+++ 
b/deploy/kubernetes/operator/api/uniffle/v1alpha1/remoteshuffleservice_types.go
@@ -96,9 +96,11 @@ type CoordinatorConfig struct {
        ExcludeNodesFilePath string `json:"excludeNodesFilePath,omitempty"`
 
        // RPCNodePort defines rpc port of node port service used for 
coordinators' external access.
+       // +optional
        RPCNodePort []int32 `json:"rpcNodePort"`
 
        // HTTPNodePort defines http port of node port service used for 
coordinators' external access.
+       // +optional
        HTTPNodePort []int32 `json:"httpNodePort"`
 }
 
diff --git 
a/deploy/kubernetes/operator/config/crd/bases/uniffle.apache.org_remoteshuffleservices.yaml
 
b/deploy/kubernetes/operator/config/crd/bases/uniffle.apache.org_remoteshuffleservices.yaml
index ba86981d..6cf6234d 100644
--- 
a/deploy/kubernetes/operator/config/crd/bases/uniffle.apache.org_remoteshuffleservices.yaml
+++ 
b/deploy/kubernetes/operator/config/crd/bases/uniffle.apache.org_remoteshuffleservices.yaml
@@ -3279,9 +3279,7 @@ spec:
                     type: string
                 required:
                 - configDir
-                - httpNodePort
                 - image
-                - rpcNodePort
                 - xmxSize
                 type: object
               shuffleServer:
diff --git a/deploy/kubernetes/operator/pkg/controller/controller/rss.go 
b/deploy/kubernetes/operator/pkg/controller/controller/rss.go
index 78834845..4083e082 100644
--- a/deploy/kubernetes/operator/pkg/controller/controller/rss.go
+++ b/deploy/kubernetes/operator/pkg/controller/controller/rss.go
@@ -618,16 +618,17 @@ func (r *rssController) syncShuffleServer(rss 
*unifflev1alpha1.RemoteShuffleServ
        if rss.Status.Phase == unifflev1alpha1.RSSRunning && 
!*rss.Spec.ShuffleServer.Sync {
                return nil
        }
-       serviceAccount, services, statefulSet := 
shuffleserver.GenerateShuffleServers(rss)
+       // we don't need to generate svc for shuffle servers:
+       // shuffle servers are accessed directly through coordinator's shuffle assignments, so a service for shuffle servers is
+       // pointless. For spark apps running in the cluster, executor containers could access shuffle servers via container
+       // network (overlay or host network). If shuffle servers need to be exposed externally, host network should be used
+       // and external executors should access the host node ip:port directly.
+       serviceAccount, statefulSet := shuffleserver.GenerateShuffleServers(rss)
        if err := kubeutil.SyncServiceAccount(r.kubeClient, serviceAccount); 
err != nil {
                klog.Errorf("sync SA (%v) for rss (%v) failed: %v",
                        utils.UniqueName(serviceAccount), 
utils.UniqueName(rss), err)
                return err
        }
-       if err := kubeutil.SyncServices(r.kubeClient, services); err != nil {
-               klog.Errorf("sync SVCs for rss (%v) failed: %v", 
utils.UniqueName(rss), err)
-               return err
-       }
        if _, _, err := kubeutil.SyncStatefulSet(r.kubeClient, statefulSet, 
true); err != nil {
                klog.Errorf("sync StatefulSet for rss (%v) failed: %v", 
utils.UniqueName(rss), err)
                return err
diff --git 
a/deploy/kubernetes/operator/pkg/controller/sync/coordinator/coordinator.go 
b/deploy/kubernetes/operator/pkg/controller/sync/coordinator/coordinator.go
index 47e2d3ac..a3ff5e58 100644
--- a/deploy/kubernetes/operator/pkg/controller/sync/coordinator/coordinator.go
+++ b/deploy/kubernetes/operator/pkg/controller/sync/coordinator/coordinator.go
@@ -54,13 +54,18 @@ func GenerateCoordinators(rss 
*unifflev1alpha1.RemoteShuffleService) (
        sa := GenerateSA(rss)
        cm := GenerateCM(rss)
        count := *rss.Spec.Coordinator.Count
-       services := make([]*corev1.Service, count)
+       services := make([]*corev1.Service, 0)
        deployments := make([]*appsv1.Deployment, count)
        for i := 0; i < int(count); i++ {
-               svc := GenerateSvc(rss, i)
+               // only generate svc when nodePorts are specified
+               if len(rss.Spec.Coordinator.RPCNodePort) > 0 {
+                       svc := GenerateSvc(rss, i)
+                       services = append(services, svc)
+               }
+               headlessSvc := GenerateHeadlessSvc(rss, i)
                deploy := GenerateDeploy(rss, i)
-               services[i] = svc
                deployments[i] = deploy
+               services = append(services, headlessSvc)
        }
        return sa, cm, services, deployments
 }
@@ -95,7 +100,43 @@ func GenerateCM(rss *unifflev1alpha1.RemoteShuffleService) 
*corev1.ConfigMap {
        return cm
 }
 
-// GenerateSvc generates service used by specific coordinator.
+// GenerateHeadlessSvc generates a headless service for the coordinator with the given index.
+func GenerateHeadlessSvc(rss *unifflev1alpha1.RemoteShuffleService, index int) 
*corev1.Service {
+       name := GenerateNameByIndex(rss, index)
+       serviceName := appendHeadless(name)
+
+       svc := &corev1.Service{
+               ObjectMeta: metav1.ObjectMeta{
+                       Name:      serviceName,
+                       Namespace: rss.Namespace,
+               },
+               Spec: corev1.ServiceSpec{
+                       ClusterIP: corev1.ClusterIPNone,
+                       Selector: map[string]string{
+                               "app": name,
+                       },
+                       Ports: []corev1.ServicePort{
+                               {
+                                       Name:       "rpc",
+                                       Protocol:   corev1.ProtocolTCP,
+                                       Port:       
controllerconstants.ContainerCoordinatorRPCPort,
+                                       TargetPort: 
intstr.FromInt(int(*rss.Spec.Coordinator.RPCPort)),
+                               },
+                               {
+                                       Name:       "http",
+                                       Protocol:   corev1.ProtocolTCP,
+                                       Port:       
controllerconstants.ContainerCoordinatorHTTPPort,
+                                       TargetPort: 
intstr.FromInt(int(*rss.Spec.Coordinator.HTTPPort)),
+                               },
+                       },
+               },
+       }
+       util.AddOwnerReference(&svc.ObjectMeta, rss)
+       return svc
+}
+
+// GenerateSvc generates the NodePort service used by a specific coordinator. It is only needed when
+// RPCNodePort/HTTPNodePort are specified; otherwise callers skip it.
 func GenerateSvc(rss *unifflev1alpha1.RemoteShuffleService, index int) 
*corev1.Service {
        name := GenerateNameByIndex(rss, index)
        svc := &corev1.Service{
@@ -230,12 +271,17 @@ func GenerateNameByIndex(rss 
*unifflev1alpha1.RemoteShuffleService, index int) s
        return fmt.Sprintf("%v-%v-%v", constants.RSSCoordinator, rss.Name, 
index)
 }
 
+func appendHeadless(name string) string {
+       return name + "-headless"
+}
+
 // GenerateAddresses returns addresses of coordinators accessed by shuffle 
servers.
 func GenerateAddresses(rss *unifflev1alpha1.RemoteShuffleService) string {
        var names []string
        for i := 0; i < int(*rss.Spec.Coordinator.Count); i++ {
-               current := fmt.Sprintf("%v:%v", GenerateNameByIndex(rss, i),
-                       controllerconstants.ContainerShuffleServerRPCPort)
+               name := GenerateNameByIndex(rss, i)
+               serviceName := appendHeadless(name)
+               current := fmt.Sprintf("%v:%v", serviceName, 
controllerconstants.ContainerShuffleServerRPCPort)
                names = append(names, current)
        }
        return strings.Join(names, ",")
diff --git 
a/deploy/kubernetes/operator/pkg/controller/sync/coordinator/coordinator_test.go
 
b/deploy/kubernetes/operator/pkg/controller/sync/coordinator/coordinator_test.go
index 6c16eebc..6df2216b 100644
--- 
a/deploy/kubernetes/operator/pkg/controller/sync/coordinator/coordinator_test.go
+++ 
b/deploy/kubernetes/operator/pkg/controller/sync/coordinator/coordinator_test.go
@@ -24,6 +24,7 @@ import (
        "strconv"
        "testing"
 
+       "github.com/stretchr/testify/assert"
        appsv1 "k8s.io/api/apps/v1"
        corev1 "k8s.io/api/core/v1"
        "k8s.io/apimachinery/pkg/util/sets"
@@ -210,3 +211,62 @@ func TestGenerateDeploy(t *testing.T) {
                })
        }
 }
+
+// generateServiceCountMap generates a map from service type to its corresponding count. Headless services are
+// counted under the empty service type, whether their type is unset or ClusterIP.
+func generateServiceCountMap(services []*corev1.Service) 
map[corev1.ServiceType]int {
+       result := make(map[corev1.ServiceType]int)
+       var empty corev1.ServiceType
+       for _, service := range services {
+               sType := service.Spec.Type
+               if (sType == corev1.ServiceTypeClusterIP || sType == empty) && 
service.Spec.ClusterIP == corev1.ClusterIPNone {
+                       result[empty]++
+               } else {
+                       result[service.Spec.Type]++
+               }
+       }
+       return result
+}
+
+func TestGenerateSvcForCoordinator(t *testing.T) {
+       for _, tt := range []struct {
+               name          string
+               rss           *uniffleapi.RemoteShuffleService
+               serviceCntMap map[corev1.ServiceType]int
+       }{
+               {
+                       name: "with RPCNodePort",
+                       rss:  buildRssWithLabels(),
+                       serviceCntMap: map[corev1.ServiceType]int{
+                               "":                         2, // defaults to 
headless service
+                               corev1.ServiceTypeNodePort: 2,
+                       },
+               },
+               {
+                       name: "without RPCNodePort",
+                       rss: func() *uniffleapi.RemoteShuffleService {
+                               withoutRPCNodePortRss := buildRssWithLabels()
+                               
withoutRPCNodePortRss.Spec.Coordinator.RPCNodePort = make([]int32, 0)
+                               
withoutRPCNodePortRss.Spec.Coordinator.HTTPNodePort = make([]int32, 0)
+                               return withoutRPCNodePortRss
+                       }(),
+                       serviceCntMap: map[corev1.ServiceType]int{
+                               "": 2,
+                       },
+               },
+       } {
+               t.Run(tt.name, func(t *testing.T) {
+                       assertion := assert.New(t)
+                       _, _, services, _ := GenerateCoordinators(tt.rss)
+                       result := generateServiceCountMap(services)
+                       assertion.Equal(tt.serviceCntMap, result)
+               })
+       }
+}
+
+func TestGenerateAddresses(t *testing.T) {
+       assertion := assert.New(t)
+       rss := buildRssWithLabels()
+       quorum := GenerateAddresses(rss)
+       assertion.Contains(quorum, "headless")
+}
diff --git 
a/deploy/kubernetes/operator/pkg/controller/sync/shuffleserver/shuffleserver.go 
b/deploy/kubernetes/operator/pkg/controller/sync/shuffleserver/shuffleserver.go
index e590f756..43a57be8 100644
--- 
a/deploy/kubernetes/operator/pkg/controller/sync/shuffleserver/shuffleserver.go
+++ 
b/deploy/kubernetes/operator/pkg/controller/sync/shuffleserver/shuffleserver.go
@@ -26,7 +26,6 @@ import (
        appsv1 "k8s.io/api/apps/v1"
        corev1 "k8s.io/api/core/v1"
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
-       "k8s.io/apimachinery/pkg/util/intstr"
        "k8s.io/apimachinery/pkg/util/sets"
        "k8s.io/utils/pointer"
 
@@ -52,18 +51,10 @@ func init() {
 }
 
 // GenerateShuffleServers generates objects related to shuffle servers.
-func GenerateShuffleServers(rss *unifflev1alpha1.RemoteShuffleService) (
-       *corev1.ServiceAccount, []*corev1.Service, *appsv1.StatefulSet) {
+func GenerateShuffleServers(rss *unifflev1alpha1.RemoteShuffleService) 
(*corev1.ServiceAccount, *appsv1.StatefulSet) {
        sa := GenerateSA(rss)
-       var services []*corev1.Service
-       if needGenerateHeadlessSVC(rss) {
-               services = append(services, GenerateHeadlessSVC(rss))
-       }
-       if needGenerateNodePortSVC(rss) {
-               services = append(services, GenerateNodePortSVC(rss))
-       }
        sts := GenerateSts(rss)
-       return sa, services, sts
+       return sa, sts
 }
 
 // GenerateSA generates service account of shuffle servers.
@@ -78,76 +69,6 @@ func GenerateSA(rss *unifflev1alpha1.RemoteShuffleService) 
*corev1.ServiceAccoun
        return sa
 }
 
-// GenerateHeadlessSVC generates headless service used by shuffle servers.
-func GenerateHeadlessSVC(rss *unifflev1alpha1.RemoteShuffleService) 
*corev1.Service {
-       name := generateHeadlessSVCName(rss)
-       svc := &corev1.Service{
-               ObjectMeta: metav1.ObjectMeta{
-                       Name:      name,
-                       Namespace: rss.Namespace,
-               },
-               Spec: corev1.ServiceSpec{
-                       ClusterIP: corev1.ClusterIPNone,
-                       Selector: map[string]string{
-                               "app": GenerateName(rss),
-                       },
-               },
-       }
-       if rss.Spec.ShuffleServer.RPCPort != nil {
-               svc.Spec.Ports = append(svc.Spec.Ports, corev1.ServicePort{
-                       Name:       "rpc",
-                       Protocol:   corev1.ProtocolTCP,
-                       Port:       
controllerconstants.ContainerShuffleServerRPCPort,
-                       TargetPort: 
intstr.FromInt(int(*rss.Spec.ShuffleServer.RPCPort)),
-               })
-       }
-       if rss.Spec.ShuffleServer.HTTPPort != nil {
-               svc.Spec.Ports = append(svc.Spec.Ports, corev1.ServicePort{
-                       Name:       "http",
-                       Protocol:   corev1.ProtocolTCP,
-                       Port:       
controllerconstants.ContainerShuffleServerHTTPPort,
-                       TargetPort: 
intstr.FromInt(int(*rss.Spec.ShuffleServer.HTTPPort)),
-               })
-       }
-       util.AddOwnerReference(&svc.ObjectMeta, rss)
-       return svc
-}
-
-// GenerateNodePortSVC generates nodePort service used by shuffle servers.
-func GenerateNodePortSVC(rss *unifflev1alpha1.RemoteShuffleService) 
*corev1.Service {
-       name := GenerateName(rss)
-       svc := &corev1.Service{
-               ObjectMeta: metav1.ObjectMeta{
-                       Name:      name,
-                       Namespace: rss.Namespace,
-               },
-               Spec: corev1.ServiceSpec{
-                       Type: corev1.ServiceTypeNodePort,
-                       Selector: map[string]string{
-                               "app": name,
-                       },
-               },
-       }
-       if needNodePortForRPC(rss) {
-               svc.Spec.Ports = append(svc.Spec.Ports, corev1.ServicePort{
-                       Protocol:   corev1.ProtocolTCP,
-                       Port:       
controllerconstants.ContainerShuffleServerRPCPort,
-                       TargetPort: 
intstr.FromInt(int(*rss.Spec.ShuffleServer.RPCPort)),
-                       NodePort:   *rss.Spec.ShuffleServer.RPCNodePort,
-               })
-       }
-       if needNodePortForHTTP(rss) {
-               svc.Spec.Ports = append(svc.Spec.Ports, corev1.ServicePort{
-                       Protocol:   corev1.ProtocolTCP,
-                       Port:       
controllerconstants.ContainerShuffleServerHTTPPort,
-                       TargetPort: 
intstr.FromInt(int(*rss.Spec.ShuffleServer.HTTPPort)),
-                       NodePort:   *rss.Spec.ShuffleServer.HTTPNodePort,
-               })
-       }
-       util.AddOwnerReference(&svc.ObjectMeta, rss)
-       return svc
-}
-
 // getReplicas returns replicas of shuffle servers.
 func getReplicas(rss *unifflev1alpha1.RemoteShuffleService) *int32 {
        // TODO: we will support hpa for rss object,
@@ -357,23 +278,3 @@ func generateMainContainerENV(rss 
*unifflev1alpha1.RemoteShuffleService) []corev
        }
        return env
 }
-
-// needGenerateNodePortSVC returns whether we need node port service for 
shuffle servers.
-func needGenerateNodePortSVC(rss *unifflev1alpha1.RemoteShuffleService) bool {
-       return needNodePortForRPC(rss) || needNodePortForHTTP(rss)
-}
-
-// needGenerateHeadlessSVC returns whether we need headless service for 
shuffle servers.
-func needGenerateHeadlessSVC(rss *unifflev1alpha1.RemoteShuffleService) bool {
-       return rss.Spec.ShuffleServer.RPCPort != nil || 
rss.Spec.ShuffleServer.HTTPPort != nil
-}
-
-// needNodePortForRPC returns whether we need node port service for rpc 
service of shuffle servers.
-func needNodePortForRPC(rss *unifflev1alpha1.RemoteShuffleService) bool {
-       return rss.Spec.ShuffleServer.RPCPort != nil && 
rss.Spec.ShuffleServer.RPCNodePort != nil
-}
-
-// needNodePortForRPC returns whether we need node port service for http 
service of shuffle servers.
-func needNodePortForHTTP(rss *unifflev1alpha1.RemoteShuffleService) bool {
-       return rss.Spec.ShuffleServer.HTTPPort != nil && 
rss.Spec.ShuffleServer.HTTPNodePort != nil
-}
diff --git a/deploy/kubernetes/operator/pkg/webhook/inspector/rss.go 
b/deploy/kubernetes/operator/pkg/webhook/inspector/rss.go
index ce1033c4..c641fba5 100644
--- a/deploy/kubernetes/operator/pkg/webhook/inspector/rss.go
+++ b/deploy/kubernetes/operator/pkg/webhook/inspector/rss.go
@@ -156,8 +156,10 @@ func generateRSSPatches(ar *admissionv1.AdmissionReview,
 
 // validateCoordinator validates configurations for coordinators.
 func validateCoordinator(coordinator *unifflev1alpha1.CoordinatorConfig) error 
{
-       if len(coordinator.RPCNodePort) != int(*coordinator.Count) ||
-               len(coordinator.HTTPNodePort) != int(*coordinator.Count) {
+       // the number of RPCNodePorts must equal the number of HTTPNodePorts
+       if len(coordinator.RPCNodePort) != len(coordinator.HTTPNodePort) ||
+               // RPCNodePort/HTTPNodePort may be empty; if not, their length must equal the coordinator count
+               (len(coordinator.HTTPNodePort) > 0 && 
len(coordinator.HTTPNodePort) != int(*coordinator.Count)) {
                return fmt.Errorf("invalid number of http or rpc node ports 
(%v/%v) <> (%v)",
                        len(coordinator.RPCNodePort), 
len(coordinator.HTTPNodePort), *coordinator.Count)
        }
diff --git a/deploy/kubernetes/operator/pkg/webhook/inspector/rss_test.go 
b/deploy/kubernetes/operator/pkg/webhook/inspector/rss_test.go
index 101a3e4c..e0f3dce6 100644
--- a/deploy/kubernetes/operator/pkg/webhook/inspector/rss_test.go
+++ b/deploy/kubernetes/operator/pkg/webhook/inspector/rss_test.go
@@ -21,6 +21,7 @@ import (
        "encoding/json"
        "testing"
 
+       "github.com/stretchr/testify/assert"
        admissionv1 "k8s.io/api/admission/v1"
        nodev1 "k8s.io/api/node/v1"
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -294,3 +295,63 @@ func buildTestRuntimeClass() *nodev1.RuntimeClass {
                Handler: "/etc/runtime/bin",
        }
 }
+
+func TestValidateCoordinator(t *testing.T) {
+       for _, tt := range []struct {
+               name        string
+               coordinator *uniffleapi.CoordinatorConfig
+               allowed     bool
+       }{
+               {
+                       name: "empty RPCNodePort",
+                       coordinator: &uniffleapi.CoordinatorConfig{
+                               Count: pointer.Int32(2),
+                       },
+                       allowed: true,
+               },
+               {
+                       name: "same number of RPCNodePort and HTTPNodePort",
+                       coordinator: &uniffleapi.CoordinatorConfig{
+                               Count:        pointer.Int32(2),
+                               RPCNodePort:  []int32{19996, 19997},
+                               HTTPNodePort: []int32{19996, 19997},
+                       },
+                       allowed: true,
+               },
+               {
+                       name: "different number of RPCNodePort and 
HTTPNodePort",
+                       coordinator: &uniffleapi.CoordinatorConfig{
+                               Count:        pointer.Int32(2),
+                               RPCNodePort:  []int32{19996, 19997, 19998},
+                               HTTPNodePort: []int32{19991, 19992},
+                       },
+                       allowed: false,
+               },
+               {
+                       name: "same number of RPCNodePort and HTTPNodePort but 
with different coordinator count",
+                       coordinator: &uniffleapi.CoordinatorConfig{
+                               Count:        pointer.Int32(1),
+                               RPCNodePort:  []int32{19996, 19997},
+                               HTTPNodePort: []int32{19991, 19992},
+                       },
+                       allowed: false,
+               },
+       } {
+               t.Run(tt.name, func(t *testing.T) {
+                       assertion := assert.New(t)
+                       tt.coordinator.ExcludeNodesFilePath = "/exclude_nodes"
+                       tt.coordinator.CommonConfig = &uniffleapi.CommonConfig{
+                               RSSPodSpec: &uniffleapi.RSSPodSpec{
+                                       LogHostPath:    "",
+                                       HostPathMounts: map[string]string{},
+                               },
+                       }
+                       err := validateCoordinator(tt.coordinator)
+                       if tt.allowed {
+                               assertion.Nil(err, "expected allowed, but got 
error: %v", err)
+                       } else {
+                               assertion.Error(err, "expected denied, but got 
accepted")
+                       }
+               })
+       }
+}

Reply via email to