This is an automated email from the ASF dual-hosted git repository.

roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new e6b4260a [ISSUE-48][FEATURE][FOLLOW UP] Add controller component (#214)
e6b4260a is described below

commit e6b4260aa54a9ffda30ec02fa050be1c108e8a9f
Author: jasonawang <[email protected]>
AuthorDate: Tue Sep 20 14:29:59 2022 +0800

    [ISSUE-48][FEATURE][FOLLOW UP] Add controller component (#214)
    
    ### What changes were proposed in this pull request?
    For issue #48
    This adds the code for the controller module; more unit tests will be added in the next PR.
    
    ### Why are the changes needed?
    Support K8S
    
    
    ### Does this PR introduce _any_ user-facing change?
    Yes, we will add the doc later
    
    
    ### How was this patch tested?
    Manual test
---
 deploy/kubernetes/operator/cmd/controller/main.go  |  56 ++
 deploy/kubernetes/operator/go.mod                  |  14 +-
 deploy/kubernetes/operator/go.sum                  |  56 +-
 .../operator/pkg/controller/config/config.go       |  51 ++
 .../operator/pkg/controller/constants/constants.go |  83 ++
 .../pkg/controller/controller/process_rss_test.go  |  99 +++
 .../operator/pkg/controller/controller/rss.go      | 912 +++++++++++++++++++++
 .../operator/pkg/controller/controller/rss_test.go | 290 +++++++
 .../controller/controller/shuffle_server_test.go   | 162 ++++
 .../operator/pkg/controller/controller/test.go     |  34 +
 .../pkg/controller/sync/coordinator/coordinator.go | 297 +++++++
 .../controller/sync/shuffleserver/shuffleserver.go | 346 ++++++++
 .../pkg/controller/util/kubernetes/configmap.go    |  86 ++
 .../pkg/controller/util/kubernetes/deployment.go   |  98 +++
 .../pkg/controller/util/kubernetes/service.go      | 111 +++
 .../controller/util/kubernetes/serviceaccount.go   |  86 ++
 .../pkg/controller/util/kubernetes/statefulset.go  | 112 +++
 .../pkg/controller/util/properties/properties.go   |  58 ++
 .../operator/pkg/controller/util/util.go           | 231 ++++++
 .../operator/pkg/webhook/manager_test.go           |   2 +-
 20 files changed, 3157 insertions(+), 27 deletions(-)

diff --git a/deploy/kubernetes/operator/cmd/controller/main.go 
b/deploy/kubernetes/operator/cmd/controller/main.go
new file mode 100644
index 00000000..edebc133
--- /dev/null
+++ b/deploy/kubernetes/operator/cmd/controller/main.go
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package main
+
+import (
+       "flag"
+
+       "k8s.io/klog/v2"
+       ctrl "sigs.k8s.io/controller-runtime"
+
+       
"github.com/apache/incubator-uniffle/deploy/kubernetes/operator/pkg/controller/config"
+       
"github.com/apache/incubator-uniffle/deploy/kubernetes/operator/pkg/controller/controller"
+)
+
+func main() {
+       klog.InitFlags(nil)
+       cfg := &config.Config{}
+       cfg.AddFlags()
+       flag.Parse()
+
+       cfg.Complete()
+       klog.Infof("run config: %+v", cfg)
+
+       // create a manager with leader election enabled, identified by the rss controller's election ID.
+       mgr, err := ctrl.NewManager(cfg.RESTConfig, ctrl.Options{
+               LeaderElection:   true,
+               LeaderElectionID: cfg.LeaderElectionID(),
+       })
+       if err != nil {
+               klog.Fatal(err)
+       }
+       // create a rss controller and register it with the manager.
+       rc := controller.NewRSSController(cfg)
+       if err = mgr.Add(rc); err != nil {
+               klog.Fatal(err)
+       }
+       // start the manager (and with it the registered rss controller) using the run context; an error is fatal.
+       if err = mgr.Start(cfg.RunCtx); err != nil {
+               klog.Fatal(err)
+       }
+}
diff --git a/deploy/kubernetes/operator/go.mod 
b/deploy/kubernetes/operator/go.mod
index 6121ae4c..853703b8 100644
--- a/deploy/kubernetes/operator/go.mod
+++ b/deploy/kubernetes/operator/go.mod
@@ -3,17 +3,19 @@ module 
github.com/apache/incubator-uniffle/deploy/kubernetes/operator
 go 1.16
 
 require (
+       github.com/fsnotify/fsnotify v1.5.1 // indirect
+       github.com/onsi/ginkgo v1.16.5 // indirect
        github.com/onsi/ginkgo/v2 v2.1.4
-       github.com/onsi/gomega v1.19.0
+       github.com/onsi/gomega v1.20.0
        github.com/parnurzeal/gorequest v0.2.16
        golang.org/x/sync v0.0.0-20210220032951-036812b2e83c
        gomodules.xyz/jsonpatch/v2 v2.2.0
-       k8s.io/api v0.22.1
-       k8s.io/apimachinery v0.22.1
-       k8s.io/client-go v0.22.1
-       k8s.io/code-generator v0.22.1
+       k8s.io/api v0.22.2
+       k8s.io/apimachinery v0.22.2
+       k8s.io/client-go v0.22.2
+       k8s.io/code-generator v0.22.2
        k8s.io/klog/v2 v2.9.0
-       k8s.io/utils v0.0.0-20210802155522-efc7438f0176
+       k8s.io/utils v0.0.0-20220210201930-3a6ce19ff2f9
        moul.io/http2curl v1.0.0 // indirect
        sigs.k8s.io/controller-runtime v0.10.0
 )
diff --git a/deploy/kubernetes/operator/go.sum 
b/deploy/kubernetes/operator/go.sum
index a3e5c1ac..56a39259 100644
--- a/deploy/kubernetes/operator/go.sum
+++ b/deploy/kubernetes/operator/go.sum
@@ -51,6 +51,7 @@ github.com/armon/go-metrics 
v0.0.0-20180917152333-f0300d1749da/go.mod h1:Q73ZrmV
 github.com/armon/go-radix v0.0.0-20180808171621-7fddfc383310/go.mod 
h1:ufUuZ+zHj4x4TnLV4JWEpy2hxWSpsRywHrMgIH9cCH8=
 github.com/asaskevich/govalidator v0.0.0-20190424111038-f61b66f89f4a/go.mod 
h1:lB+ZfQJz7igIIfQNfa7Ml4HSf2uFQQRzpGGRXenZAgY=
 github.com/benbjohnson/clock v1.0.3/go.mod 
h1:bGMdMPoPVvcYyt1gHDf4J2KE153Yf9BuiUKYMaxlTDM=
+github.com/benbjohnson/clock v1.1.0 
h1:Q92kusRqC1XV2MjkWETPvjJVqKetz1OzxZB7mHJLju8=
 github.com/benbjohnson/clock v1.1.0/go.mod 
h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA=
 github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod 
h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
 github.com/beorn7/perks v1.0.0/go.mod 
h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8=
@@ -92,6 +93,7 @@ github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod 
h1:E3ru+11k8xSBh+hMPgOLZm
 github.com/dgryski/go-sip13 v0.0.0-20181026042036-e10d5fee7954/go.mod 
h1:vAd38F8PWV+bWy6jNmig1y/TA+kYO4g3RSRF0IAv0no=
 github.com/docopt/docopt-go v0.0.0-20180111231733-ee0de3bc6815/go.mod 
h1:WwZ+bS3ebgob9U8Nd0kOddGdZWjyMGR8Wziv+TBNwSE=
 github.com/dustin/go-humanize v1.0.0/go.mod 
h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk=
+github.com/elazarl/goproxy v0.0.0-20180725130230-947c36da3153 
h1:yUdfgN0XgIJw7foRItutHYUIhlcKzcSf5vDpdhQAKTc=
 github.com/elazarl/goproxy v0.0.0-20180725130230-947c36da3153/go.mod 
h1:/Zj4wYkgs4iZTTu3o/KG3Itv/qCCa8VVMlb3i9OVuzc=
 github.com/emicklei/go-restful v0.0.0-20170410110728-ff4f55a20633/go.mod 
h1:otzb+WCGbkyDHkqmQmT5YD2WR4BBwUdeQoFo8l/7tVs=
 github.com/emicklei/go-restful v2.9.5+incompatible 
h1:spTtZBk5DYEvbxMVutUuTyh1Ao2r4iyvLdACqsl/Ljk=
@@ -110,8 +112,9 @@ github.com/felixge/httpsnoop v1.0.1/go.mod 
h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSw
 github.com/form3tech-oss/jwt-go v3.2.2+incompatible/go.mod 
h1:pbq4aXjuKjdthFRnoDwaVPLA+WlJuPGy+QneDUgJi2k=
 github.com/form3tech-oss/jwt-go v3.2.3+incompatible/go.mod 
h1:pbq4aXjuKjdthFRnoDwaVPLA+WlJuPGy+QneDUgJi2k=
 github.com/fsnotify/fsnotify v1.4.7/go.mod 
h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
-github.com/fsnotify/fsnotify v1.4.9 
h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWoS4=
 github.com/fsnotify/fsnotify v1.4.9/go.mod 
h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ=
+github.com/fsnotify/fsnotify v1.5.1 
h1:mZcQUHVQUQWoPXXtuf9yuEXKudkV2sx1E06UadKWpgI=
+github.com/fsnotify/fsnotify v1.5.1/go.mod 
h1:T3375wBYaZdLLcVNkcVbzGHY7f1l/uK5T5Ai1i3InKU=
 github.com/getsentry/raven-go v0.2.0/go.mod 
h1:KungGk8q33+aIAZUIVWZDr2OfAEBsO49PX4NzFV5kcQ=
 github.com/ghodss/yaml v1.0.0/go.mod 
h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04=
 github.com/go-gl/glfw v0.0.0-20190409004039-e6da0acd62b1/go.mod 
h1:vR7hzQXu2zJy9AVAgeJqvqgH9Q5CA+iKCZ2gyEVpxRU=
@@ -184,8 +187,9 @@ github.com/google/go-cmp v0.3.1/go.mod 
h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMyw
 github.com/google/go-cmp v0.4.0/go.mod 
h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
 github.com/google/go-cmp v0.5.0/go.mod 
h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
 github.com/google/go-cmp v0.5.4/go.mod 
h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
-github.com/google/go-cmp v0.5.5 h1:Khx7svrCpmxxtHBq5j2mp/xVjsi8hQMfNLvJFAlrGgU=
 github.com/google/go-cmp v0.5.5/go.mod 
h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
+github.com/google/go-cmp v0.5.8 h1:e6P7q2lk1O+qJJb4BtCQXlK8vWEO8V1ZeuEdJNOqZyg=
+github.com/google/go-cmp v0.5.8/go.mod 
h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
 github.com/google/gofuzz v1.0.0/go.mod 
h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
 github.com/google/gofuzz v1.1.0 h1:Hsa8mG0dQ46ij8Sl2AYJDUv1oA9/d6Vk+3LG99Oe02g=
 github.com/google/gofuzz v1.1.0/go.mod 
h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
@@ -205,6 +209,7 @@ github.com/googleapis/gax-go/v2 v2.0.5/go.mod 
h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5m
 github.com/googleapis/gnostic v0.5.1/go.mod 
h1:6U4PtQXGIEt/Z3h5MAT7FNofLnw9vXk2cUuW7uA/OeU=
 github.com/googleapis/gnostic v0.5.5 
h1:9fHAtK0uDfpveeqqo1hkEZJcFvYXAiCN3UutL8F9xHw=
 github.com/googleapis/gnostic v0.5.5/go.mod 
h1:7+EbHbldMins07ALC74bsA81Ovc97DwqyJO1AENw9kA=
+github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1 
h1:EGx4pi6eqNxGaHF6qqu48+N2wcFQ5qg5FXgOdqsJ5d8=
 github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod 
h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY=
 github.com/gorilla/websocket v1.4.2/go.mod 
h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
 github.com/gregjones/httpcache v0.0.0-20180305231024-9cad4c3443a7/go.mod 
h1:FecbI9+v66THATjSRHfNgh1IVFe/9kFxbXtjV0ctIMA=
@@ -252,6 +257,7 @@ github.com/json-iterator/go v1.1.11 
h1:uVUAXhF2To8cbw/3xN3pxj6kk7TYKs98NIrTqPlMW
 github.com/json-iterator/go v1.1.11/go.mod 
h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4=
 github.com/jstemmer/go-junit-report v0.0.0-20190106144839-af01ea7f8024/go.mod 
h1:6v2b51hI/fHJwM22ozAgKL4VKDeJcHhJFhtBdhmNjmU=
 github.com/jstemmer/go-junit-report v0.9.1/go.mod 
h1:Brl9GWCQeLvo8nXZwPNNblvFj/XSXhF0NWZEnDohbsk=
+github.com/jtolds/gls v4.20.0+incompatible 
h1:xdiiI2gbIgH/gLH7ADydsJ1uDOEzR8yvV7C0MuV77Wo=
 github.com/jtolds/gls v4.20.0+incompatible/go.mod 
h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU=
 github.com/julienschmidt/httprouter v1.2.0/go.mod 
h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w=
 github.com/julienschmidt/httprouter v1.3.0/go.mod 
h1:JR6WtHb+2LUe8TCKY3cZOxFyyO8IZAc4RVcycCCAKdM=
@@ -310,19 +316,20 @@ github.com/onsi/ginkgo 
v0.0.0-20170829012221-11459a886d9c/go.mod h1:lLunBs/Ym6LB
 github.com/onsi/ginkgo v1.6.0/go.mod 
h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
 github.com/onsi/ginkgo v1.12.1/go.mod 
h1:zj2OWP4+oCPe1qIXoGWkgMRwljMUYCdkwsT2108oapk=
 github.com/onsi/ginkgo v1.14.0/go.mod 
h1:iSB4RoI2tjJc9BBv4NKIKWKya62Rps+oPG/Lv9klQyY=
-github.com/onsi/ginkgo v1.16.4 h1:29JGrr5oVBm5ulCWet69zQkzWipVXIol6ygQUe/EzNc=
 github.com/onsi/ginkgo v1.16.4/go.mod 
h1:dX+/inL/fNMqNlz0e9LfyB9TswhZpCVdJM/Z6Vvnwo0=
+github.com/onsi/ginkgo v1.16.5 h1:8xi0RTUf59SOSfEtZMvwTvXYMzG4gV23XVHOZiXNtnE=
+github.com/onsi/ginkgo v1.16.5/go.mod 
h1:+E8gABHa3K6zRBolWtd+ROzc/U5bkGt0FwiG042wbpU=
 github.com/onsi/ginkgo/v2 v2.1.3/go.mod 
h1:vw5CSIxN1JObi/U8gcbwft7ZxR2dgaR70JSE3/PpL4c=
 github.com/onsi/ginkgo/v2 v2.1.4 
h1:GNapqRSid3zijZ9H77KrgVG4/8KqiyRsxcSxe+7ApXY=
 github.com/onsi/ginkgo/v2 v2.1.4/go.mod 
h1:um6tUpWM/cxCK3/FK8BXqEiUMUwRgSM4JXG47RKZmLU=
 github.com/onsi/gomega v0.0.0-20170829124025-dcabb60a477c/go.mod 
h1:C1qb7wdrVGGVU+Z6iS04AVkA3Q65CEZX59MT0QO5uiA=
 github.com/onsi/gomega v1.7.1/go.mod 
h1:XdKZgCCFLUoM/7CFJVPcG8C1xQ1AJ0vpAezJrB7JYyY=
 github.com/onsi/gomega v1.10.1/go.mod 
h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1ybHNo=
-github.com/onsi/gomega v1.15.0 h1:WjP/FQ/sk43MRmnEcT+MlDw2TFvkrXlprrPST/IudjU=
 github.com/onsi/gomega v1.15.0/go.mod 
h1:cIuvLEne0aoVhAgh/O6ac0Op8WWw9H6eYCriF+tEHG0=
 github.com/onsi/gomega v1.17.0/go.mod 
h1:HnhC7FXeEQY45zxNK3PPoIUhzk/80Xly9PcubAlGdZY=
-github.com/onsi/gomega v1.19.0 h1:4ieX6qQjPP/BfC3mpsAtIGGlxTWPeA3Inl/7DtXw1tw=
 github.com/onsi/gomega v1.19.0/go.mod 
h1:LY+I3pBVzYsTBU1AnDwOSxaYi9WoWiqgwooUqq9yPro=
+github.com/onsi/gomega v1.20.0 h1:8W0cWlwFkflGPLltQvLRB7ZVD5HuP6ng320w2IS245Q=
+github.com/onsi/gomega v1.20.0/go.mod 
h1:DtrZpjmvpn2mPm4YWQa0/ALMDj9v4YxLgojwPeREyVo=
 github.com/opentracing/opentracing-go v1.1.0/go.mod 
h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o=
 github.com/parnurzeal/gorequest v0.2.16 
h1:T/5x+/4BT+nj+3eSknXmCTnEVGSzFzPGdpqmUVVZXHQ=
 github.com/parnurzeal/gorequest v0.2.16/go.mod 
h1:3Kh2QUMJoqw3icWAecsyzkpY7UzRfDhbRdTjtNwNiUE=
@@ -373,7 +380,9 @@ github.com/sirupsen/logrus v1.4.2/go.mod 
h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6Mwd
 github.com/sirupsen/logrus v1.6.0/go.mod 
h1:7uNnSEd1DgxDLC74fIahvMZmmYsHGZGEOFrfsX/uA88=
 github.com/sirupsen/logrus v1.7.0/go.mod 
h1:yWOB1SBYBC5VeMP7gHvWumXLIWorT60ONWic61uBYv0=
 github.com/sirupsen/logrus v1.8.1/go.mod 
h1:yWOB1SBYBC5VeMP7gHvWumXLIWorT60ONWic61uBYv0=
+github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d 
h1:zE9ykElWQ6/NYmHa3jpm/yHnI4xSofP+UP6SpjHcSeM=
 github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d/go.mod 
h1:OnSkiWE9lh6wB0YB77sQom3nweQdgAjqCqsofrRNTgc=
+github.com/smartystreets/goconvey v1.6.4 
h1:fv0U8FUIMPNf1L9lnHLvLhgicrIVChEkdzIKYqbNC9s=
 github.com/smartystreets/goconvey v1.6.4/go.mod 
h1:syvi0/a8iFYH4r/RixwvyeAJjdLS9QV7WQ/tjFTllLA=
 github.com/soheilhy/cmux v0.1.4/go.mod 
h1:IM3LyeVVIOuxMH7sFAkER9+bJ4dT7Ms6E4xg4kGIyLM=
 github.com/soheilhy/cmux v0.1.5/go.mod 
h1:T7TcVDs9LWfQgPlPsdngu6I6QIoyIFZDDC6sNE1GqG0=
@@ -434,6 +443,7 @@ go.opentelemetry.io/proto/otlp v0.7.0/go.mod 
h1:PqfVotwruBrMGOCsRd/89rSnXhoiJIqe
 go.uber.org/atomic v1.4.0/go.mod 
h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
 go.uber.org/atomic v1.7.0 h1:ADUqmZGgLDDfbSL9ZmPxKTybcoEYHgpYfELNoN+7hsw=
 go.uber.org/atomic v1.7.0/go.mod 
h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
+go.uber.org/goleak v1.1.10 h1:z+mqJhf6ss6BSfSM671tgKyZBFPTTJM+HLxnhPC3wu0=
 go.uber.org/goleak v1.1.10/go.mod 
h1:8a7PlsEVH3e/a/GLqe5IIrQx6GzcnRmZEufDUTk4A7A=
 go.uber.org/multierr v1.1.0/go.mod 
h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0=
 go.uber.org/multierr v1.6.0 h1:y6IPFStTAIT5Ytl7/XYmHvzXQ7S3g/IeZW9hyZ5thw4=
@@ -474,6 +484,7 @@ golang.org/x/lint v0.0.0-20190930215403-16217165b5de/go.mod 
h1:6SW0HCj/g11FgYtHl
 golang.org/x/lint v0.0.0-20191125180803-fdd1cda4f05f/go.mod 
h1:5qLYkcX4OjUUV8bRuDixDT3tpyyb+LUpUlRWLxfhWrs=
 golang.org/x/lint v0.0.0-20200130185559-910be7a94367/go.mod 
h1:3xt1FjdF8hUf6vQPIChWIBhFzV8gjjsPE/fR3IyQdNY=
 golang.org/x/lint v0.0.0-20200302205851-738671d3881b/go.mod 
h1:3xt1FjdF8hUf6vQPIChWIBhFzV8gjjsPE/fR3IyQdNY=
+golang.org/x/lint v0.0.0-20210508222113-6edffad5e616 
h1:VLliZ0d+/avPrXXH+OakdXhpJuEoBZuwh1m2j7U6Iug=
 golang.org/x/lint v0.0.0-20210508222113-6edffad5e616/go.mod 
h1:3xt1FjdF8hUf6vQPIChWIBhFzV8gjjsPE/fR3IyQdNY=
 golang.org/x/mobile v0.0.0-20190312151609-d3739f865fa6/go.mod 
h1:z+o9i4GpDbdi3rU15maQ/Ox0txvL9dWGYEHz965HBQE=
 golang.org/x/mobile v0.0.0-20190719004257-d2bd2a29d028/go.mod 
h1:E/iHnbuqvinMTCcRqshq8CkpyQDoeVncDDYHnLhea+o=
@@ -483,7 +494,6 @@ golang.org/x/mod 
v0.1.1-0.20191105210325-c90efee705ee/go.mod h1:QqPTAvyqsEbceGzB
 golang.org/x/mod v0.1.1-0.20191107180719-034126e5016b/go.mod 
h1:QqPTAvyqsEbceGzBzNggFXnrqF1CaUcvgkdR5Ot7KZg=
 golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
 golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
-golang.org/x/mod v0.4.2 h1:Gz96sIWK3OalVv/I/qNygP42zyoKp3xptRVCWRFEBvo=
 golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
 golang.org/x/mod v0.6.0-dev.0.20220106191415-9b9b3d81d5e3 
h1:kQgndtyPBW/JIYERgdxfwMYh3AVStj88WQTlNDi2a+o=
 golang.org/x/mod v0.6.0-dev.0.20220106191415-9b9b3d81d5e3/go.mod 
h1:3p9vT2HGsQu2K1YbXdKPJLVgG5VJdoTa1poYQBtP1AY=
@@ -520,11 +530,11 @@ golang.org/x/net 
v0.0.0-20201202161906-c7110b5ffcbb/go.mod h1:sp8m0HH+o8qH0wwXwY
 golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod 
h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
 golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod 
h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM=
 golang.org/x/net v0.0.0-20210428140749-89ef3d95e781/go.mod 
h1:OJAsFXCWl8Ukc7SiCT/9KSuxbyM7479/AVlXFRxuMCk=
-golang.org/x/net v0.0.0-20210520170846-37e1c6afe023 
h1:ADo5wSpq2gqaCGQWzk7S5vd//0iyyLeAratkEoG5dLE=
 golang.org/x/net v0.0.0-20210520170846-37e1c6afe023/go.mod 
h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
 golang.org/x/net v0.0.0-20211015210444-4f30a5c0130f/go.mod 
h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
-golang.org/x/net v0.0.0-20220225172249-27dd8689420f 
h1:oA4XRj0qtSt8Yo1Zms0CUlsT3KG69V2UGQWPBxujDmc=
 golang.org/x/net v0.0.0-20220225172249-27dd8689420f/go.mod 
h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk=
+golang.org/x/net v0.0.0-20220425223048-2871e0cb64e4 
h1:HVyaeDAYux4pnY+D/SiwmLOR36ewZ4iGQIIrtnuCjFA=
+golang.org/x/net v0.0.0-20220425223048-2871e0cb64e4/go.mod 
h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk=
 golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod 
h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
 golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod 
h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
 golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod 
h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
@@ -588,15 +598,15 @@ golang.org/x/sys 
v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBc
 golang.org/x/sys v0.0.0-20210603081109-ebe580a85c40/go.mod 
h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
 golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod 
h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
 golang.org/x/sys v0.0.0-20210616094352-59db8d763f22/go.mod 
h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
-golang.org/x/sys v0.0.0-20210817190340-bfb29a6856f2 
h1:c8PlLMqBbOHoqtjteWm5/kbe6rNY2pbRfbIMVnepueo=
+golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c/go.mod 
h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
 golang.org/x/sys v0.0.0-20210817190340-bfb29a6856f2/go.mod 
h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
 golang.org/x/sys v0.0.0-20211019181941-9d821ace8654/go.mod 
h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
 golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod 
h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
-golang.org/x/sys v0.0.0-20220319134239-a9b59b0215f8 
h1:OH54vjqzRWmbJ62fjuhxy7AxFFgoHN0/DPc/UrL8cAs=
 golang.org/x/sys v0.0.0-20220319134239-a9b59b0215f8/go.mod 
h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
+golang.org/x/sys v0.0.0-20220422013727-9388b58f7150 
h1:xHms4gcpe1YE7A3yIllJXP16CMAGuqwO2lX1mTyyRRc=
+golang.org/x/sys v0.0.0-20220422013727-9388b58f7150/go.mod 
h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
 golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod 
h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw=
 golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod 
h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
-golang.org/x/term v0.0.0-20210220032956-6a3ed077a48d 
h1:SZxvLBoTP5yHO3Frd4z4vrF+DBX9vMVanchswa69toE=
 golang.org/x/term v0.0.0-20210220032956-6a3ed077a48d/go.mod 
h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
 golang.org/x/term v0.0.0-20210927222741-03fcf44c2211 
h1:JGgROgKl9N8DuW20oFS5gxc+lE67/N3FcwmBPMe7ArY=
 golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod 
h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
@@ -606,7 +616,6 @@ golang.org/x/text 
v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3
 golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
 golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
 golang.org/x/text v0.3.5/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
-golang.org/x/text v0.3.6 h1:aRYxNxv6iGQlyVaZmk6ZgYEDa+Jg18DxebPSrd6bg1M=
 golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
 golang.org/x/text v0.3.7 h1:olpwvP2KacW1ZWvsR7uQhoyTYvKAupfQrRGBFM352Gk=
 golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
@@ -656,7 +665,6 @@ golang.org/x/tools 
v0.0.0-20200505023115-26f46d2f7ef8/go.mod h1:EkVYQZoAsY45+roY
 golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod 
h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE=
 golang.org/x/tools v0.0.0-20201224043029-2b0845dc783e/go.mod 
h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
 golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod 
h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
-golang.org/x/tools v0.1.2 h1:kRBLX7v7Af8W7Gdbbc908OJcdgtK8bOz9Uaj8/F1ACA=
 golang.org/x/tools v0.1.2/go.mod 
h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk=
 golang.org/x/tools v0.1.10 h1:QjFRCZxdOhBJ/UNgnBZLbNV13DlbnK0quyivTnXJM20=
 golang.org/x/tools v0.1.10/go.mod 
h1:Uh6Zz+xoGYZom868N8YTex3t7RhtHDBrE8Gzo9bV56E=
@@ -731,8 +739,9 @@ google.golang.org/protobuf 
v1.23.1-0.20200526195155-81db48ad09cc/go.mod h1:EGpAD
 google.golang.org/protobuf v1.24.0/go.mod 
h1:r/3tXBNzIEhYS9I1OUVjXDlt8tc493IdKGjtUeSXeh4=
 google.golang.org/protobuf v1.25.0/go.mod 
h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlbajtzgsN7c=
 google.golang.org/protobuf v1.26.0-rc.1/go.mod 
h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
-google.golang.org/protobuf v1.26.0 
h1:bxAC2xTBsZGibn2RTntX0oH50xLsqy1OxA9tTL3p/lk=
 google.golang.org/protobuf v1.26.0/go.mod 
h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
+google.golang.org/protobuf v1.28.0 
h1:w43yiav+6bVFTBQFZX0r7ipe9JQ1QsbMgHwbBziscLw=
+google.golang.org/protobuf v1.28.0/go.mod 
h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
 gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod 
h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw=
 gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod 
h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
 gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod 
h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
@@ -761,8 +770,9 @@ gopkg.in/yaml.v2 v2.4.0 
h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY=
 gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ=
 gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod 
h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
 gopkg.in/yaml.v3 v3.0.0-20200615113413-eeeca48fe776/go.mod 
h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
-gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b 
h1:h8qDotaEPuJATrMmW04NCwg7v22aHH28wwpauUhK9Oo=
 gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod 
h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
+gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
+gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
 gotest.tools/v3 v3.0.2/go.mod h1:3SzNCllyD9/Y+b5r9JIKQ474KzkZyqLqEfYqMsX94Bk=
 gotest.tools/v3 v3.0.3/go.mod h1:Z7Lb0S5l+klDB31fvDQX8ss/FlKDxtlFlw3Oa8Ymbl8=
 honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod 
h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
@@ -771,17 +781,21 @@ honnef.co/go/tools 
v0.0.0-20190418001031-e561f6794a2a/go.mod h1:rf3lG4BRIbNafJWh
 honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod 
h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
 honnef.co/go/tools v0.0.1-2019.2.3/go.mod 
h1:a3bituU0lyd329TUQxRnasdCoJDkEUEAqEt0JzvZhAg=
 honnef.co/go/tools v0.0.1-2020.1.3/go.mod 
h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9vFzvIQ3k=
-k8s.io/api v0.22.1 h1:ISu3tD/jRhYfSW8jI/Q1e+lRxkR7w9UwQEZ7FgslrwY=
 k8s.io/api v0.22.1/go.mod h1:bh13rkTp3F1XEaLGykbyRD2QaTTzPm0e/BMd8ptFONY=
+k8s.io/api v0.22.2 h1:M8ZzAD0V6725Fjg53fKeTJxGsJvRbk4TEm/fexHMtfw=
+k8s.io/api v0.22.2/go.mod h1:y3ydYpLJAaDI+BbSe2xmGcqxiWHmWjkEeIbiwHvnPR8=
 k8s.io/apiextensions-apiserver v0.22.1 
h1:YSJYzlFNFSfUle+yeEXX0lSQyLEoxoPJySRupepb0gE=
 k8s.io/apiextensions-apiserver v0.22.1/go.mod 
h1:HeGmorjtRmRLE+Q8dJu6AYRoZccvCMsghwS8XTUYb2c=
-k8s.io/apimachinery v0.22.1 h1:DTARnyzmdHMz7bFWFDDm22AM4pLWTQECMpRTFu2d2OM=
 k8s.io/apimachinery v0.22.1/go.mod 
h1:O3oNtNadZdeOMxHFVxOreoznohCpy0z6mocxbZr7oJ0=
+k8s.io/apimachinery v0.22.2 h1:ejz6y/zNma8clPVfNDLnPbleBo6MpoFy/HBiBqCouVk=
+k8s.io/apimachinery v0.22.2/go.mod 
h1:O3oNtNadZdeOMxHFVxOreoznohCpy0z6mocxbZr7oJ0=
 k8s.io/apiserver v0.22.1/go.mod h1:2mcM6dzSt+XndzVQJX21Gx0/Klo7Aen7i0Ai6tIa400=
-k8s.io/client-go v0.22.1 h1:jW0ZSHi8wW260FvcXHkIa0NLxFBQszTlhiAVsU5mopw=
 k8s.io/client-go v0.22.1/go.mod h1:BquC5A4UOo4qVDUtoc04/+Nxp1MeHcVc1HJm1KmG8kk=
-k8s.io/code-generator v0.22.1 h1:zAcKpn+xe9Iyc4qtZlfg4tD0f+SO2h5+e/s4pZPOVhs=
+k8s.io/client-go v0.22.2 h1:DaSQgs02aCC1QcwUdkKZWOeaVsQjYvWv8ZazcZ6JcHc=
+k8s.io/client-go v0.22.2/go.mod h1:sAlhrkVDf50ZHx6z4K0S40wISNTarf1r800F+RlCF6U=
 k8s.io/code-generator v0.22.1/go.mod 
h1:eV77Y09IopzeXOJzndrDyCI88UBok2h6WxAlBwpxa+o=
+k8s.io/code-generator v0.22.2 h1:+bUv9lpTnAWABtPkvO4x0kfz7j/kDEchVt0P/wXU3jQ=
+k8s.io/code-generator v0.22.2/go.mod 
h1:eV77Y09IopzeXOJzndrDyCI88UBok2h6WxAlBwpxa+o=
 k8s.io/component-base v0.22.1 h1:SFqIXsEN3v3Kkr1bS6rstrs1wd45StJqbtgbQ4nRQdo=
 k8s.io/component-base v0.22.1/go.mod 
h1:0D+Bl8rrnsPN9v0dyYvkqFfBeAd4u7n77ze+p8CMiPo=
 k8s.io/gengo v0.0.0-20200413195148-3a45101e95ac/go.mod 
h1:ezvh/TsK7cY6rbqRK0oQQ8IAqLxYwwyPxAX1Pzy0ii0=
@@ -794,8 +808,10 @@ k8s.io/klog/v2 v2.9.0/go.mod 
h1:hy9LJ/NvuK+iVyP4Ehqva4HxZG/oXyIS3n3Jmire4Ec=
 k8s.io/kube-openapi v0.0.0-20210421082810-95288971da7e 
h1:KLHHjkdQFomZy8+06csTWZ0m1343QqxZhR2LJ1OxCYM=
 k8s.io/kube-openapi v0.0.0-20210421082810-95288971da7e/go.mod 
h1:vHXdDvt9+2spS2Rx9ql3I8tycm3H9FDfdUoIuKCefvw=
 k8s.io/utils v0.0.0-20210707171843-4b05e18ac7d9/go.mod 
h1:jPW/WVKK9YHAvNhRxK0md/EJ228hCsBRufyofKtW8HA=
-k8s.io/utils v0.0.0-20210802155522-efc7438f0176 
h1:Mx0aa+SUAcNRQbs5jUzV8lkDlGFU8laZsY9jrcVX5SY=
 k8s.io/utils v0.0.0-20210802155522-efc7438f0176/go.mod 
h1:jPW/WVKK9YHAvNhRxK0md/EJ228hCsBRufyofKtW8HA=
+k8s.io/utils v0.0.0-20210819203725-bdf08cb9a70a/go.mod 
h1:jPW/WVKK9YHAvNhRxK0md/EJ228hCsBRufyofKtW8HA=
+k8s.io/utils v0.0.0-20220210201930-3a6ce19ff2f9 
h1:HNSDgDCrr/6Ly3WEGKZftiE7IY19Vz2GdbOCyI4qqhc=
+k8s.io/utils v0.0.0-20220210201930-3a6ce19ff2f9/go.mod 
h1:jPW/WVKK9YHAvNhRxK0md/EJ228hCsBRufyofKtW8HA=
 moul.io/http2curl v1.0.0 h1:6XwpyZOYsgZJrU8exnG87ncVkU1FVCcTRpwzOkTDUi8=
 moul.io/http2curl v1.0.0/go.mod h1:f6cULg+e4Md/oW1cYmwW4IWQOVl2lGbmCNGOHvzX2kE=
 rsc.io/binaryregexp v0.2.0/go.mod 
h1:qTv7/COck+e2FymRvadv62gMdZztPaShugOCi3I+8D8=
diff --git a/deploy/kubernetes/operator/pkg/controller/config/config.go 
b/deploy/kubernetes/operator/pkg/controller/config/config.go
new file mode 100644
index 00000000..73f19f5a
--- /dev/null
+++ b/deploy/kubernetes/operator/pkg/controller/config/config.go
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package config
+
+import (
+       "flag"
+
+       
"github.com/apache/incubator-uniffle/deploy/kubernetes/operator/pkg/constants"
+       
"github.com/apache/incubator-uniffle/deploy/kubernetes/operator/pkg/utils"
+)
+
+const (
+       flagWorkers = "workers"
+)
+
+// Config contains all configurations of the rss controller: its worker concurrency and the embedded generic config.
+type Config struct {
+       Workers int
+       utils.GenericConfig
+}
+
+// LeaderElectionID returns the leader election ID: "rss-controller-" followed by constants.LeaderIDSuffix.
+func (c *Config) LeaderElectionID() string {
+       return "rss-controller-" + constants.LeaderIDSuffix
+}
+
+// AddFlags registers the "workers" flag (default 1) on the global flags, then adds the generic config's flags.
+func (c *Config) AddFlags() {
+       flag.IntVar(&c.Workers, flagWorkers, 1, "Concurrency of the rss 
controller.")
+       c.GenericConfig.AddFlags()
+}
+
+// Complete is called before rss-controller runs; it delegates to GenericConfig.Complete.
+func (c *Config) Complete() {
+       c.GenericConfig.Complete()
+}
diff --git a/deploy/kubernetes/operator/pkg/controller/constants/constants.go 
b/deploy/kubernetes/operator/pkg/controller/constants/constants.go
new file mode 100644
index 00000000..7c74d8bd
--- /dev/null
+++ b/deploy/kubernetes/operator/pkg/controller/constants/constants.go
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package constants
+
+const (
+       // ContainerShuffleServerRPCPort indicates rpc port used in shuffle 
server containers.
+       ContainerShuffleServerRPCPort int32 = 19997
+       // ContainerShuffleServerHTTPPort indicates http port used in shuffle 
server containers.
+       ContainerShuffleServerHTTPPort int32 = 19996
+       // ContainerCoordinatorRPCPort indicates rpc port used in coordinator 
containers.
+       ContainerCoordinatorRPCPort int32 = 19997
+       // ContainerCoordinatorHTTPPort indicates http port used in coordinator 
containers.
+       ContainerCoordinatorHTTPPort int32 = 19996
+
+       // ShuffleServerRPCPortEnv indicates environment variable name of rpc port used 
by shuffle servers.
+       ShuffleServerRPCPortEnv = "SERVER_RPC_PORT"
+       // ShuffleServerHTTPPortEnv indicates environment variable name of http port 
used by shuffle servers.
+       ShuffleServerHTTPPortEnv = "SERVER_HTTP_PORT"
+       // CoordinatorRPCPortEnv indicates environment variable name of rpc port used by 
coordinators.
+       CoordinatorRPCPortEnv = "COORDINATOR_RPC_PORT"
+       // CoordinatorHTTPPortEnv indicates environment variable name of http port used 
by coordinators.
+       CoordinatorHTTPPortEnv = "COORDINATOR_HTTP_PORT"
+       // RSSCoordinatorQuorumEnv indicates environment variable name of rss 
coordinator quorum used by shuffle servers.
+       RSSCoordinatorQuorumEnv = "RSS_COORDINATOR_QUORUM"
+       // XmxSizeEnv indicates environment variable name of xmx size used by 
coordinators or shuffle servers.
+       XmxSizeEnv = "XMX_SIZE"
+       // ServiceNameEnv indicates environment variable name of service name used by 
coordinators or shuffle servers.
+       ServiceNameEnv = "SERVICE_NAME"
+       // NodeNameEnv indicates environment variable name of physical node name used by 
coordinators or shuffle servers.
+       NodeNameEnv = "NODE_NAME"
+       // RssIPEnv indicates environment variable name of shuffle servers' ip addresses.
+       RssIPEnv = "RSS_IP"
+
+       // CoordinatorServiceName defines environment variable value of 
"SERVICE_NAME" used by coordinators.
+       CoordinatorServiceName = "coordinator"
+       // ShuffleServerServiceName defines environment variable value of 
"SERVICE_NAME" used by shuffle servers.
+       ShuffleServerServiceName = "server"
+
+       // ExcludeNodesFile indicates the volume mounting name of the exclude nodes file.
+       ExcludeNodesFile = "exclude-nodes-file"
+
+       // UpdateStatusError is the reason used when updating the status of rss fails.
+       UpdateStatusError = "UpdateStatusError"
+
+       // OwnerLabel is the label of configMap's owner.
+       OwnerLabel = "uniffle.apache.org/owner-label"
+
+       // ConfigurationVolumeName is the name of configMap volume records 
configuration of coordinators or shuffle servers.
+       ConfigurationVolumeName = "configuration"
+)
+
+// PropertyKey defines property key in configuration of coordinators or 
shuffle servers.
+type PropertyKey string
+
+const (
+       // RPCServerPort represents the rpc port property in configuration of 
coordinators or shuffle servers.
+       RPCServerPort PropertyKey = "rss.rpc.server.port"
+       // JettyHTTPPort represents the http port property in configuration of 
coordinators or shuffle servers.
+       JettyHTTPPort PropertyKey = "rss.jetty.http.port"
+
+       // CoordinatorQuorum represents the coordinator quorum property in 
configuration of shuffle servers.
+       CoordinatorQuorum PropertyKey = "rss.coordinator.quorum"
+       // StorageBasePath represents the storage base path property in 
configuration of shuffle servers.
+       StorageBasePath PropertyKey = "rss.storage.basePath"
+
+       // CoordinatorExcludeNodesPath represents the exclude nodes path property in 
configuration of coordinators.
+       CoordinatorExcludeNodesPath PropertyKey = 
"rss.coordinator.exclude.nodes.file.path"
+)
diff --git 
a/deploy/kubernetes/operator/pkg/controller/controller/process_rss_test.go 
b/deploy/kubernetes/operator/pkg/controller/controller/process_rss_test.go
new file mode 100644
index 00000000..8eba2abe
--- /dev/null
+++ b/deploy/kubernetes/operator/pkg/controller/controller/process_rss_test.go
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package controller
+
+import (
+       "context"
+       "reflect"
+       "testing"
+
+       metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+       kubefake "k8s.io/client-go/kubernetes/fake"
+
+       unifflev1alpha1 
"github.com/apache/incubator-uniffle/deploy/kubernetes/operator/api/uniffle/v1alpha1"
+       
"github.com/apache/incubator-uniffle/deploy/kubernetes/operator/pkg/controller/config"
+       
"github.com/apache/incubator-uniffle/deploy/kubernetes/operator/pkg/generated/clientset/versioned/fake"
+       
"github.com/apache/incubator-uniffle/deploy/kubernetes/operator/pkg/utils"
+)
+
+// buildEmptyPhaseRssObj builds a rss object for tests whose status is left at its zero value,
+// so that its phase is empty.
+func buildEmptyPhaseRssObj() *unifflev1alpha1.RemoteShuffleService {
+       rss := &unifflev1alpha1.RemoteShuffleService{}
+       rss.ObjectMeta = metav1.ObjectMeta{
+               Name:            testRssName,
+               Namespace:       testNamespace,
+               ResourceVersion: "test",
+       }
+       rss.Spec.Coordinator = &unifflev1alpha1.CoordinatorConfig{
+               ExcludeNodesFilePath: "/exclude_nodes",
+       }
+       return rss
+}
+
+func TestProcessEmptyPhaseRss(t *testing.T) {
+       rss := buildEmptyPhaseRssObj()
+
+       rssClient := fake.NewSimpleClientset(rss)
+       kubeClient := kubefake.NewSimpleClientset()
+
+       rc := newRSSController(&config.Config{
+               GenericConfig: utils.GenericConfig{
+                       KubeClient: kubeClient,
+                       RSSClient:  rssClient,
+               },
+       })
+
+       for _, tt := range []struct {
+               name              string
+               expectedRssStatus unifflev1alpha1.RemoteShuffleServiceStatus
+               expectedNeedRetry bool
+               expectedError     error
+       }{
+               {
+                       name: "process rss object which has just been created, 
and whose status phase is empty",
+                       expectedRssStatus: 
unifflev1alpha1.RemoteShuffleServiceStatus{
+                               Phase: unifflev1alpha1.RSSPending,
+                       },
+                       expectedNeedRetry: false,
+               },
+       } {
+               needRetry, err := rc.processNormal(rss)
+               if err != nil {
+                       t.Errorf("process rss object failed: %v", err)
+                       return
+               }
+               if needRetry != tt.expectedNeedRetry {
+                       t.Errorf("unexpected result indicates whether to 
retrys: %v, expected: %v",
+                               needRetry, tt.expectedNeedRetry)
+                       return
+               }
+               updatedRss, getErr := 
rssClient.UniffleV1alpha1().RemoteShuffleServices(rss.Namespace).
+                       Get(context.TODO(), rss.Name, metav1.GetOptions{})
+               if getErr != nil {
+                       t.Errorf("get updated rss object failed: %v", err)
+                       return
+               }
+               if !reflect.DeepEqual(updatedRss.Status, tt.expectedRssStatus) {
+                       t.Errorf("unexpected status of updated rss object: %+v, 
expected: %+v",
+                               updatedRss.Status, tt.expectedRssStatus)
+                       return
+               }
+       }
+}
diff --git a/deploy/kubernetes/operator/pkg/controller/controller/rss.go 
b/deploy/kubernetes/operator/pkg/controller/controller/rss.go
new file mode 100644
index 00000000..2e21e04b
--- /dev/null
+++ b/deploy/kubernetes/operator/pkg/controller/controller/rss.go
@@ -0,0 +1,912 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package controller
+
+import (
+       "context"
+       "fmt"
+       "strings"
+       "sync"
+       "time"
+
+       appsv1 "k8s.io/api/apps/v1"
+       corev1 "k8s.io/api/core/v1"
+       apierrors "k8s.io/apimachinery/pkg/api/errors"
+       metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+       "k8s.io/apimachinery/pkg/labels"
+       "k8s.io/apimachinery/pkg/util/sets"
+       "k8s.io/apimachinery/pkg/util/wait"
+       "k8s.io/client-go/informers"
+       "k8s.io/client-go/kubernetes"
+       appslister "k8s.io/client-go/listers/apps/v1"
+       corelisters "k8s.io/client-go/listers/core/v1"
+       "k8s.io/client-go/tools/cache"
+       "k8s.io/client-go/tools/record"
+       "k8s.io/client-go/util/retry"
+       "k8s.io/client-go/util/workqueue"
+       "k8s.io/klog/v2"
+       "k8s.io/utils/pointer"
+       "sigs.k8s.io/controller-runtime/pkg/manager"
+
+       unifflev1alpha1 
"github.com/apache/incubator-uniffle/deploy/kubernetes/operator/api/uniffle/v1alpha1"
+       
"github.com/apache/incubator-uniffle/deploy/kubernetes/operator/pkg/constants"
+       
"github.com/apache/incubator-uniffle/deploy/kubernetes/operator/pkg/controller/config"
+       controllerconstants 
"github.com/apache/incubator-uniffle/deploy/kubernetes/operator/pkg/controller/constants"
+       
"github.com/apache/incubator-uniffle/deploy/kubernetes/operator/pkg/controller/sync/coordinator"
+       
"github.com/apache/incubator-uniffle/deploy/kubernetes/operator/pkg/controller/sync/shuffleserver"
+       
"github.com/apache/incubator-uniffle/deploy/kubernetes/operator/pkg/controller/util"
+       kubeutil 
"github.com/apache/incubator-uniffle/deploy/kubernetes/operator/pkg/controller/util/kubernetes"
+       propertiestutil 
"github.com/apache/incubator-uniffle/deploy/kubernetes/operator/pkg/controller/util/properties"
+       
"github.com/apache/incubator-uniffle/deploy/kubernetes/operator/pkg/generated/clientset/versioned"
+       
"github.com/apache/incubator-uniffle/deploy/kubernetes/operator/pkg/generated/informers/externalversions"
+       
"github.com/apache/incubator-uniffle/deploy/kubernetes/operator/pkg/generated/listers/uniffle/v1alpha1"
+       
"github.com/apache/incubator-uniffle/deploy/kubernetes/operator/pkg/utils"
+)
+
+// Compile-time check that *rssController implements RSSController.
+var _ RSSController = (*rssController)(nil)
+
+// controllerName is the name of this controller printed in its startup logs.
+const controllerName = "rss-controller"
+
+// appNameIndexer is the name of the pod index registered with the getAppName index function.
+const appNameIndexer = "AppNameIndexer"
+
+// RSSController synchronizes rss objects; it is started and stopped as a manager.Runnable.
+type RSSController interface {
+       manager.Runnable
+}
+
+// NewRSSController creates a RSSController.
+func NewRSSController(cfg *config.Config) RSSController {
+       return newRSSController(cfg)
+}
+
+// newRSSController builds a rssController from cfg: it creates the informer factories, the work
+// queues and the event recorder, then wires the informers, listers, event handlers and the pod
+// index used to find pods by app name.
+func newRSSController(cfg *config.Config) *rssController {
+       rssFactory := externalversions.NewSharedInformerFactory(cfg.RSSClient, 0)
+       shuffleServerFactory := utils.BuildShuffleServerInformerFactory(cfg.KubeClient)
+       coordinatorFactory := utils.BuildCoordinatorInformerFactory(cfg.KubeClient)
+
+       rc := &rssController{
+               workers:                      cfg.Workers,
+               kubeClient:                   cfg.KubeClient,
+               rssClient:                    cfg.RSSClient,
+               rssInformerFactory:           rssFactory,
+               shuffleServerInformerFactory: shuffleServerFactory,
+               coordinatorInformerFactory:   coordinatorFactory,
+               rssQueue: workqueue.NewNamedRateLimitingQueue(
+                       workqueue.DefaultControllerRateLimiter(), "rss-queue"),
+               shuffleServerQueue: workqueue.NewNamedRateLimitingQueue(
+                       workqueue.DefaultControllerRateLimiter(), "pod-queue"),
+               eventRecorder: util.CreateRecorder(cfg.KubeClient, "rss-controller"),
+       }
+
+       // rss objects: react to creations and updates.
+       rssAccess := rssFactory.Uniffle().V1alpha1().RemoteShuffleServices()
+       rc.rssInformer = rssAccess.Informer()
+       rc.rssInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
+               AddFunc:    rc.addRSS,
+               UpdateFunc: rc.updateRSS,
+       })
+       rc.rssLister = rssAccess.Lister()
+
+       // statefulSets of shuffle servers: only read through the lister.
+       stsAccess := shuffleServerFactory.Apps().V1().StatefulSets()
+       rc.stsInformer = stsAccess.Informer()
+       rc.stsLister = stsAccess.Lister()
+
+       // pods of shuffle servers: react to deletions and index them by app name.
+       podAccess := shuffleServerFactory.Core().V1().Pods()
+       rc.podInformer = podAccess.Informer()
+       rc.podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
+               DeleteFunc: rc.deleteShuffleServer,
+       })
+       if err := rc.podInformer.AddIndexers(cache.Indexers{appNameIndexer: getAppName}); err != nil {
+               klog.Fatalf("add app name indexer failed: %v", err)
+       }
+       rc.podIndexer = rc.podInformer.GetIndexer()
+       rc.podLister = podAccess.Lister()
+
+       // configMaps of coordinators: only read through the lister.
+       cmAccess := coordinatorFactory.Core().V1().ConfigMaps()
+       rc.cmInformer = cmAccess.Informer()
+       rc.cmLister = cmAccess.Lister()
+
+       return rc
+}
+
+// rssController is the implementation of RSSController.
+type rssController struct {
+       // workers is the number of goroutines started for each of the two queues.
+       workers int
+       // clients used to talk to the api server.
+       kubeClient kubernetes.Interface
+       rssClient  versioned.Interface
+       // informer and lister of rss objects.
+       rssInformerFactory externalversions.SharedInformerFactory
+       rssInformer        cache.SharedIndexInformer
+       rssLister          v1alpha1.RemoteShuffleServiceLister
+       // informers, listers and indexer of the statefulSets and pods of shuffle servers.
+       shuffleServerInformerFactory informers.SharedInformerFactory
+       stsInformer                  cache.SharedIndexInformer
+       stsLister                    appslister.StatefulSetLister
+       podInformer                  cache.SharedIndexInformer
+       podIndexer                   cache.Indexer
+       podLister                    corelisters.PodLister
+       // informer and lister of the configMaps of coordinators.
+       coordinatorInformerFactory informers.SharedInformerFactory
+       cmInformer                 cache.SharedIndexInformer
+       cmLister                   corelisters.ConfigMapLister
+       // work queues holding keys of rss objects and of shuffle server pods.
+       rssQueue           workqueue.RateLimitingInterface
+       shuffleServerQueue workqueue.RateLimitingInterface
+       eventRecorder      record.EventRecorder
+}
+
+// Start runs the RSSController: it starts the informer factories, waits for their caches to
+// sync, launches the workers of both queues and then blocks until ctx is done.
+// It returns an error only when the caches could not be synced.
+func (r *rssController) Start(ctx context.Context) error {
+       stopCh := ctx.Done()
+
+       klog.V(2).Infof("%v is starting", controllerName)
+       r.rssInformerFactory.Start(stopCh)
+       r.shuffleServerInformerFactory.Start(stopCh)
+       r.coordinatorInformerFactory.Start(stopCh)
+
+       synced := []cache.InformerSynced{
+               r.rssInformer.HasSynced,
+               r.stsInformer.HasSynced,
+               r.podInformer.HasSynced,
+               r.cmInformer.HasSynced,
+       }
+       if ok := cache.WaitForCacheSync(stopCh, synced...); !ok {
+               return fmt.Errorf("wait for cache synced failed")
+       }
+       klog.V(2).Infof("%v started", controllerName)
+
+       // rss workers are launched first, then the same number of shuffle server workers.
+       for _, worker := range []func(){r.runRssWorker, r.runShuffleServerWorker} {
+               for n := r.workers; n > 0; n-- {
+                       go wait.Until(worker, time.Second, stopCh)
+               }
+       }
+
+       <-stopCh
+       return nil
+}
+
+// runRssWorker keeps handling keys from the rss queue until processNextWorkRss reports
+// that it is time to quit.
+func (r *rssController) runRssWorker() {
+       for {
+               if !r.processNextWorkRss() {
+                       return
+               }
+       }
+}
+
+// processNextWorkRss deals with one rss key off the rssQueue, it returns 
false when it's time to quit.
+func (r *rssController) processNextWorkRss() bool {
+       rssKey, quit := r.rssQueue.Get()
+       if quit {
+               return false
+       }
+       defer r.rssQueue.Done(rssKey)
+
+       rssNamespace, rssName, err := 
cache.SplitMetaNamespaceKey(rssKey.(string))
+       if err != nil {
+               klog.Errorf("parsed rss key (%v) failed: %v", rssKey, err)
+               return true
+       }
+
+       var retryChecking bool
+       retryChecking, err = r.processRss(rssNamespace, rssName)
+       if err != nil {
+               klog.Errorf("processed rss %v failed : %v", rssKey, err)
+               r.rssQueue.AddRateLimited(rssKey)
+       } else if retryChecking {
+               klog.V(4).Infof("we will retry checking rss (%v) for upgrading 
or terminating", rssKey)
+               r.rssQueue.AddAfter(rssKey, time.Second*3)
+       } else {
+               r.rssQueue.Forget(rssKey)
+       }
+       return true
+}
+
+// runShuffleServerWorker keeps handling keys from the shuffle server queue until
+// processNextWorkShuffleServer reports that it is time to quit.
+func (r *rssController) runShuffleServerWorker() {
+       for {
+               if !r.processNextWorkShuffleServer() {
+                       return
+               }
+       }
+}
+
+// processNextWorkShuffleServer takes one shuffle server pod key off the shuffleServerQueue and
+// processes it. A failed key is re-queued with rate limiting, a successful one is forgotten.
+// It returns false only when the queue says it is time to quit.
+func (r *rssController) processNextWorkShuffleServer() bool {
+       item, shutdown := r.shuffleServerQueue.Get()
+       if shutdown {
+               return false
+       }
+       defer r.shuffleServerQueue.Done(item)
+
+       if err := r.processShuffleServer(item.(string)); err != nil {
+               klog.Errorf("processed shuffle server %v failed : %v", item, err)
+               r.shuffleServerQueue.AddRateLimited(item)
+               return true
+       }
+       r.shuffleServerQueue.Forget(item)
+       return true
+}
+
+// processShuffleServer handles one shuffle server pod key: it parses the key, looks up the rss
+// object it refers to and updates the target and deleted keys of that rss.
+// A rss object that cannot be found is ignored; any other lookup error is returned.
+func (r *rssController) processShuffleServer(key string) error {
+       klog.V(4).Infof("processing shuffle server (%v)", key)
+       namespace, rssName, currentKey := parsePodCacheKey(key)
+
+       rss, err := r.rssLister.RemoteShuffleServices(namespace).Get(rssName)
+       if err == nil {
+               return r.updateTargetAndDeletedKeys(rss, currentKey)
+       }
+
+       klog.Errorf("get rss by key (%v) failed: %v", key, err)
+       if apierrors.IsNotFound(err) {
+               return nil
+       }
+       return err
+}
+
+// updateTargetAndDeletedKeys updates target and deleted keys in status of rss 
objects.
+func (r *rssController) updateTargetAndDeletedKeys(rss 
*unifflev1alpha1.RemoteShuffleService,
+       shuffleServerKey string) error {
+       deletedKeys := sets.NewString(rss.Status.DeletedKeys...)
+       if deletedKeys.Has(shuffleServerKey) {
+               klog.V(3).Infof("shuffle server (%v) has already been deleted 
for rss (%v)",
+                       shuffleServerKey, utils.UniqueName(rss))
+               return nil
+       }
+
+       deletedKeys.Insert(shuffleServerKey)
+       oldTargetKeys := sets.NewString(rss.Status.TargetKeys...)
+       newTargetKeys := oldTargetKeys.Difference(deletedKeys)
+
+       cm, err := r.getExcludeNodesCM(rss)
+       if err != nil {
+               klog.Errorf("get exclude nodes in configMap for rss (%v) 
failed: %v",
+                       utils.UniqueName(rss), err)
+               return err
+       }
+
+       // calculate new exclude nodes.
+       excludeNodesFileKey := utils.GetExcludeNodesConfigMapKey(rss)
+       oldExcludeNodes := sets.NewString(strings.Split(
+               cm.Data[excludeNodesFileKey], "\n")...)
+       newExcludeNodes := utils.ConvertShuffleServerKeysToNodes(newTargetKeys)
+       if !newExcludeNodes.Equal(oldExcludeNodes) {
+               cmCopy := cm.DeepCopy()
+               cmCopy.Data[excludeNodesFileKey] = strings.Join(
+                       utils.GetSortedList(newExcludeNodes), "\n")
+               if _, err = r.kubeClient.CoreV1().ConfigMaps(cmCopy.Namespace).
+                       Update(context.Background(), cmCopy, 
metav1.UpdateOptions{}); err != nil {
+                       klog.Errorf("update exclude nodes in configMap (%v) for 
rss (%v) failed: %v",
+                               cmCopy.Name, utils.UniqueName(rss), err)
+                       return err
+               }
+       }
+
+       if err = r.updateDeletedKeys(rss, shuffleServerKey); err != nil {
+               klog.Errorf("add (%v) to deleted keys of rss (%v) failed: %v",
+                       shuffleServerKey, utils.UniqueName(rss), err)
+               return err
+       }
+       return nil
+}
+
+func (r *rssController) clearExcludeNodes(rss 
*unifflev1alpha1.RemoteShuffleService) error {
+       cm, err := r.getExcludeNodesCM(rss)
+       if err != nil {
+               klog.Errorf("get exclude nodes in configMap for rss (%v) 
failed: %v",
+                       utils.UniqueName(rss), err)
+               return err
+       }
+       cmCopy := cm.DeepCopy()
+       cmCopy.Data[utils.GetExcludeNodesConfigMapKey(rss)] = ""
+       if _, err = r.kubeClient.CoreV1().ConfigMaps(cmCopy.Namespace).
+               Update(context.Background(), cmCopy, metav1.UpdateOptions{}); 
err != nil {
+               klog.Errorf("update exclude nodes in configMap (%v) for rss 
(%v) failed: %v",
+                       cmCopy.Name, utils.UniqueName(rss), err)
+               return err
+       }
+       return nil
+}
+
+// getExcludeNodesCM returns the configMap that records exclude nodes for the rss object.
+// The configMap is read from the configMap lister, in the namespace of the rss and under the
+// name produced by utils.GenerateCoordinatorName. A lookup error is logged and returned.
+func (r *rssController) getExcludeNodesCM(rss *unifflev1alpha1.RemoteShuffleService) (
+       *corev1.ConfigMap, error) {
+       namespace := rss.Namespace
+       name := utils.GenerateCoordinatorName(rss)
+       cm, err := r.cmLister.ConfigMaps(namespace).Get(name)
+       if err != nil {
+               // one verb per argument: configMap name, rss name and the error itself.
+               klog.Errorf("get configMap (%v) for rss (%v) failed: %v", name, utils.UniqueName(rss), err)
+               return nil, err
+       }
+       return cm, nil
+}
+
+// updateDeletedKeys adds key to the sorted deleted keys in the status of the rss object.
+// It only does so while the rss is in the upgrading or terminating phase; in any other
+// phase it returns nil without touching the status.
+func (r *rssController) updateDeletedKeys(rss *unifflev1alpha1.RemoteShuffleService,
+       key string) error {
+       switch rss.Status.Phase {
+       case unifflev1alpha1.RSSUpgrading, unifflev1alpha1.RSSTerminating:
+               // deleted keys are only tracked in these phases.
+       default:
+               return nil
+       }
+
+       newStatus := rss.Status.DeepCopy()
+       withKey := sets.NewString(newStatus.DeletedKeys...).Insert(key)
+       newStatus.DeletedKeys = utils.GetSortedList(withKey)
+       return r.updateRssStatus(rss, newStatus)
+}
+
+// processRss handles the rss object identified by namespace and name. A rss that no longer
+// exists is ignored. A rss with a deletion timestamp goes to processDeleting, any other to
+// processNormal. The returned bool tells the caller whether the rss must be checked again.
+func (r *rssController) processRss(namespace, name string) (bool, error) {
+       klog.V(4).Infof("processing rss (%v/%v)", namespace, name)
+       begin := time.Now()
+       defer func() {
+               klog.V(4).Infof("finished processing rss (%v/%v) cost %v",
+                       namespace, name, time.Since(begin))
+       }()
+
+       rss, err := r.rssLister.RemoteShuffleServices(namespace).Get(name)
+       switch {
+       case err == nil:
+               // found, handled below.
+       case apierrors.IsNotFound(err):
+               klog.V(5).Infof("ignored deleted rss (%v/%v)", namespace, name)
+               return false, nil
+       default:
+               klog.Errorf("get rss (%v/%v) failed: %v", namespace, name, err)
+               return false, err
+       }
+
+       if rss.DeletionTimestamp == nil {
+               return r.processNormal(rss)
+       }
+       return r.processDeleting(rss)
+}
+
+// processDeleting handles a rss object that is being deleted. A running or pending rss is
+// moved to the terminating phase. A terminating rss gets its finalizer removed once
+// canDeleteRss succeeds and allows it; otherwise its shuffle servers are scaled down.
+// Any other phase is left untouched.
+func (r *rssController) processDeleting(rss *unifflev1alpha1.RemoteShuffleService) (bool, error) {
+       klog.V(4).Infof("process rss (%v) to be deleted in %v phase",
+               utils.UniqueName(rss), rss.Status.Phase)
+
+       switch rss.Status.Phase {
+       case unifflev1alpha1.RSSRunning, unifflev1alpha1.RSSPending:
+               terminating := &unifflev1alpha1.RemoteShuffleServiceStatus{
+                       Phase: unifflev1alpha1.RSSTerminating,
+               }
+               return false, r.updateRssStatus(rss, terminating)
+       case unifflev1alpha1.RSSTerminating:
+               deletable, err := r.canDeleteRss(rss)
+               if err == nil && deletable {
+                       return false, r.removeFinalizer(rss)
+               }
+               return r.scaleDownShuffleServer(rss)
+       }
+       return false, nil
+}
+
+// canDeleteRss reports whether the rss object can be deleted, which is the case when the pod
+// index holds no shuffle server pod of this rss without a deletion timestamp.
+func (r *rssController) canDeleteRss(rss *unifflev1alpha1.RemoteShuffleService) (bool, error) {
+       indexed, err := r.podIndexer.ByIndex(appNameIndexer, utils.GenerateShuffleServerName(rss))
+       if err != nil {
+               klog.Errorf("get objects by indexer (%v) failed: %v", appNameIndexer, err)
+               return false, err
+       }
+       for _, obj := range indexed {
+               // a single pod that is not being deleted is enough to block the deletion.
+               if pod, isPod := obj.(*corev1.Pod); isPod && pod.DeletionTimestamp == nil {
+                       return false, nil
+               }
+       }
+       return true, nil
+}
+
+// scaleDownShuffleServer patches the statefulSet of shuffle servers down to zero replicas.
+// It returns (false, nil) when the statefulSet no longer exists, and true when the caller
+// should check again, i.e. when replicas are already zero or a patch has just been sent.
+func (r *rssController) scaleDownShuffleServer(rss *unifflev1alpha1.RemoteShuffleService) (
+       bool, error) {
+       namespace, stsName := rss.Namespace, utils.GenerateShuffleServerName(rss)
+
+       sts, err := r.stsLister.StatefulSets(namespace).Get(stsName)
+       if err != nil && apierrors.IsNotFound(err) {
+               klog.V(5).Infof("ignored deleted statefulSet (%v/%v)", namespace, stsName)
+               return false, nil
+       }
+       if err != nil {
+               klog.Errorf("get statefulSet (%v) for rss (%v) failed: %v",
+                       stsName, utils.UniqueName(rss), err)
+               return false, err
+       }
+
+       if *sts.Spec.Replicas != 0 {
+               scaled := sts.DeepCopy()
+               scaled.Spec.Replicas = pointer.Int32(0)
+               _, err = kubeutil.PatchStatefulSet(r.kubeClient, sts, scaled)
+               return true, err
+       }
+       return true, nil
+}
+
+// processNormal handles a rss object that is not being deleted, depending on its phase, and
+// tells whether it must be checked again. A failed rss is only logged. A rss in any phase
+// not handled here (including the empty one) is moved to the pending phase.
+func (r *rssController) processNormal(rss *unifflev1alpha1.RemoteShuffleService) (bool, error) {
+       klog.V(4).Infof("process rss (%v) in %v phase", utils.UniqueName(rss), rss.Status.Phase)
+
+       switch rss.Status.Phase {
+       case unifflev1alpha1.RSSFailed:
+               klog.Errorf("failed to process rss (%v): %v", utils.UniqueName(rss), rss.Status.Reason)
+               return false, nil
+       case unifflev1alpha1.RSSUpgrading:
+               return r.processUpgradingRSS(rss)
+       case unifflev1alpha1.RSSRunning:
+               return false, r.processRunningRSS(rss)
+       case unifflev1alpha1.RSSPending:
+               return false, r.processPendingRSS(rss)
+       }
+
+       pending := &unifflev1alpha1.RemoteShuffleServiceStatus{Phase: unifflev1alpha1.RSSPending}
+       return false, r.updateRssStatus(rss, pending)
+}
+
+// processPendingRSS handles a pending rss object: it synchronizes the generated objects of
+// coordinators and shuffle servers, then moves the rss to the running phase.
+func (r *rssController) processPendingRSS(rss *unifflev1alpha1.RemoteShuffleService) error {
+       err := r.syncObjects(rss)
+       if err != nil {
+               klog.Errorf("sync objects for rss (%v) failed: %v", utils.UniqueName(rss), err)
+               return err
+       }
+       running := &unifflev1alpha1.RemoteShuffleServiceStatus{Phase: unifflev1alpha1.RSSRunning}
+       return r.updateRssStatus(rss, running)
+}
+
+// processRunningRSS handles a running rss object: it synchronizes the generated objects and,
+// when the sync flag of the shuffle server spec is set, prepares the rss for upgrading.
+func (r *rssController) processRunningRSS(rss *unifflev1alpha1.RemoteShuffleService) error {
+       err := r.syncObjects(rss)
+       if err != nil {
+               klog.Errorf("sync objects for rss (%v) failed: %v", utils.UniqueName(rss), err)
+               return err
+       }
+       if !*rss.Spec.ShuffleServer.Sync {
+               return nil
+       }
+       return r.prepareForUpgrading(rss)
+}
+
+// syncObjects synchronizes, in this order, the configMap, the coordinators and the shuffle
+// servers of the rss object. It stops at, logs and returns the first error.
+func (r *rssController) syncObjects(rss *unifflev1alpha1.RemoteShuffleService) error {
+       err := r.syncConfigMap(rss)
+       if err != nil {
+               klog.Errorf("sync configMap for rss (%v) failed: %v", utils.UniqueName(rss), err)
+               return err
+       }
+
+       err = r.syncCoordinator(rss)
+       if err != nil {
+               klog.Errorf("sync coordinators for rss (%v) failed: %v", utils.UniqueName(rss), err)
+               return err
+       }
+
+       err = r.syncShuffleServer(rss)
+       if err != nil {
+               klog.Errorf("sync shuffle servers for rss (%v) failed: %v", utils.UniqueName(rss), err)
+       }
+       return err
+}
+
+// syncConfigMap updates the configMap named by the rss spec. It fails when the configMap is
+// owned by another rss or when one of the coordinator, shuffle server or log4j entries is
+// missing. Otherwise it merges the generated properties into the coordinator and shuffle
+// server entries, sets the owner label to the rss name and writes the configMap back.
+func (r *rssController) syncConfigMap(rss *unifflev1alpha1.RemoteShuffleService) error {
+       cm, err := r.kubeClient.CoreV1().ConfigMaps(rss.Namespace).
+               Get(context.Background(), rss.Spec.ConfigMapName, metav1.GetOptions{})
+       if err != nil {
+               klog.Errorf("get configMap of rss (%v) failed: %v", utils.UniqueName(rss), err)
+               return err
+       }
+
+       owner := util.GetConfigMapOwner(cm)
+       if len(owner) > 0 && owner != rss.Name {
+               return fmt.Errorf("conflict configMap name of rss (%v <> %v)", owner, rss.Name)
+       }
+
+       // all three entries must be present before anything is modified.
+       for _, requiredKey := range []string{
+               constants.CoordinatorConfigKey,
+               constants.ShuffleServerConfigKey,
+               constants.Log4jPropertiesKey,
+       } {
+               if _, found := cm.Data[requiredKey]; !found {
+                       return fmt.Errorf("%v not exist", requiredKey)
+               }
+       }
+
+       coordinatorConf := cm.Data[constants.CoordinatorConfigKey]
+       cm.Data[constants.CoordinatorConfigKey] = propertiestutil.UpdateProperties(
+               coordinatorConf, coordinator.GenerateProperties(rss))
+       shuffleServerConf := cm.Data[constants.ShuffleServerConfigKey]
+       cm.Data[constants.ShuffleServerConfigKey] = propertiestutil.UpdateProperties(
+               shuffleServerConf, shuffleserver.GenerateProperties(rss))
+
+       if cm.Labels == nil {
+               cm.Labels = make(map[string]string)
+       }
+       cm.Labels[controllerconstants.OwnerLabel] = rss.Name
+
+       _, err = r.kubeClient.CoreV1().ConfigMaps(cm.Namespace).
+               Update(context.Background(), cm, metav1.UpdateOptions{})
+       if err != nil {
+               klog.Errorf("update configMap (%v) of rss (%v) failed: %v", rss.Spec.ConfigMapName,
+                       utils.UniqueName(rss), err)
+       }
+       return err
+}
+
+// processUpgradingRSS processes the upgrading rss, and determines whether we 
need to check again.
+func (r *rssController) processUpgradingRSS(rss 
*unifflev1alpha1.RemoteShuffleService) (bool, error) {
+       switch rss.Spec.ShuffleServer.UpgradeStrategy.Type {
+       case unifflev1alpha1.FullUpgrade:
+               return r.processFullUpgrade(rss)
+       case unifflev1alpha1.PartitionUpgrade:
+               return r.processPartitionUpgrade(rss)
+       case unifflev1alpha1.SpecificUpgrade:
+               return r.processSpecificUpgrade(rss)
+       case unifflev1alpha1.FullRestart:
+               return r.processFullRestart(rss)
+       }
+       return false, nil
+}
+
+// processFullUpgrade process the rss needs full upgrade, and determines 
whether we need to check again.
+func (r *rssController) processFullUpgrade(rss 
*unifflev1alpha1.RemoteShuffleService) (bool, error) {
+       return r.processRollingUpgrade(rss, 0)
+}
+
+// processPartitionUpgrade handles a rss object that needs a partition upgrade, which is a
+// rolling upgrade with the partition taken from the upgrade strategy, and tells whether it
+// must be checked again.
+func (r *rssController) processPartitionUpgrade(
+       rss *unifflev1alpha1.RemoteShuffleService) (bool, error) {
+       partition := *rss.Spec.ShuffleServer.UpgradeStrategy.Partition
+       return r.processRollingUpgrade(rss, partition)
+}
+
+// // processSpecificUpgrade process the rss needs specific upgrade, and 
determines whether we need to check again.
+func (r *rssController) processSpecificUpgrade(rss 
*unifflev1alpha1.RemoteShuffleService) (
+       bool, error) {
+       if err := r.serZeroPartition(rss, 0); err != nil {
+               klog.Errorf("update partition of statefulSet of rss (%v) 
failed: %v",
+                       utils.UniqueName(rss), err)
+               return false, err
+       }
+
+       targetNames := getNamesNeedToBeUpgraded(rss)
+       return r.handleTargetPods(targetNames, rss)
+}
+
+// handleTargetPods handles all target pods need to be upgraded or restarted,
+// and determines whether we need to check again.
+func (r *rssController) handleTargetPods(targetNames []string, rss 
*unifflev1alpha1.RemoteShuffleService) (
+       bool, error) {
+       if len(targetNames) == 0 {
+               if err := r.clearExcludeNodes(rss); err != nil {
+                       klog.Errorf("clear exclude nodes for finished upgrade 
of rss (%v) failed: %v",
+                               utils.UniqueName(rss))
+                       return true, err
+               }
+               return false, r.updateRssStatus(rss,
+                       &unifflev1alpha1.RemoteShuffleServiceStatus{Phase: 
unifflev1alpha1.RSSRunning})
+       }
+       r.deleteTargetPods(targetNames, rss)
+       return true, nil
+}
+
+// deleteTargetPods tries to delete every target pod in the namespace of the rss, one goroutine
+// per pod, and waits for all of them to finish. Deletion failures are only logged.
+func (r *rssController) deleteTargetPods(targetNames []string, rss *unifflev1alpha1.RemoteShuffleService) {
+       var wg sync.WaitGroup
+       for _, name := range targetNames {
+               wg.Add(1)
+               go func(podName string) {
+                       defer wg.Done()
+                       klog.V(5).Infof("try to delete shuffler server %v/%v", rss.Namespace, podName)
+                       err := r.kubeClient.CoreV1().Pods(rss.Namespace).
+                               Delete(context.Background(), podName, metav1.DeleteOptions{})
+                       if err != nil {
+                               klog.V(5).Infof("deleted shuffler server %v/%v failed: %v",
+                                       rss.Namespace, podName, err)
+                       }
+               }(name)
+       }
+       wg.Wait()
+}
+
+// // processSpecificUpgrade process the rss needs full restart, and 
determines whether it has finished restarting.
+func (r *rssController) processFullRestart(rss 
*unifflev1alpha1.RemoteShuffleService) (
+       bool, error) {
+       targetNames, err := getNamesNeedToBeRestarted(r.podLister, rss)
+       if err != nil {
+               klog.Errorf("get all pod names of rss (%v) to be restarted 
failed: %v", utils.UniqueName(rss), err)
+               return false, err
+       }
+       return r.handleTargetPods(targetNames, rss)
+}
+
+// syncCoordinator synchronizes objects related to coordinators.
+// It does nothing when the sync flag of the coordinator spec is false. Otherwise it generates
+// the serviceAccount, configMap, services and deployments of the coordinators and synchronizes
+// them in that order, stopping at, logging and returning the first error.
+func (r *rssController) syncCoordinator(rss *unifflev1alpha1.RemoteShuffleService) error {
+       if !*rss.Spec.Coordinator.Sync {
+               return nil
+       }
+       serviceAccount, configMap, services, deployments := coordinator.GenerateCoordinators(rss)
+       if err := kubeutil.SyncServiceAccount(r.kubeClient, serviceAccount); err != nil {
+               klog.Errorf("sync serviceAccount (%v) for rss (%v) failed: %v",
+                       utils.UniqueName(serviceAccount), utils.UniqueName(rss), err)
+               return err
+       }
+       if err := kubeutil.SyncConfigMap(r.kubeClient, configMap); err != nil {
+               // name the configMap that failed, not the serviceAccount synchronized before it.
+               klog.Errorf("sync configMap (%v) for rss (%v) failed: %v",
+                       utils.UniqueName(configMap), utils.UniqueName(rss), err)
+               return err
+       }
+       if err := kubeutil.SyncServices(r.kubeClient, services); err != nil {
+               klog.Errorf("sync services for rss (%v) failed: %v", utils.UniqueName(rss), err)
+               return err
+       }
+       if err := kubeutil.SyncDeployments(r.kubeClient, deployments); err != nil {
+               klog.Errorf("sync deployments for rss (%v) failed: %v", utils.UniqueName(rss), err)
+               return err
+       }
+       return nil
+}
+
+// syncShuffleServer synchronizes the objects related to shuffle servers: the service account,
+// the services and the statefulSet returned by shuffleserver.GenerateShuffleServers. The sync
+// is skipped when the rss is in the running phase and rss.Spec.ShuffleServer.Sync is false.
+// The first failure is logged and returned.
+func (r *rssController) syncShuffleServer(rss *unifflev1alpha1.RemoteShuffleService) error {
+       running := rss.Status.Phase == unifflev1alpha1.RSSRunning
+       if running && !*rss.Spec.ShuffleServer.Sync {
+               return nil
+       }
+       sa, svcs, sts := shuffleserver.GenerateShuffleServers(rss)
+
+       saErr := kubeutil.SyncServiceAccount(r.kubeClient, sa)
+       if saErr != nil {
+               klog.Errorf("sync SA (%v) for rss (%v) failed: %v",
+                       utils.UniqueName(sa), utils.UniqueName(rss), saErr)
+               return saErr
+       }
+
+       svcErr := kubeutil.SyncServices(r.kubeClient, svcs)
+       if svcErr != nil {
+               klog.Errorf("sync SVCs for rss (%v) failed: %v", utils.UniqueName(rss), svcErr)
+               return svcErr
+       }
+
+       _, _, stsErr := kubeutil.SyncStatefulSet(r.kubeClient, sts, true)
+       if stsErr != nil {
+               klog.Errorf("sync StatefulSet for rss (%v) failed: %v", utils.UniqueName(rss), stsErr)
+               return stsErr
+       }
+       return nil
+}
+
+// prepareForUpgrading prepares for an upgrade: it first disables syncing of shuffle servers
+// and then sets the phase of the updated rss to upgrading. A warning event is recorded on rss
+// when the phase cannot be set.
+func (r *rssController) prepareForUpgrading(rss *unifflev1alpha1.RemoteShuffleService) error {
+       updated, disableErr := r.disableSyncingShuffleServer(rss)
+       if disableErr != nil {
+               klog.Errorf("disabled syncing shuffle server failed: %v", disableErr)
+               return disableErr
+       }
+       upgrading := &unifflev1alpha1.RemoteShuffleServiceStatus{Phase: unifflev1alpha1.RSSUpgrading}
+       statusErr := r.updateRssStatus(updated, upgrading)
+       if statusErr == nil {
+               return nil
+       }
+       r.eventRecorder.Event(rss, corev1.EventTypeWarning, controllerconstants.UpdateStatusError,
+               fmt.Sprintf("set %v phase failed: %v", unifflev1alpha1.RSSUpgrading, statusErr))
+       return statusErr
+}
+
+// disableSyncingShuffleServer disables syncing of shuffle servers before 
upgrading.
+func (r *rssController) disableSyncingShuffleServer(rss 
*unifflev1alpha1.RemoteShuffleService) (
+       *unifflev1alpha1.RemoteShuffleService, error) {
+       rssCopy := rss.DeepCopy()
+       rssCopy.Spec.ShuffleServer.Sync = pointer.Bool(false)
+       updated, err := 
r.rssClient.UniffleV1alpha1().RemoteShuffleServices(rssCopy.Namespace).
+               Update(context.Background(), rssCopy, metav1.UpdateOptions{})
+       if err != nil {
+               klog.Errorf("updated .spec.shuffleServer.sync of rss (%v) 
failed: %v",
+                       utils.UniqueName(rssCopy), err)
+               return nil, err
+       }
+       return updated, nil
+}
+
+// updateRssStatus updates status of rss.
+func (r *rssController) updateRssStatus(rss 
*unifflev1alpha1.RemoteShuffleService,
+       status *unifflev1alpha1.RemoteShuffleServiceStatus) error {
+       rssCopy := rss.DeepCopy()
+       rssCopy.Status = *status
+       _, err := 
r.rssClient.UniffleV1alpha1().RemoteShuffleServices(rssCopy.Namespace).
+               UpdateStatus(context.Background(), rssCopy, 
metav1.UpdateOptions{})
+       if err == nil {
+               return nil
+       }
+       klog.Errorf("updated status of rss (%v) failed: %v", 
utils.UniqueName(rssCopy), err)
+       if !apierrors.IsConflict(err) {
+               return err
+       }
+       return retry.RetryOnConflict(retry.DefaultRetry, func() error {
+               latestRss, getErr := 
r.rssClient.UniffleV1alpha1().RemoteShuffleServices(rss.Namespace).
+                       Get(context.Background(), rss.Name, metav1.GetOptions{})
+               if getErr != nil {
+                       klog.Errorf("get rss (%v) failed: %v", 
utils.UniqueName(rss), getErr)
+                       return getErr
+               }
+               latestRss.Status = *status
+               _, updateErr := 
r.rssClient.UniffleV1alpha1().RemoteShuffleServices(rss.Namespace).
+                       UpdateStatus(context.Background(), latestRss, 
metav1.UpdateOptions{})
+               if updateErr != nil {
+                       klog.Errorf("retry updating status of rss (%v) failed: 
%v",
+                               utils.UniqueName(latestRss), updateErr)
+               }
+               return updateErr
+       })
+}
+
+// addRSS handles the added rss by forwarding the object to enqueueRss, which puts its key
+// into the rss queue.
+func (r *rssController) addRSS(obj interface{}) {
+       r.enqueueRss(obj)
+}
+
+// updateRSS handles the updated rss. The old object is ignored; only the new object is
+// forwarded to enqueueRss.
+func (r *rssController) updateRSS(_, newObj interface{}) {
+       r.enqueueRss(newObj)
+}
+
+// deleteShuffleServer handles the deleted shuffle server pod by forwarding the object to
+// enqueueShuffleServer.
+func (r *rssController) deleteShuffleServer(obj interface{}) {
+       r.enqueueShuffleServer(obj)
+}
+
+// enqueueRss puts the key of an rss object into rssQueue. Objects that are not a
+// *RemoteShuffleService, or whose key cannot be computed, are logged and dropped.
+func (r *rssController) enqueueRss(obj interface{}) {
+       rss, ok := obj.(*unifflev1alpha1.RemoteShuffleService)
+       if !ok {
+               klog.Errorf("object is not a rss: %+v", obj)
+               return
+       }
+
+       key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(rss)
+       if err != nil {
+               // Log the underlying error as well, so the failure can be diagnosed.
+               klog.Errorf("can not get key of rss (%v): %v", utils.UniqueName(rss), err)
+               return
+       }
+       r.rssQueue.Add(key)
+}
+
+// enqueueShuffleServer enqueues a shuffle server pod.
+func (r *rssController) enqueueShuffleServer(obj interface{}) {
+       var pod *corev1.Pod
+       switch t := obj.(type) {
+       case *corev1.Pod:
+               pod = t
+       case cache.DeletedFinalStateUnknown:
+               if p, ok := t.Obj.(*corev1.Pod); ok {
+                       pod = p
+               }
+       }
+       if pod == nil {
+               klog.Errorf("object is not a Pod object: %+v", obj)
+               return
+       }
+       rssName := utils.GetRssNameByPod(pod)
+       if len(rssName) != 0 {
+               key := buildPodCacheKey(pod.Namespace, rssName, 
utils.BuildShuffleServerKey(pod))
+               r.shuffleServerQueue.Add(key)
+       }
+}
+
+// removeFinalizer removes the rss finalizer (constants.RSSFinalizerName) from rss so that the
+// object can be deleted. All other finalizers are kept. The change is made on a copy of rss
+// and sent to the API server; an update failure is logged and returned.
+func (r *rssController) removeFinalizer(rss *unifflev1alpha1.RemoteShuffleService) error {
+       rssCopy := rss.DeepCopy()
+       var finalizers []string
+       // Iterate over the finalizers currently set on the object; ranging over the still-empty
+       // result slice would drop every finalizer, including ones owned by other controllers.
+       for _, f := range rssCopy.Finalizers {
+               if f == constants.RSSFinalizerName {
+                       continue
+               }
+               finalizers = append(finalizers, f)
+       }
+       rssCopy.Finalizers = finalizers
+       _, err := r.rssClient.UniffleV1alpha1().RemoteShuffleServices(rssCopy.Namespace).
+               Update(context.Background(), rssCopy, metav1.UpdateOptions{})
+       if err != nil {
+               klog.Errorf("removed finalizer from rss (%v) failed: %v", utils.UniqueName(rssCopy), err)
+               return err
+       }
+       klog.V(3).Infof("removed finalizer from rss (%v)", utils.UniqueName(rssCopy))
+       return nil
+}
+
+// processRollingUpgrade processes full or partition upgrade in rolling mode.
+func (r *rssController) processRollingUpgrade(rss 
*unifflev1alpha1.RemoteShuffleService,
+       minPartition int32) (bool, error) {
+       finished, oldSts, err := r.checkUpgradeFinished(rss, minPartition)
+       if err != nil {
+               klog.Errorf("checked whether the current upgrade of rss (%v) 
has been finished failed: %v",
+                       utils.UniqueName(rss), err)
+               return false, err
+       }
+       if finished {
+               if err = r.clearExcludeNodes(rss); err != nil {
+                       klog.Errorf("clear exclude nodes for finished upgrade 
of rss (%v) failed: %v",
+                               utils.UniqueName(rss))
+                       return true, err
+               }
+               return false, r.updateRssStatus(rss,
+                       &unifflev1alpha1.RemoteShuffleServiceStatus{Phase: 
unifflev1alpha1.RSSRunning})
+       }
+
+       return true, r.decreasePartition(oldSts, minPartition)
+}
+
+// checkUpgradeFinished checks whether we have finished the current upgrade.
+func (r *rssController) checkUpgradeFinished(rss 
*unifflev1alpha1.RemoteShuffleService,
+       minPartition int32) (bool, *appsv1.StatefulSet, error) {
+       stsName := shuffleserver.GenerateName(rss)
+       oldSts, err := r.stsLister.StatefulSets(rss.Namespace).Get(stsName)
+       if err != nil {
+               klog.Errorf("get StatefulSet (%v/%v) failed: %v", 
rss.Namespace, stsName, err)
+               return false, nil, err
+       }
+       if oldSts.Status.CurrentRevision == oldSts.Status.UpdateRevision {
+               minPartition = 0
+       }
+       klog.V(4).Infof("current updatedReplicas: %v, replicas: %v, 
minPartition: %v",
+               oldSts.Status.UpdatedReplicas, *oldSts.Spec.Replicas, 
minPartition)
+       if oldSts.Status.UpdatedReplicas >= *oldSts.Spec.Replicas-minPartition 
&&
+               *oldSts.Spec.Replicas == oldSts.Status.ReadyReplicas {
+               klog.V(4).Infof("do not need to update partition of statefulSet 
(%v)",
+                       utils.UniqueName(oldSts))
+               return true, nil, nil
+       }
+       return false, oldSts, nil
+}
+
+// decreasePartition decreases the partition value of the statefulSet used by shuffle servers
+// when the upgrade has not finished yet. The new partition is
+// replicas - updatedReplicas - 1, but never lower than minPartition.
+func (r *rssController) decreasePartition(oldSts *appsv1.StatefulSet, minPartition int32) error {
+       desired := oldSts.DeepCopy()
+       partition := minPartition
+       if next := *desired.Spec.Replicas - oldSts.Status.UpdatedReplicas - 1; next > minPartition {
+               partition = next
+       }
+       klog.V(4).Infof("set partition of statefulSet (%v) to %v >= %v",
+               utils.UniqueName(desired), partition, minPartition)
+       desired.Spec.UpdateStrategy.RollingUpdate.Partition = &partition
+       if _, patchErr := kubeutil.PatchStatefulSet(r.kubeClient, oldSts, desired); patchErr != nil {
+               return patchErr
+       }
+       return nil
+}
+
+// serZeroPartition sets the rolling-update partition of the statefulSet used by the shuffle
+// servers of rss to the given partition value and patches the statefulSet accordingly.
+// NOTE(review): the name suggests callers pass 0 for a specific upgrade; confirm at the call site.
+func (r *rssController) serZeroPartition(rss *unifflev1alpha1.RemoteShuffleService,
+       partition int32) error {
+       name := shuffleserver.GenerateName(rss)
+       current, getErr := r.stsLister.StatefulSets(rss.Namespace).Get(name)
+       if getErr != nil {
+               klog.Errorf("get StatefulSet (%v/%v) failed: %v", rss.Namespace, name, getErr)
+               return getErr
+       }
+       desired := current.DeepCopy()
+       desired.Spec.UpdateStrategy.RollingUpdate.Partition = &partition
+       if _, patchErr := kubeutil.PatchStatefulSet(r.kubeClient, current, desired); patchErr != nil {
+               return patchErr
+       }
+       return nil
+}
+
+// buildPodCacheKey builds a key for shuffle server pod queue.
+func buildPodCacheKey(namespace, rssName, shuffleServerKey string) string {
+       return namespace + "|" + rssName + "|" + shuffleServerKey
+}
+
+// parsePodCacheKey parses a key of the shuffle server pod queue into its three "|"-separated
+// parts. All results are empty when the key does not consist of exactly three parts.
+func parsePodCacheKey(key string) (namespace, rssName, shuffleServerKey string) {
+       parts := strings.Split(key, "|")
+       if len(parts) != 3 {
+               return "", "", ""
+       }
+       return parts[0], parts[1], parts[2]
+}
+
+// getNamesNeedToBeDeleted returns the names of shuffle server pods that still need to be
+// deleted: every entry of targetNames whose pod name does not appear in any key of
+// rss.Status.DeletedKeys. The order of targetNames is preserved.
+func getNamesNeedToBeDeleted(targetNames []string, rss *unifflev1alpha1.RemoteShuffleService) []string {
+       var waiting []string
+nextTarget:
+       for _, target := range targetNames {
+               for _, deletedKey := range rss.Status.DeletedKeys {
+                       if _, podName, _ := utils.ParseShuffleServerKey(deletedKey); podName == target {
+                               // Already recorded as deleted, nothing left to do for this pod.
+                               continue nextTarget
+                       }
+               }
+               waiting = append(waiting, target)
+       }
+       return waiting
+}
+
+// getNamesNeedToBeUpgraded returns the names of shuffle server pods that still need to be
+// upgraded: the de-duplicated specific names of the upgrade strategy, minus those already
+// recorded as deleted in the status of rss.
+func getNamesNeedToBeUpgraded(rss *unifflev1alpha1.RemoteShuffleService) []string {
+       uniqueNames := sets.NewString(rss.Spec.ShuffleServer.UpgradeStrategy.SpecificNames...)
+       return getNamesNeedToBeDeleted(uniqueNames.List(), rss)
+}
+
+// getNamesNeedToBeRestarted returns the names of shuffle server pods that still need to be
+// restarted: all pods in the namespace of rss matching the shuffle server labels, minus those
+// already recorded as deleted in the status of rss.
+func getNamesNeedToBeRestarted(podLister corelisters.PodLister, rss *unifflev1alpha1.RemoteShuffleService) (
+       []string, error) {
+       selector := labels.SelectorFromSet(utils.GenerateShuffleServerLabels(rss))
+       pods, err := podLister.Pods(rss.Namespace).List(selector)
+       if err != nil {
+               klog.Errorf("get pods of rss (%v) failed: %v", utils.UniqueName(rss), err)
+               return nil, err
+       }
+       names := sets.NewString()
+       for _, pod := range pods {
+               names.Insert(pod.Name)
+       }
+       return getNamesNeedToBeDeleted(names.List(), rss), nil
+}
+
+// getAppName returns the value of the "app" label of a pod as a single-element slice, or an
+// error when obj is not a *corev1.Pod.
+// NOTE(review): the signature looks like a cache index function; confirm where it is registered.
+func getAppName(obj interface{}) ([]string, error) {
+       if pod, isPod := obj.(*corev1.Pod); isPod {
+               return []string{pod.Labels["app"]}, nil
+       }
+       return nil, fmt.Errorf("not a pod")
+}
diff --git a/deploy/kubernetes/operator/pkg/controller/controller/rss_test.go 
b/deploy/kubernetes/operator/pkg/controller/controller/rss_test.go
new file mode 100644
index 00000000..a72c7c10
--- /dev/null
+++ b/deploy/kubernetes/operator/pkg/controller/controller/rss_test.go
@@ -0,0 +1,290 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package controller
+
+import (
+       "context"
+       "path/filepath"
+       "testing"
+       "time"
+
+       . "github.com/onsi/ginkgo/v2"
+       . "github.com/onsi/gomega"
+       appsv1 "k8s.io/api/apps/v1"
+       corev1 "k8s.io/api/core/v1"
+       metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+       "k8s.io/apimachinery/pkg/util/wait"
+       "k8s.io/client-go/kubernetes"
+       "k8s.io/client-go/kubernetes/scheme"
+       "k8s.io/utils/pointer"
+       "sigs.k8s.io/controller-runtime/pkg/envtest"
+       logf "sigs.k8s.io/controller-runtime/pkg/log"
+       "sigs.k8s.io/controller-runtime/pkg/log/zap"
+
+       unifflev1alpha1 
"github.com/apache/incubator-uniffle/deploy/kubernetes/operator/api/uniffle/v1alpha1"
+       
"github.com/apache/incubator-uniffle/deploy/kubernetes/operator/pkg/constants"
+       
"github.com/apache/incubator-uniffle/deploy/kubernetes/operator/pkg/controller/config"
+       
"github.com/apache/incubator-uniffle/deploy/kubernetes/operator/pkg/controller/sync/coordinator"
+       
"github.com/apache/incubator-uniffle/deploy/kubernetes/operator/pkg/controller/sync/shuffleserver"
+       
"github.com/apache/incubator-uniffle/deploy/kubernetes/operator/pkg/generated/clientset/versioned"
+       
"github.com/apache/incubator-uniffle/deploy/kubernetes/operator/pkg/utils"
+)
+
+// Fixtures shared by the rss controller test suite. testEnv, the clients, testCM, testRSS,
+// stopCtx and ctxCancel are all initialized in BeforeSuite; ctxCancel and testEnv are used
+// again in AfterSuite to stop the controller and tear the environment down.
+var (
+       testEnv        *envtest.Environment
+       testKubeClient kubernetes.Interface
+       testRssClient  versioned.Interface
+       testCM         *corev1.ConfigMap
+       testRSS        *unifflev1alpha1.RemoteShuffleService
+       stopCtx        context.Context
+       ctxCancel      context.CancelFunc
+)
+
+// TestRssController is the go test entry point of the Ginkgo suite: it registers Gomega's
+// fail handler and runs the specs with very verbose reporting and full stack traces enabled.
+func TestRssController(t *testing.T) {
+       RegisterFailHandler(Fail)
+       suiteCfg, reporterCfg := GinkgoConfiguration()
+       reporterCfg.VeryVerbose = true
+       reporterCfg.FullTrace = true
+       RunSpecs(t, "rss controller suite", suiteCfg, reporterCfg)
+}
+
+// BeforeSuite starts an envtest environment loaded with the CRDs of the operator, builds the
+// test objects and the kubernetes/rss clients, registers the uniffle types in the client
+// scheme and finally starts the rss controller in the background.
+var _ = BeforeSuite(
+       func() {
+               logf.SetLogger(zap.New(zap.WriteTo(GinkgoWriter), zap.UseDevMode(true)))
+               By("bootstrapping test environment")
+               testEnv = &envtest.Environment{
+                       CRDDirectoryPaths: []string{filepath.Join("../../..", "config", "crd", "bases")},
+               }
+               restConfig, err := testEnv.Start()
+               Expect(err).To(BeNil())
+               Expect(restConfig).ToNot(BeNil())
+
+               testCM, testRSS = initTestRss()
+
+               testKubeClient, err = kubernetes.NewForConfig(restConfig)
+               Expect(err).ToNot(HaveOccurred())
+               Expect(testKubeClient).ToNot(BeNil())
+
+               testRssClient, err = versioned.NewForConfig(restConfig)
+               Expect(err).ToNot(HaveOccurred())
+               Expect(testRssClient).ToNot(BeNil())
+
+               err = unifflev1alpha1.AddToScheme(scheme.Scheme)
+               Expect(err).NotTo(HaveOccurred())
+
+               // +kubebuilder:scaffold:scheme
+
+               cfg := &config.Config{
+                       Workers: 1,
+                       GenericConfig: utils.GenericConfig{
+                               RESTConfig: restConfig,
+                               KubeClient: testKubeClient,
+                               RSSClient:  testRssClient,
+                       },
+               }
+               rc := newRSSController(cfg)
+               stopCtx, ctxCancel = context.WithCancel(context.TODO())
+               go func() {
+                       // An assertion made in a goroutine started by the test must be guarded by
+                       // GinkgoRecover, otherwise a failure panics instead of failing the suite.
+                       defer GinkgoRecover()
+                       // Assert on the returned value directly instead of writing the err
+                       // variable of the enclosing function from this goroutine.
+                       Expect(rc.Start(stopCtx)).ToNot(HaveOccurred())
+               }()
+       },
+)
+
+// AfterSuite cancels the context the rss controller was started with and then stops the
+// envtest environment.
+var _ = AfterSuite(func() {
+       By("stopping rss controller")
+       ctxCancel()
+       By("tearing down the test environment")
+       Expect(testEnv.Stop()).To(Succeed())
+})
+
+// At present, it is not possible to simulate the real operation of Workload through EnvTest.
+// TODO: more detailed tests will be added in the future.
+//
+// The two specs below depend on each other: the second one updates the rss object that the
+// first one creates.
+var _ = Describe("RssController", func() {
+       Context("Handle rss objects", func() {
+               It("Create a rss object", func() {
+                       By("Create test configMap")
+                       _, err := testKubeClient.CoreV1().ConfigMaps(testCM.Namespace).
+                               Create(context.TODO(), testCM, metav1.CreateOptions{})
+                       Expect(err).ToNot(HaveOccurred())
+
+                       By("Create test rss object")
+                       _, err = testRssClient.UniffleV1alpha1().RemoteShuffleServices(corev1.NamespaceDefault).
+                               Create(context.TODO(), testRSS, metav1.CreateOptions{})
+                       Expect(err).ToNot(HaveOccurred())
+
+                       By("Wait rss object running")
+                       err = wait.Poll(time.Second, time.Second*5, func() (bool, error) {
+                               curRss, getErr := testRssClient.UniffleV1alpha1().RemoteShuffleServices(testNamespace).
+                                       Get(context.TODO(), testRssName, metav1.GetOptions{})
+                               if getErr != nil {
+                                       return false, getErr
+                               }
+
+                               // Done once the rss is running and syncing of shuffle servers is disabled.
+                               if curRss.Status.Phase != unifflev1alpha1.RSSRunning || *curRss.Spec.ShuffleServer.Sync {
+                                       return false, nil
+                               }
+                               return true, nil
+                       })
+                       Expect(err).ToNot(HaveOccurred())
+
+                       By("Check coordinator 0")
+                       coordinatorName0 := coordinator.GenerateNameByIndex(testRSS, 0)
+                       _, err = testKubeClient.AppsV1().Deployments(corev1.NamespaceDefault).
+                               Get(context.TODO(), coordinatorName0, metav1.GetOptions{})
+                       Expect(err).ToNot(HaveOccurred())
+
+                       By("Check coordinator 1")
+                       coordinatorName1 := coordinator.GenerateNameByIndex(testRSS, 1)
+                       _, err = testKubeClient.AppsV1().Deployments(corev1.NamespaceDefault).
+                               Get(context.TODO(), coordinatorName1, metav1.GetOptions{})
+                       Expect(err).ToNot(HaveOccurred())
+
+                       By("Check shuffle server")
+                       shuffleServerName := shuffleserver.GenerateName(testRSS)
+                       _, err = testKubeClient.AppsV1().StatefulSets(corev1.NamespaceDefault).
+                               Get(context.TODO(), shuffleServerName, metav1.GetOptions{})
+                       Expect(err).ToNot(HaveOccurred())
+               })
+               It("Update the rss object", func() {
+                       By("Get current rss object")
+                       curRSS, err := testRssClient.UniffleV1alpha1().RemoteShuffleServices(corev1.NamespaceDefault).
+                               Get(context.TODO(), testRssName, metav1.GetOptions{})
+                       Expect(err).ToNot(HaveOccurred())
+                       Expect(curRSS).ToNot(BeNil())
+                       // Expect without a matcher asserts nothing; check the value explicitly.
+                       Expect(*curRSS.Spec.ShuffleServer.Sync).To(BeFalse())
+
+                       shuffleServerName := shuffleserver.GenerateName(testRSS)
+                       var sts *appsv1.StatefulSet
+                       sts, err = testKubeClient.AppsV1().StatefulSets(corev1.NamespaceDefault).
+                               Get(context.TODO(), shuffleServerName, metav1.GetOptions{})
+                       Expect(err).ToNot(HaveOccurred())
+                       Expect(sts).ToNot(BeNil())
+                       Expect(*sts.Spec.Replicas).To(Equal(int32(1)))
+
+                       coordinatorName0 := coordinator.GenerateNameByIndex(testRSS, 0)
+                       var coordinator0 *appsv1.Deployment
+                       coordinator0, err = testKubeClient.AppsV1().Deployments(corev1.NamespaceDefault).
+                               Get(context.TODO(), coordinatorName0, metav1.GetOptions{})
+                       Expect(err).ToNot(HaveOccurred())
+                       Expect(coordinator0).ToNot(BeNil())
+                       Expect(coordinator0.Spec.Template.Spec.Containers[0].Image).To(Equal(testCoordinatorImage1))
+
+                       // Index 1, so that the second coordinator is really the one being checked.
+                       coordinatorName1 := coordinator.GenerateNameByIndex(testRSS, 1)
+                       var coordinator1 *appsv1.Deployment
+                       coordinator1, err = testKubeClient.AppsV1().Deployments(corev1.NamespaceDefault).
+                               Get(context.TODO(), coordinatorName1, metav1.GetOptions{})
+                       Expect(err).ToNot(HaveOccurred())
+                       Expect(coordinator1).ToNot(BeNil())
+                       Expect(coordinator1.Spec.Template.Spec.Containers[0].Image).To(Equal(testCoordinatorImage1))
+
+                       By("Update test rss object")
+                       curRSS.Spec.Coordinator.Image = testCoordinatorImage2
+                       curRSS.Spec.ShuffleServer.Sync = pointer.Bool(true)
+                       curRSS.Spec.ShuffleServer.Replicas = pointer.Int32(3)
+                       _, err = testRssClient.UniffleV1alpha1().RemoteShuffleServices(corev1.NamespaceDefault).
+                               Update(context.TODO(), curRSS, metav1.UpdateOptions{})
+                       Expect(err).ToNot(HaveOccurred())
+
+                       By("Wait rss object upgrading")
+                       err = wait.Poll(time.Second, time.Second*5, func() (bool, error) {
+                               curRss, getErr := testRssClient.UniffleV1alpha1().RemoteShuffleServices(testNamespace).
+                                       Get(context.TODO(), testRssName, metav1.GetOptions{})
+                               if getErr != nil {
+                                       return false, getErr
+                               }
+                               if curRss.Status.Phase != unifflev1alpha1.RSSUpgrading {
+                                       return false, nil
+                               }
+                               return true, nil
+                       })
+                       Expect(err).ToNot(HaveOccurred())
+
+                       By("Check coordinator 0")
+                       coordinator0, err = testKubeClient.AppsV1().Deployments(corev1.NamespaceDefault).
+                               Get(context.TODO(), coordinatorName0, metav1.GetOptions{})
+                       Expect(err).ToNot(HaveOccurred())
+                       Expect(coordinator0).ToNot(BeNil())
+                       Expect(coordinator0.Spec.Template.Spec.Containers[0].Image).To(Equal(testCoordinatorImage2))
+
+                       By("Check coordinator 1")
+                       coordinator1, err = testKubeClient.AppsV1().Deployments(corev1.NamespaceDefault).
+                               Get(context.TODO(), coordinatorName1, metav1.GetOptions{})
+                       Expect(err).ToNot(HaveOccurred())
+                       Expect(coordinator1).ToNot(BeNil())
+                       Expect(coordinator1.Spec.Template.Spec.Containers[0].Image).To(Equal(testCoordinatorImage2))
+
+                       By("Check shuffle server")
+                       sts, err = testKubeClient.AppsV1().StatefulSets(corev1.NamespaceDefault).
+                               Get(context.TODO(), shuffleServerName, metav1.GetOptions{})
+                       Expect(err).ToNot(HaveOccurred())
+                       Expect(sts).ToNot(BeNil())
+                       Expect(*sts.Spec.Replicas).To(Equal(int32(3)))
+               })
+       })
+})
+
+// initTestRss builds the fixtures used by the suite: a configMap with empty coordinator,
+// shuffle server and log4j entries, and an rss object that references that configMap by name.
+// Both objects share testRssName and testNamespace. The rss uses testCoordinatorImage1 for
+// the coordinators and the full upgrade strategy for the shuffle servers.
+func initTestRss() (*corev1.ConfigMap, *unifflev1alpha1.RemoteShuffleService) {
+       cm := &corev1.ConfigMap{
+               ObjectMeta: metav1.ObjectMeta{
+                       Name:      testRssName,
+                       Namespace: testNamespace,
+               },
+               Data: map[string]string{
+                       constants.CoordinatorConfigKey:   "",
+                       constants.ShuffleServerConfigKey: "",
+                       constants.Log4jPropertiesKey:     "",
+               },
+       }
+       rss := &unifflev1alpha1.RemoteShuffleService{
+               ObjectMeta: metav1.ObjectMeta{
+                       Name:      testRssName,
+                       Namespace: testNamespace,
+               },
+               Spec: unifflev1alpha1.RemoteShuffleServiceSpec{
+                       ConfigMapName: testRssName,
+                       Coordinator: &unifflev1alpha1.CoordinatorConfig{
+                               HTTPNodePort: []int32{30001, 30011},
+                               RPCNodePort:  []int32{30002, 30012},
+                               CommonConfig: &unifflev1alpha1.CommonConfig{
+                                       ConfigDir: "/app/config",
+                                       RSSPodSpec: &unifflev1alpha1.RSSPodSpec{
+                                               MainContainer: 
&unifflev1alpha1.MainContainer{
+                                                       Image: 
testCoordinatorImage1,
+                                               },
+                                       },
+                               },
+                       },
+                       ShuffleServer: &unifflev1alpha1.ShuffleServerConfig{
+                               CommonConfig: &unifflev1alpha1.CommonConfig{
+                                       ConfigDir: "/app/config",
+                                       RSSPodSpec: &unifflev1alpha1.RSSPodSpec{
+                                               MainContainer: 
&unifflev1alpha1.MainContainer{
+                                                       Image: 
"rss-shuffleserver:latest",
+                                               },
+                                       },
+                                       XmxSize: "10G",
+                               },
+                               UpgradeStrategy: 
&unifflev1alpha1.ShuffleServerUpgradeStrategy{
+                                       Type: unifflev1alpha1.FullUpgrade,
+                               },
+                       },
+               },
+       }
+       return cm, rss
+}
diff --git 
a/deploy/kubernetes/operator/pkg/controller/controller/shuffle_server_test.go 
b/deploy/kubernetes/operator/pkg/controller/controller/shuffle_server_test.go
new file mode 100644
index 00000000..85cb5c98
--- /dev/null
+++ 
b/deploy/kubernetes/operator/pkg/controller/controller/shuffle_server_test.go
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package controller
+
+import (
+       "context"
+       "sort"
+       "testing"
+
+       appsv1 "k8s.io/api/apps/v1"
+       corev1 "k8s.io/api/core/v1"
+       metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+       kubefake "k8s.io/client-go/kubernetes/fake"
+
+       unifflev1alpha1 
"github.com/apache/incubator-uniffle/deploy/kubernetes/operator/api/uniffle/v1alpha1"
+       
"github.com/apache/incubator-uniffle/deploy/kubernetes/operator/pkg/constants"
+       
"github.com/apache/incubator-uniffle/deploy/kubernetes/operator/pkg/controller/config"
+       
"github.com/apache/incubator-uniffle/deploy/kubernetes/operator/pkg/generated/clientset/versioned/fake"
+       
"github.com/apache/incubator-uniffle/deploy/kubernetes/operator/pkg/utils"
+)
+
+// buildUpgradingRssObjWithTargetKeys returns a test rss object in the upgrading phase whose
+// status target keys contain only the given shuffle server key.
+func buildUpgradingRssObjWithTargetKeys(shuffleServerKey string) *unifflev1alpha1.RemoteShuffleService {
+       rss := &unifflev1alpha1.RemoteShuffleService{}
+       rss.ObjectMeta = metav1.ObjectMeta{
+               Name:            testRssName,
+               Namespace:       testNamespace,
+               ResourceVersion: "test",
+       }
+       rss.Spec.Coordinator = &unifflev1alpha1.CoordinatorConfig{
+               ExcludeNodesFilePath: "/exclude_nodes",
+       }
+       rss.Status.Phase = unifflev1alpha1.RSSUpgrading
+       rss.Status.TargetKeys = []string{shuffleServerKey}
+       return rss
+}
+
+// buildTestShuffleServerPod returns a test shuffle server pod carrying the shuffle server port
+// annotation, a controller revision hash label and a pod IP.
+func buildTestShuffleServerPod() *corev1.Pod {
+       pod := &corev1.Pod{}
+       pod.Name = testShuffleServerPodName
+       pod.Namespace = testNamespace
+       pod.Annotations = map[string]string{
+               constants.AnnotationShuffleServerPort: "8080",
+       }
+       pod.Labels = map[string]string{
+               appsv1.ControllerRevisionHashLabelKey: "test-revision-1",
+       }
+       pod.Status.PodIP = "10.0.0.1"
+       return pod
+}
+
+// buildTestExcludeNodeConfigMap returns a configMap named after the coordinator of the rss
+// object whose exclude nodes entry holds the given shuffle server key.
+func buildTestExcludeNodeConfigMap(rss *unifflev1alpha1.RemoteShuffleService,
+       shuffleServerKey string) *corev1.ConfigMap {
+       excludeNodes := map[string]string{testExcludeNodeFileKey: shuffleServerKey}
+       cm := &corev1.ConfigMap{Data: excludeNodes}
+       cm.Name = utils.GenerateCoordinatorName(rss)
+       cm.Namespace = testNamespace
+       return cm
+}
+
+func TestUpdateTargetAndDeletedKeys(t *testing.T) {
+       testShuffleServerPod := buildTestShuffleServerPod()
+       shuffleServerKey := utils.BuildShuffleServerKey(testShuffleServerPod)
+       rss := buildUpgradingRssObjWithTargetKeys(shuffleServerKey)
+       cm := buildTestExcludeNodeConfigMap(rss, shuffleServerKey)
+
+       rssClient := fake.NewSimpleClientset(rss)
+       kubeClient := kubefake.NewSimpleClientset(cm)
+
+       rc := newRSSController(&config.Config{
+               GenericConfig: utils.GenericConfig{
+                       KubeClient: kubeClient,
+                       RSSClient:  rssClient,
+               },
+       })
+       if err := rc.cmInformer.GetIndexer().Add(cm); err != nil {
+               t.Fatalf("update fake cm lister failed: %v", err)
+       }
+
+       for _, tt := range []struct {
+               name                 string
+               expectedDeletedKeys  []string
+               expectedExcludeNodes string
+       }{
+               {
+                       name:                 "update target and deleted keys 
when a shuffle server pod has been deleted",
+                       expectedDeletedKeys:  []string{shuffleServerKey},
+                       expectedExcludeNodes: "",
+               },
+       } {
+               t.Run(tt.name, func(tc *testing.T) {
+                       if err := rc.updateTargetAndDeletedKeys(rss, 
shuffleServerKey); err != nil {
+                               t.Errorf("update target and deleted keys 
failed: %v", err)
+                               return
+                       }
+
+                       curRss, err := 
rssClient.UniffleV1alpha1().RemoteShuffleServices(rss.Namespace).Get(context.TODO(),
+                               rss.Name, metav1.GetOptions{})
+                       if err != nil {
+                               t.Errorf("get updated rss object failed: %v", 
err)
+                               return
+                       }
+                       if !stringSliceIsEqual(curRss.Status.DeletedKeys, 
tt.expectedDeletedKeys) {
+                               t.Errorf("unexpected deleted keys: %+v, 
expected: %+v", curRss.Status.DeletedKeys,
+                                       tt.expectedDeletedKeys)
+                               return
+                       }
+
+                       var curCM *corev1.ConfigMap
+                       curCM, err = 
kubeClient.CoreV1().ConfigMaps(cm.Namespace).Get(context.TODO(), cm.Name, 
metav1.GetOptions{})
+                       if err != nil {
+                               t.Errorf("get updated rss object failed: %v", 
err)
+                               return
+                       }
+                       if curCM.Data[testExcludeNodeFileKey] != 
tt.expectedExcludeNodes {
+                               t.Errorf("unexpected deleted keys: %v, 
expected: %v", curCM.Data[testExcludeNodeFileKey],
+                                       tt.expectedExcludeNodes)
+                               return
+                       }
+               })
+       }
+}
+
+// stringSliceIsEqual reports whether current and target contain the same strings with the same
+// multiplicities, regardless of order. The comparison works on sorted copies, so neither
+// argument is modified.
+func stringSliceIsEqual(current []string, target []string) bool {
+       if len(current) != len(target) {
+               return false
+       }
+       // Sort copies instead of the arguments so that callers keep their original ordering.
+       sortedCurrent := append([]string(nil), current...)
+       sortedTarget := append([]string(nil), target...)
+       sort.Strings(sortedCurrent)
+       sort.Strings(sortedTarget)
+       for i := range sortedCurrent {
+               if sortedCurrent[i] != sortedTarget[i] {
+                       return false
+               }
+       }
+       return true
+}
diff --git a/deploy/kubernetes/operator/pkg/controller/controller/test.go 
b/deploy/kubernetes/operator/pkg/controller/controller/test.go
new file mode 100644
index 00000000..3058176e
--- /dev/null
+++ b/deploy/kubernetes/operator/pkg/controller/controller/test.go
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package controller
+
+import (
+       corev1 "k8s.io/api/core/v1"
+
+       
"github.com/apache/incubator-uniffle/deploy/kubernetes/operator/pkg/constants"
+)
+
+// Name, namespace and coordinator images used by the rss test objects.
+const (
+       testRssName           = "test"
+       testNamespace         = corev1.NamespaceDefault
+       testCoordinatorImage1 = "rss-coordinator:test1"
+       testCoordinatorImage2 = "rss-coordinator:test2"
+)
+
+// Name of the test shuffle server pod and the configMap key of the exclude nodes file.
+const (
+       testExcludeNodeFileKey   = "exclude_nodes"
+       testShuffleServerPodName = constants.RSSShuffleServer + "-" + testRssName + "-0"
+)
diff --git 
a/deploy/kubernetes/operator/pkg/controller/sync/coordinator/coordinator.go 
b/deploy/kubernetes/operator/pkg/controller/sync/coordinator/coordinator.go
new file mode 100644
index 00000000..d31aba65
--- /dev/null
+++ b/deploy/kubernetes/operator/pkg/controller/sync/coordinator/coordinator.go
@@ -0,0 +1,297 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package coordinator
+
+import (
+       "fmt"
+       "strconv"
+       "strings"
+
+       appsv1 "k8s.io/api/apps/v1"
+       corev1 "k8s.io/api/core/v1"
+       metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+       "k8s.io/apimachinery/pkg/util/intstr"
+       "k8s.io/apimachinery/pkg/util/sets"
+       "k8s.io/utils/pointer"
+
+       unifflev1alpha1 
"github.com/apache/incubator-uniffle/deploy/kubernetes/operator/api/uniffle/v1alpha1"
+       
"github.com/apache/incubator-uniffle/deploy/kubernetes/operator/pkg/constants"
+       controllerconstants 
"github.com/apache/incubator-uniffle/deploy/kubernetes/operator/pkg/controller/constants"
+       
"github.com/apache/incubator-uniffle/deploy/kubernetes/operator/pkg/controller/util"
+       
"github.com/apache/incubator-uniffle/deploy/kubernetes/operator/pkg/utils"
+)
+
+// defaultENVs holds the names of the environment variables that generateMainContainerENV always
+// sets itself; entries of .spec.coordinator.env using one of these names are skipped there.
+var defaultENVs = sets.NewString(
+       controllerconstants.CoordinatorRPCPortEnv,
+       controllerconstants.CoordinatorHTTPPortEnv,
+       controllerconstants.XmxSizeEnv,
+       controllerconstants.ServiceNameEnv,
+)
+
+// GenerateCoordinators generates objects related to coordinators: one service account, one
+// configMap, and one service plus one deployment for each of the .spec.coordinator.count
+// coordinators.
+func GenerateCoordinators(rss *unifflev1alpha1.RemoteShuffleService) (
+       *corev1.ServiceAccount, *corev1.ConfigMap, []*corev1.Service, []*appsv1.Deployment) {
+       serviceAccount := GenerateSA(rss)
+       configMap := GenerateCM(rss)
+       total := int(*rss.Spec.Coordinator.Count)
+       services := make([]*corev1.Service, total)
+       deployments := make([]*appsv1.Deployment, total)
+       for idx := range services {
+               services[idx] = GenerateSvc(rss, idx)
+               deployments[idx] = GenerateDeploy(rss, idx)
+       }
+       return serviceAccount, configMap, services, deployments
+}
+
+// GenerateSA generates service account of coordinator, named after the coordinator of the rss
+// object and owned by the rss object.
+func GenerateSA(rss *unifflev1alpha1.RemoteShuffleService) *corev1.ServiceAccount {
+       meta := metav1.ObjectMeta{
+               Name:      utils.GenerateCoordinatorName(rss),
+               Namespace: rss.Namespace,
+       }
+       sa := &corev1.ServiceAccount{ObjectMeta: meta}
+       util.AddOwnerReference(&sa.ObjectMeta, rss)
+       return sa
+}
+
+// GenerateCM generates configMap used by coordinators. It carries the coordinator label and a
+// single, initially empty entry for the exclude nodes file.
+func GenerateCM(rss *unifflev1alpha1.RemoteShuffleService) *corev1.ConfigMap {
+       cm := &corev1.ConfigMap{}
+       cm.Name = utils.GenerateCoordinatorName(rss)
+       cm.Namespace = rss.Namespace
+       cm.Labels = map[string]string{constants.LabelCoordinator: "true"}
+       cm.Data = map[string]string{utils.GetExcludeNodesConfigMapKey(rss): ""}
+       util.AddOwnerReference(&cm.ObjectMeta, rss)
+       return cm
+}
+
+// GenerateSvc generates the NodePort service used by the coordinator with the given index.
+// The node ports are taken from the entries at that index of .spec.coordinator.rpcNodePort and
+// .spec.coordinator.httpNodePort.
+func GenerateSvc(rss *unifflev1alpha1.RemoteShuffleService, index int) *corev1.Service {
+       name := GenerateNameByIndex(rss, index)
+       coordinator := rss.Spec.Coordinator
+
+       rpcPort := corev1.ServicePort{
+               Name:       "rpc",
+               Protocol:   corev1.ProtocolTCP,
+               Port:       controllerconstants.ContainerCoordinatorRPCPort,
+               TargetPort: intstr.FromInt(int(*coordinator.RPCPort)),
+               NodePort:   coordinator.RPCNodePort[index],
+       }
+       httpPort := corev1.ServicePort{
+               Name:       "http",
+               Protocol:   corev1.ProtocolTCP,
+               Port:       controllerconstants.ContainerCoordinatorHTTPPort,
+               TargetPort: intstr.FromInt(int(*coordinator.HTTPPort)),
+               NodePort:   coordinator.HTTPNodePort[index],
+       }
+
+       svc := &corev1.Service{}
+       svc.Name = name
+       svc.Namespace = rss.Namespace
+       svc.Spec.Type = corev1.ServiceTypeNodePort
+       svc.Spec.Selector = map[string]string{"app": name}
+       svc.Spec.Ports = []corev1.ServicePort{rpcPort, httpPort}
+       util.AddOwnerReference(&svc.ObjectMeta, rss)
+       return svc
+}
+
+// GenerateDeploy generates deployment of the coordinator with the given index.
+//
+// The pod runs the init containers, the main container and the sidecar containers of
+// .spec.coordinator, and mounts, in this order, the configMap named by .spec.configMapName,
+// the coordinator configMap holding the exclude nodes file, and the hostPath volumes.
+func GenerateDeploy(rss *unifflev1alpha1.RemoteShuffleService, index int) *appsv1.Deployment {
+       name := GenerateNameByIndex(rss, index)
+       coordinatorName := utils.GenerateCoordinatorName(rss)
+
+       // appLabels returns a fresh map on every call so that no two objects share one label map.
+       appLabels := func() map[string]string {
+               return map[string]string{"app": name}
+       }
+       // newConfigMapVolume returns a volume backed by the named configMap with default mode 0777.
+       newConfigMapVolume := func(volumeName, configMapName string) corev1.Volume {
+               source := &corev1.ConfigMapVolumeSource{DefaultMode: pointer.Int32(0777)}
+               source.Name = configMapName
+               return corev1.Volume{
+                       Name:         volumeName,
+                       VolumeSource: corev1.VolumeSource{ConfigMap: source},
+               }
+       }
+
+       podSpec := corev1.PodSpec{
+               HostNetwork:        *rss.Spec.Coordinator.HostNetwork,
+               ServiceAccountName: coordinatorName,
+               NodeSelector:       rss.Spec.Coordinator.NodeSelector,
+               Tolerations: []corev1.Toleration{
+                       {
+                               Effect: corev1.TaintEffectNoSchedule,
+                               Key:    "node-role.kubernetes.io/master",
+                       },
+               },
+       }
+       if podSpec.HostNetwork {
+               podSpec.DNSPolicy = corev1.DNSClusterFirstWithHostNet
+       }
+
+       // add init containers, the main container and other containers.
+       podSpec.InitContainers = util.GenerateInitContainers(rss.Spec.Coordinator.RSSPodSpec)
+       podSpec.Containers = append([]corev1.Container{*generateMainContainer(rss)},
+               rss.Spec.Coordinator.SidecarContainers...)
+
+       // add the configuration volume, the configMap volume to save exclude nodes and the
+       // hostPath volumes for coordinators.
+       podSpec.Volumes = []corev1.Volume{
+               newConfigMapVolume(controllerconstants.ConfigurationVolumeName, rss.Spec.ConfigMapName),
+               newConfigMapVolume(controllerconstants.ExcludeNodesFile, coordinatorName),
+       }
+       podSpec.Volumes = append(podSpec.Volumes, util.GenerateHostPathVolumes(
+               rss.Spec.Coordinator.HostPathMounts, rss.Spec.Coordinator.LogHostPath, name)...)
+
+       deploy := &appsv1.Deployment{
+               ObjectMeta: metav1.ObjectMeta{
+                       Name:      name,
+                       Namespace: rss.Namespace,
+                       Labels:    appLabels(),
+               },
+               Spec: appsv1.DeploymentSpec{
+                       Selector: &metav1.LabelSelector{MatchLabels: appLabels()},
+                       Replicas: rss.Spec.Coordinator.Replicas,
+                       Template: corev1.PodTemplateSpec{
+                               ObjectMeta: metav1.ObjectMeta{Labels: appLabels()},
+                               Spec:       podSpec,
+                       },
+               },
+       }
+       util.AddOwnerReference(&deploy.ObjectMeta, rss)
+       return deploy
+}
+
+// GenerateNameByIndex returns workload or service name of coordinator by 
index.
+func GenerateNameByIndex(rss *unifflev1alpha1.RemoteShuffleService, index int) 
string {
+       return fmt.Sprintf("%v-%v-%v", constants.RSSCoordinator, rss.Name, 
index)
+}
+
+// GenerateAddresses returns addresses of coordinators accessed by shuffle servers, as a
+// comma-separated list of "<coordinator service name>:<port>" entries, one entry for each of the
+// .spec.coordinator.count coordinators.
+func GenerateAddresses(rss *unifflev1alpha1.RemoteShuffleService) string {
+       var addresses []string
+       for i := 0; i < int(*rss.Spec.Coordinator.Count); i++ {
+               // Use the rpc port exposed by the coordinator service (see GenerateSvc) rather than
+               // the shuffle server's own rpc port.
+               address := fmt.Sprintf("%v:%v", GenerateNameByIndex(rss, i),
+                       controllerconstants.ContainerCoordinatorRPCPort)
+               addresses = append(addresses, address)
+       }
+       return strings.Join(addresses, ",")
+}
+
+// GenerateProperties generates configuration properties of coordinators: the rpc port, the http
+// port and the path of the exclude nodes file.
+func GenerateProperties(rss *unifflev1alpha1.RemoteShuffleService) map[controllerconstants.PropertyKey]string {
+       return map[controllerconstants.PropertyKey]string{
+               controllerconstants.RPCServerPort:               fmt.Sprintf("%v", *rss.Spec.Coordinator.RPCPort),
+               controllerconstants.JettyHTTPPort:               fmt.Sprintf("%v", *rss.Spec.Coordinator.HTTPPort),
+               controllerconstants.CoordinatorExcludeNodesPath: utils.GetExcludeNodesMountPath(rss),
+       }
+}
+
+// generateMainContainer generates main container of coordinators from a copy of the pod spec in
+// .spec.coordinator, with the exclude nodes volume mounted into it.
+func generateMainContainer(rss *unifflev1alpha1.RemoteShuffleService) *corev1.Container {
+       coordinator := rss.Spec.Coordinator
+       podSpec := coordinator.RSSPodSpec.DeepCopy()
+       ports := generateMainContainerPorts(rss)
+       env := generateMainContainerENV(rss)
+       excludeNodesMount := corev1.VolumeMount{
+               Name:      controllerconstants.ExcludeNodesFile,
+               MountPath: utils.GetExcludeNodesMountPath(rss),
+       }
+       return util.GenerateMainContainer(constants.RSSCoordinator, coordinator.ConfigDir,
+               podSpec, ports, env, []corev1.VolumeMount{excludeNodesMount})
+}
+
+// generateMainContainerPorts generates ports of main container of coordinators: the rpc and
+// http container ports first, followed by the ports listed in .spec.coordinator.
+func generateMainContainerPorts(rss *unifflev1alpha1.RemoteShuffleService) []corev1.ContainerPort {
+       extra := rss.Spec.Coordinator.Ports
+       ports := make([]corev1.ContainerPort, 0, 2+len(extra))
+       for _, number := range []int32{
+               controllerconstants.ContainerCoordinatorRPCPort,
+               controllerconstants.ContainerCoordinatorHTTPPort,
+       } {
+               ports = append(ports, corev1.ContainerPort{
+                       ContainerPort: number,
+                       Protocol:      corev1.ProtocolTCP,
+               })
+       }
+       return append(ports, extra...)
+}
+
+// generateMainContainerENV generates environment variables of main container of coordinators.
+// The variables set by the controller come first; entries of .spec.coordinator.env are appended
+// after them unless their name is one of defaultENVs.
+func generateMainContainerENV(rss *unifflev1alpha1.RemoteShuffleService) []corev1.EnvVar {
+       newEnv := func(name, value string) corev1.EnvVar {
+               return corev1.EnvVar{Name: name, Value: value}
+       }
+       rpcPort := strconv.FormatInt(int64(controllerconstants.ContainerCoordinatorRPCPort), 10)
+       httpPort := strconv.FormatInt(int64(controllerconstants.ContainerCoordinatorHTTPPort), 10)
+
+       envs := []corev1.EnvVar{
+               newEnv(controllerconstants.CoordinatorRPCPortEnv, rpcPort),
+               newEnv(controllerconstants.CoordinatorHTTPPortEnv, httpPort),
+               newEnv(controllerconstants.XmxSizeEnv, rss.Spec.Coordinator.XmxSize),
+               newEnv(controllerconstants.ServiceNameEnv, controllerconstants.CoordinatorServiceName),
+       }
+       for _, custom := range rss.Spec.Coordinator.Env {
+               if defaultENVs.Has(custom.Name) {
+                       continue
+               }
+               envs = append(envs, custom)
+       }
+       return envs
+}
diff --git 
a/deploy/kubernetes/operator/pkg/controller/sync/shuffleserver/shuffleserver.go 
b/deploy/kubernetes/operator/pkg/controller/sync/shuffleserver/shuffleserver.go
new file mode 100644
index 00000000..9a65842d
--- /dev/null
+++ 
b/deploy/kubernetes/operator/pkg/controller/sync/shuffleserver/shuffleserver.go
@@ -0,0 +1,346 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package shuffleserver
+
+import (
+       "fmt"
+       "sort"
+       "strconv"
+       "strings"
+
+       appsv1 "k8s.io/api/apps/v1"
+       corev1 "k8s.io/api/core/v1"
+       metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+       "k8s.io/apimachinery/pkg/util/intstr"
+       "k8s.io/utils/pointer"
+
+       unifflev1alpha1 
"github.com/apache/incubator-uniffle/deploy/kubernetes/operator/api/uniffle/v1alpha1"
+       
"github.com/apache/incubator-uniffle/deploy/kubernetes/operator/pkg/constants"
+       controllerconstants 
"github.com/apache/incubator-uniffle/deploy/kubernetes/operator/pkg/controller/constants"
+       
"github.com/apache/incubator-uniffle/deploy/kubernetes/operator/pkg/controller/sync/coordinator"
+       
"github.com/apache/incubator-uniffle/deploy/kubernetes/operator/pkg/controller/util"
+       
"github.com/apache/incubator-uniffle/deploy/kubernetes/operator/pkg/utils"
+)
+
+// GenerateShuffleServers generates objects related to shuffle servers: the service account, the
+// headless and nodePort services (each only when it is needed) and the statefulSet.
+func GenerateShuffleServers(rss *unifflev1alpha1.RemoteShuffleService) (
+       *corev1.ServiceAccount, []*corev1.Service, *appsv1.StatefulSet) {
+       serviceAccount := GenerateSA(rss)
+
+       var services []*corev1.Service
+       for _, candidate := range []struct {
+               needed func(*unifflev1alpha1.RemoteShuffleService) bool
+               build  func(*unifflev1alpha1.RemoteShuffleService) *corev1.Service
+       }{
+               {needed: needGenerateHeadlessSVC, build: GenerateHeadlessSVC},
+               {needed: needGenerateNodePortSVC, build: GenerateNodePortSVC},
+       } {
+               if candidate.needed(rss) {
+                       services = append(services, candidate.build(rss))
+               }
+       }
+
+       return serviceAccount, services, GenerateSts(rss)
+}
+
+// GenerateSA generates service account of shuffle servers, named by GenerateName and owned by
+// the rss object.
+func GenerateSA(rss *unifflev1alpha1.RemoteShuffleService) *corev1.ServiceAccount {
+       meta := metav1.ObjectMeta{
+               Name:      GenerateName(rss),
+               Namespace: rss.Namespace,
+       }
+       sa := &corev1.ServiceAccount{ObjectMeta: meta}
+       util.AddOwnerReference(&sa.ObjectMeta, rss)
+       return sa
+}
+
+// GenerateHeadlessSVC generates headless service used by shuffle servers. The rpc and http
+// ports are each added only when the matching port is set in .spec.shuffleServer.
+func GenerateHeadlessSVC(rss *unifflev1alpha1.RemoteShuffleService) *corev1.Service {
+       svc := &corev1.Service{}
+       svc.Name = generateHeadlessSVCName(rss)
+       svc.Namespace = rss.Namespace
+       svc.Spec.ClusterIP = corev1.ClusterIPNone
+       svc.Spec.Selector = map[string]string{"app": GenerateName(rss)}
+
+       shuffleServer := rss.Spec.ShuffleServer
+       if target := shuffleServer.RPCPort; target != nil {
+               svc.Spec.Ports = append(svc.Spec.Ports, corev1.ServicePort{
+                       Name:       "rpc",
+                       Protocol:   corev1.ProtocolTCP,
+                       Port:       controllerconstants.ContainerShuffleServerRPCPort,
+                       TargetPort: intstr.FromInt(int(*target)),
+               })
+       }
+       if target := shuffleServer.HTTPPort; target != nil {
+               svc.Spec.Ports = append(svc.Spec.Ports, corev1.ServicePort{
+                       Name:       "http",
+                       Protocol:   corev1.ProtocolTCP,
+                       Port:       controllerconstants.ContainerShuffleServerHTTPPort,
+                       TargetPort: intstr.FromInt(int(*target)),
+               })
+       }
+
+       util.AddOwnerReference(&svc.ObjectMeta, rss)
+       return svc
+}
+
+// GenerateNodePortSVC generates nodePort service used by shuffle servers.
+//
+// The rpc and http ports are each added only when needNodePortForRPC / needNodePortForHTTP
+// report that a node port is needed for them. Every port is named, because Kubernetes only
+// allows the port name to be omitted when a Service defines a single port; without names a
+// Service exposing both the rpc and the http node port is rejected by the API server.
+func GenerateNodePortSVC(rss *unifflev1alpha1.RemoteShuffleService) *corev1.Service {
+       name := GenerateName(rss)
+       svc := &corev1.Service{
+               ObjectMeta: metav1.ObjectMeta{
+                       Name:      name,
+                       Namespace: rss.Namespace,
+               },
+               Spec: corev1.ServiceSpec{
+                       Type: corev1.ServiceTypeNodePort,
+                       Selector: map[string]string{
+                               "app": name,
+                       },
+               },
+       }
+       if needNodePortForRPC(rss) {
+               svc.Spec.Ports = append(svc.Spec.Ports, corev1.ServicePort{
+                       Name:       "rpc",
+                       Protocol:   corev1.ProtocolTCP,
+                       Port:       controllerconstants.ContainerShuffleServerRPCPort,
+                       TargetPort: intstr.FromInt(int(*rss.Spec.ShuffleServer.RPCPort)),
+                       NodePort:   *rss.Spec.ShuffleServer.RPCNodePort,
+               })
+       }
+       if needNodePortForHTTP(rss) {
+               svc.Spec.Ports = append(svc.Spec.Ports, corev1.ServicePort{
+                       Name:       "http",
+                       Protocol:   corev1.ProtocolTCP,
+                       Port:       controllerconstants.ContainerShuffleServerHTTPPort,
+                       TargetPort: intstr.FromInt(int(*rss.Spec.ShuffleServer.HTTPPort)),
+                       NodePort:   *rss.Spec.ShuffleServer.HTTPNodePort,
+               })
+       }
+       util.AddOwnerReference(&svc.ObjectMeta, rss)
+       return svc
+}
+
+// getReplicas returns the replicas of shuffle servers as configured in .spec.shuffleServer of
+// the rss object.
+func getReplicas(rss *unifflev1alpha1.RemoteShuffleService) *int32 {
+       // TODO: we will support hpa for rss object, and when we enable hpa, we should not
+       // return replicas in .spec.shuffleServer field.
+       return rss.Spec.ShuffleServer.Replicas
+}
+
+// GenerateSts generates statefulSet of shuffle servers.
+func GenerateSts(rss *unifflev1alpha1.RemoteShuffleService) 
*appsv1.StatefulSet {
+       name := GenerateName(rss)
+       replicas := getReplicas(rss)
+
+       podSpec := corev1.PodSpec{
+               SecurityContext:    rss.Spec.ShuffleServer.SecurityContext,
+               HostNetwork:        *rss.Spec.ShuffleServer.HostNetwork,
+               ServiceAccountName: GenerateName(rss),
+               Tolerations: []corev1.Toleration{
+                       {
+                               Effect: corev1.TaintEffectNoSchedule,
+                               Key:    "node-role.kubernetes.io/master",
+                       },
+               },
+               Volumes: []corev1.Volume{
+                       {
+                               Name: 
controllerconstants.ConfigurationVolumeName,
+                               VolumeSource: corev1.VolumeSource{
+                                       ConfigMap: 
&corev1.ConfigMapVolumeSource{
+                                               LocalObjectReference: 
corev1.LocalObjectReference{
+                                                       Name: 
rss.Spec.ConfigMapName,
+                                               },
+                                               DefaultMode: 
pointer.Int32(0777),
+                                       },
+                               },
+                       },
+               },
+               NodeSelector: rss.Spec.ShuffleServer.NodeSelector,
+       }
+       if podSpec.HostNetwork {
+               podSpec.DNSPolicy = corev1.DNSClusterFirstWithHostNet
+       }
+
+       sts := &appsv1.StatefulSet{
+               ObjectMeta: metav1.ObjectMeta{
+                       Name:      name,
+                       Namespace: rss.Namespace,
+                       Labels:    utils.GenerateShuffleServerLabels(rss),
+               },
+               Spec: appsv1.StatefulSetSpec{
+                       Selector: &metav1.LabelSelector{
+                               MatchLabels: 
utils.GenerateShuffleServerLabels(rss),
+                       },
+                       UpdateStrategy: appsv1.StatefulSetUpdateStrategy{
+                               Type: 
appsv1.RollingUpdateStatefulSetStrategyType,
+                               RollingUpdate: 
&appsv1.RollingUpdateStatefulSetStrategy{
+                                       Partition: replicas,
+                               },
+                       },
+                       ServiceName: generateHeadlessSVCName(rss),
+                       Replicas:    replicas,
+                       Template: corev1.PodTemplateSpec{
+                               ObjectMeta: metav1.ObjectMeta{
+                                       Labels: 
utils.GenerateShuffleServerLabels(rss),
+                                       Annotations: map[string]string{
+                                               constants.AnnotationRssName: 
rss.Name,
+                                               constants.AnnotationRssUID:  
string(rss.UID),
+                                               
constants.AnnotationMetricsServerPort: fmt.Sprintf("%v",
+                                                       
controllerconstants.ContainerShuffleServerHTTPPort),
+                                               
constants.AnnotationShuffleServerPort: fmt.Sprintf("%v",
+                                                       
controllerconstants.ContainerShuffleServerRPCPort),
+                                       },
+                               },
+                               Spec: podSpec,
+                       },
+               },
+       }
+
+       // add init containers, the main container and other containers.
+       sts.Spec.Template.Spec.InitContainers = 
util.GenerateInitContainers(rss.Spec.ShuffleServer.RSSPodSpec)
+       containers := []corev1.Container{*generateMainContainer(rss)}
+       containers = append(containers, 
rss.Spec.ShuffleServer.SidecarContainers...)
+       sts.Spec.Template.Spec.Containers = containers
+
+       // add hostPath volumes for shuffle servers.
+       hostPathMounts := rss.Spec.ShuffleServer.HostPathMounts
+       logHostPath := rss.Spec.ShuffleServer.LogHostPath
+       sts.Spec.Template.Spec.Volumes = append(sts.Spec.Template.Spec.Volumes,
+               util.GenerateHostPathVolumes(hostPathMounts, logHostPath, 
name)...)
+
+       util.AddOwnerReference(&sts.ObjectMeta, rss)
+       return sts
+}
+
+// GenerateName returns the name used for the shuffle server workload and its nodePort service.
+func GenerateName(rss *unifflev1alpha1.RemoteShuffleService) string {
+       name := utils.GenerateShuffleServerName(rss)
+       return name
+}
+
+// GenerateProperties builds the configuration properties the controller sets for shuffle
+// servers: the rpc server port, the jetty http port, the coordinator quorum and the storage
+// base path.
+// rss.Spec.ShuffleServer.RPCPort and rss.Spec.ShuffleServer.HTTPPort are dereferenced here, so
+// both must be non-nil when this function is called.
+func GenerateProperties(rss *unifflev1alpha1.RemoteShuffleService) map[controllerconstants.PropertyKey]string {
+       rpcPort := fmt.Sprintf("%v", *rss.Spec.ShuffleServer.RPCPort)
+       httpPort := fmt.Sprintf("%v", *rss.Spec.ShuffleServer.HTTPPort)
+       return map[controllerconstants.PropertyKey]string{
+               controllerconstants.RPCServerPort:     rpcPort,
+               controllerconstants.JettyHTTPPort:     httpPort,
+               controllerconstants.CoordinatorQuorum: coordinator.GenerateAddresses(rss),
+               controllerconstants.StorageBasePath:   generateStorageBasePath(rss),
+       }
+}
+
+// generateStorageBasePath builds the storage base path value for the shuffle server's
+// configuration. Every entry of HostPathMounts except the one keyed by LogHostPath contributes
+// its value with one trailing "/" removed and "/rssdata" appended; the resulting paths are
+// sorted and joined with ",".
+func generateStorageBasePath(rss *unifflev1alpha1.RemoteShuffleService) string {
+       logHostPath := rss.Spec.ShuffleServer.LogHostPath
+       var dataPaths []string
+       for hostPath, mount := range rss.Spec.ShuffleServer.HostPathMounts {
+               if hostPath != logHostPath {
+                       dataPaths = append(dataPaths, strings.TrimSuffix(mount, "/")+"/rssdata")
+               }
+       }
+       // map iteration order is not stable, so sort to keep the generated value deterministic.
+       sort.Strings(dataPaths)
+       return strings.Join(dataPaths, ",")
+}
+
+// generateHeadlessSVCName returns name of shuffle servers' headless service.
+func generateHeadlessSVCName(rss *unifflev1alpha1.RemoteShuffleService) string 
{
+       return GenerateName(rss) + "-headless"
+}
+
+// generateMainContainer builds the main container of shuffle servers by delegating to
+// util.GenerateMainContainer with the shuffle server's config dir, a deep copy of its pod spec,
+// its container ports and its environment variables.
+func generateMainContainer(rss *unifflev1alpha1.RemoteShuffleService) *corev1.Container {
+       configDir := rss.Spec.ShuffleServer.ConfigDir
+       podSpec := rss.Spec.ShuffleServer.RSSPodSpec.DeepCopy()
+       ports := generateMainContainerPorts(rss)
+       envs := generateMainContainerENV(rss)
+       return util.GenerateMainContainer(constants.RSSShuffleServer, configDir, podSpec,
+               ports, envs, nil)
+}
+
+// generateMainContainerPorts lists the ports of the main container of shuffle servers: the rpc
+// and http container ports over TCP, followed by the ports declared in
+// rss.Spec.ShuffleServer.Ports.
+func generateMainContainerPorts(rss *unifflev1alpha1.RemoteShuffleService) []corev1.ContainerPort {
+       // tcpPort builds a TCP container port entry for the given port number.
+       tcpPort := func(port int32) corev1.ContainerPort {
+               return corev1.ContainerPort{ContainerPort: port, Protocol: corev1.ProtocolTCP}
+       }
+       builtin := []corev1.ContainerPort{
+               tcpPort(controllerconstants.ContainerShuffleServerRPCPort),
+               tcpPort(controllerconstants.ContainerShuffleServerHTTPPort),
+       }
+       return append(builtin, rss.Spec.ShuffleServer.Ports...)
+}
+
+// generateMainContainerENV lists the environment variables of the main container of shuffle
+// servers: the rpc and http container ports, the coordinator addresses, the configured xmx size
+// and the service name as fixed values, plus the node name and pod IP resolved from the pod's
+// own fields ("spec.nodeName" and "status.podIP").
+func generateMainContainerENV(rss *unifflev1alpha1.RemoteShuffleService) []corev1.EnvVar {
+       // literal builds an environment variable with a fixed value.
+       literal := func(name, value string) corev1.EnvVar {
+               return corev1.EnvVar{Name: name, Value: value}
+       }
+       // fromField builds an environment variable whose value is read from a field of the pod.
+       fromField := func(name, fieldPath string) corev1.EnvVar {
+               return corev1.EnvVar{
+                       Name: name,
+                       ValueFrom: &corev1.EnvVarSource{
+                               FieldRef: &corev1.ObjectFieldSelector{
+                                       APIVersion: "v1",
+                                       FieldPath:  fieldPath,
+                               },
+                       },
+               }
+       }
+       rpcPort := strconv.FormatInt(int64(controllerconstants.ContainerShuffleServerRPCPort), 10)
+       httpPort := strconv.FormatInt(int64(controllerconstants.ContainerShuffleServerHTTPPort), 10)
+       return []corev1.EnvVar{
+               literal(controllerconstants.ShuffleServerRPCPortEnv, rpcPort),
+               literal(controllerconstants.ShuffleServerHTTPPortEnv, httpPort),
+               literal(controllerconstants.RSSCoordinatorQuorumEnv, coordinator.GenerateAddresses(rss)),
+               literal(controllerconstants.XmxSizeEnv, rss.Spec.ShuffleServer.XmxSize),
+               literal(controllerconstants.ServiceNameEnv, controllerconstants.ShuffleServerServiceName),
+               fromField(controllerconstants.NodeNameEnv, "spec.nodeName"),
+               fromField(controllerconstants.RssIPEnv, "status.podIP"),
+       }
+}
+
+// needGenerateNodePortSVC reports whether a node port service is needed for shuffle servers,
+// i.e. whether the rpc service or the http service needs a node port.
+func needGenerateNodePortSVC(rss *unifflev1alpha1.RemoteShuffleService) bool {
+       if needNodePortForRPC(rss) {
+               return true
+       }
+       return needNodePortForHTTP(rss)
+}
+
+// needGenerateHeadlessSVC reports whether a headless service is needed for shuffle servers,
+// which is the case as soon as the rpc port or the http port is set.
+func needGenerateHeadlessSVC(rss *unifflev1alpha1.RemoteShuffleService) bool {
+       if rss.Spec.ShuffleServer.RPCPort != nil {
+               return true
+       }
+       return rss.Spec.ShuffleServer.HTTPPort != nil
+}
+
+// needNodePortForRPC reports whether a node port service is needed for the rpc service of
+// shuffle servers, which requires both the rpc port and the rpc node port to be set.
+func needNodePortForRPC(rss *unifflev1alpha1.RemoteShuffleService) bool {
+       if rss.Spec.ShuffleServer.RPCPort == nil {
+               return false
+       }
+       return rss.Spec.ShuffleServer.RPCNodePort != nil
+}
+
+// needNodePortForHTTP returns whether we need node port service for http service of shuffle
+// servers, which requires both the http port and the http node port to be set.
+func needNodePortForHTTP(rss *unifflev1alpha1.RemoteShuffleService) bool {
+       return rss.Spec.ShuffleServer.HTTPPort != nil && 
rss.Spec.ShuffleServer.HTTPNodePort != nil
+}
diff --git 
a/deploy/kubernetes/operator/pkg/controller/util/kubernetes/configmap.go 
b/deploy/kubernetes/operator/pkg/controller/util/kubernetes/configmap.go
new file mode 100644
index 00000000..66a7c973
--- /dev/null
+++ b/deploy/kubernetes/operator/pkg/controller/util/kubernetes/configmap.go
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kubernetes
+
+import (
+       "context"
+       "encoding/json"
+
+       corev1 "k8s.io/api/core/v1"
+       apierrors "k8s.io/apimachinery/pkg/api/errors"
+       metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+       "k8s.io/apimachinery/pkg/types"
+       "k8s.io/apimachinery/pkg/util/strategicpatch"
+       "k8s.io/client-go/kubernetes"
+       "k8s.io/klog/v2"
+
+       
"github.com/apache/incubator-uniffle/deploy/kubernetes/operator/pkg/utils"
+)
+
+// SyncConfigMap makes the configMap in the cluster match cm: it creates cm when no configMap
+// with the same namespace and name exists, otherwise it patches the existing one towards cm.
+// Any error from the get, create or patch call is logged and returned.
+func SyncConfigMap(kubeClient kubernetes.Interface, cm *corev1.ConfigMap) error {
+       cmClient := kubeClient.CoreV1().ConfigMaps(cm.Namespace)
+       existing, err := cmClient.Get(context.Background(), cm.Name, metav1.GetOptions{})
+       if err == nil {
+               // it already exists, so update it through patch method.
+               return PatchConfigMap(kubeClient, existing, cm)
+       }
+       if !apierrors.IsNotFound(err) {
+               klog.Errorf("get configMap (%v) failed: %v", utils.UniqueName(cm), err)
+               return err
+       }
+       // it does not exist yet, so try to create a new configMap.
+       _, err = cmClient.Create(context.Background(), cm, metav1.CreateOptions{})
+       if err != nil {
+               klog.Errorf("create configMap (%v) failed: %v", utils.UniqueName(cm), err)
+               return err
+       }
+       return nil
+}
+
+// PatchConfigMap patches the old configMap to new configMap.
+func PatchConfigMap(kubeClient kubernetes.Interface,
+       oldCM, newCM *corev1.ConfigMap) error {
+       var oldData []byte
+       var err error
+       oldData, err = json.Marshal(oldCM)
+       if err != nil {
+               klog.Errorf("marshal oldCM (%+v) failed: %v", oldCM, err)
+               return err
+       }
+       var newData []byte
+       newData, err = json.Marshal(newCM)
+       if err != nil {
+               klog.Errorf("marshal newCM (%+v) failed: %v", newCM, err)
+               return err
+       }
+       // build payload for patch method.
+       var patchBytes []byte
+       patchBytes, err = strategicpatch.CreateTwoWayMergePatch(oldData, 
newData,
+               &corev1.ConfigMap{})
+       if err != nil {
+               klog.Errorf("created merge patch for configMap %v failed: %v", 
utils.UniqueName(oldCM), err)
+               return err
+       }
+       if _, err = 
kubeClient.CoreV1().ConfigMaps(oldCM.Namespace).Patch(context.Background(),
+               oldCM.Name, types.StrategicMergePatchType, patchBytes, 
metav1.PatchOptions{}); err != nil {
+               klog.Errorf("patch configMap (%v) failed: %v", 
utils.UniqueName(oldCM), err)
+               return err
+       }
+       return nil
+}
diff --git 
a/deploy/kubernetes/operator/pkg/controller/util/kubernetes/deployment.go 
b/deploy/kubernetes/operator/pkg/controller/util/kubernetes/deployment.go
new file mode 100644
index 00000000..f0bc7aae
--- /dev/null
+++ b/deploy/kubernetes/operator/pkg/controller/util/kubernetes/deployment.go
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kubernetes
+
+import (
+       "context"
+       "encoding/json"
+
+       appsv1 "k8s.io/api/apps/v1"
+       apierrors "k8s.io/apimachinery/pkg/api/errors"
+       metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+       "k8s.io/apimachinery/pkg/types"
+       "k8s.io/apimachinery/pkg/util/strategicpatch"
+       "k8s.io/client-go/kubernetes"
+       "k8s.io/klog/v2"
+
+       
"github.com/apache/incubator-uniffle/deploy/kubernetes/operator/pkg/utils"
+)
+
+// SyncDeployments synchronizes the given deployments one by one, in slice order, and stops at
+// the first failure, returning its error.
+func SyncDeployments(kubeClient kubernetes.Interface, deployments []*appsv1.Deployment) error {
+       for _, deploy := range deployments {
+               err := SyncDeployment(kubeClient, deploy)
+               if err != nil {
+                       return err
+               }
+       }
+       return nil
+}
+
+// SyncDeployment makes the deployment in the cluster match deploy: it creates deploy when no
+// deployment with the same namespace and name exists, otherwise it patches the existing one
+// towards deploy. Any error from the get, create or patch call is logged and returned.
+func SyncDeployment(kubeClient kubernetes.Interface, deploy *appsv1.Deployment) error {
+       deployClient := kubeClient.AppsV1().Deployments(deploy.Namespace)
+       existing, err := deployClient.Get(context.Background(), deploy.Name, metav1.GetOptions{})
+       if err == nil {
+               // it already exists, so update it through patch method.
+               return PatchDeployment(kubeClient, existing, deploy)
+       }
+       if !apierrors.IsNotFound(err) {
+               klog.Errorf("get deployment (%v) failed: %v", utils.UniqueName(deploy), err)
+               return err
+       }
+       // it does not exist yet, so try to create a new deployment.
+       _, err = deployClient.Create(context.Background(), deploy, metav1.CreateOptions{})
+       if err != nil {
+               klog.Errorf("create deployment (%v) failed: %v", utils.UniqueName(deploy), err)
+               return err
+       }
+       return nil
+}
+
+// PatchDeployment patches the old deployment to new deployment.
+func PatchDeployment(kubeClient kubernetes.Interface,
+       oldDeploy, newDeploy *appsv1.Deployment) error {
+       var oldData []byte
+       var err error
+       oldData, err = json.Marshal(oldDeploy)
+       if err != nil {
+               klog.Errorf("marshal oldDeploy (%+v) failed: %v", oldDeploy, 
err)
+               return err
+       }
+       var newData []byte
+       newData, err = json.Marshal(newDeploy)
+       if err != nil {
+               klog.Errorf("marshal newDeploy (%+v) failed: %v", newDeploy, 
err)
+               return err
+       }
+       // build payload for patch method.
+       var patchBytes []byte
+       patchBytes, err = strategicpatch.CreateTwoWayMergePatch(oldData, 
newData,
+               &appsv1.Deployment{})
+       if err != nil {
+               klog.Errorf("created merge patch for deployment %v failed: %v",
+                       utils.UniqueName(oldDeploy), err)
+               return err
+       }
+       if _, err = 
kubeClient.AppsV1().Deployments(oldDeploy.Namespace).Patch(context.Background(),
+               oldDeploy.Name, types.StrategicMergePatchType, patchBytes,
+               metav1.PatchOptions{}); err != nil {
+               klog.Errorf("patch deployment (%v) failed: %v", 
utils.UniqueName(oldDeploy), err)
+               return err
+       }
+       return nil
+}
diff --git 
a/deploy/kubernetes/operator/pkg/controller/util/kubernetes/service.go 
b/deploy/kubernetes/operator/pkg/controller/util/kubernetes/service.go
new file mode 100644
index 00000000..c3540226
--- /dev/null
+++ b/deploy/kubernetes/operator/pkg/controller/util/kubernetes/service.go
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kubernetes
+
+import (
+       "context"
+       "encoding/json"
+
+       corev1 "k8s.io/api/core/v1"
+       apierrors "k8s.io/apimachinery/pkg/api/errors"
+       metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+       "k8s.io/apimachinery/pkg/types"
+       "k8s.io/apimachinery/pkg/util/strategicpatch"
+       "k8s.io/client-go/kubernetes"
+       "k8s.io/klog/v2"
+
+       
"github.com/apache/incubator-uniffle/deploy/kubernetes/operator/pkg/utils"
+)
+
+// SyncServices synchronizes the given services one by one, in slice order, and stops at the
+// first failure, returning its error.
+func SyncServices(kubeClient kubernetes.Interface, services []*corev1.Service) error {
+       for _, svc := range services {
+               err := SyncService(kubeClient, svc)
+               if err != nil {
+                       return err
+               }
+       }
+       return nil
+}
+
+// SyncService makes the service in the cluster match svc: it creates svc when no service with
+// the same namespace and name exists, otherwise it patches the existing one towards svc.
+// Any error from the get, create or patch call is logged and returned.
+func SyncService(kubeClient kubernetes.Interface, svc *corev1.Service) error {
+       svcClient := kubeClient.CoreV1().Services(svc.Namespace)
+       existing, err := svcClient.Get(context.Background(), svc.Name, metav1.GetOptions{})
+       if err == nil {
+               // it already exists, so update it through patch method.
+               return PatchService(kubeClient, existing, svc)
+       }
+       if !apierrors.IsNotFound(err) {
+               klog.Errorf("get service (%v) failed: %v", utils.UniqueName(svc), err)
+               return err
+       }
+       // it does not exist yet, so try to create a new service.
+       _, err = svcClient.Create(context.Background(), svc, metav1.CreateOptions{})
+       if err != nil {
+               klog.Errorf("create service (%v) failed: %v", utils.UniqueName(svc), err)
+               return err
+       }
+       return nil
+}
+
+// mergeSVC merges old and new service and returns the service to be used as patch target.
+// Service objects are special: only the fields copied below are updated, because merging the
+// original and new object directly may clear the '.spec.ClusterIP' field, which is forbidden.
+// The result is a deep copy of oldSVC, so oldSVC itself is left untouched.
+func mergeSVC(oldSVC, newSVC *corev1.Service) *corev1.Service {
+       merged := oldSVC.DeepCopy()
+
+       // metadata taken from the new service.
+       merged.Labels, merged.Annotations = newSVC.Labels, newSVC.Annotations
+       merged.OwnerReferences = newSVC.OwnerReferences
+
+       // spec fields taken from the new service.
+       merged.Spec.Type = newSVC.Spec.Type
+       merged.Spec.Selector = newSVC.Spec.Selector
+       merged.Spec.Ports = newSVC.Spec.Ports
+
+       return merged
+}
+
+// PatchService patches the old service to new service.
+func PatchService(kubeClient kubernetes.Interface,
+       oldSVC, newSVC *corev1.Service) error {
+       var oldData []byte
+       var err error
+       oldData, err = json.Marshal(oldSVC)
+       if err != nil {
+               klog.Errorf("marshal oldSVC (%+v) failed: %v", oldSVC, err)
+               return err
+       }
+       var newData []byte
+       newData, err = json.Marshal(mergeSVC(oldSVC, newSVC))
+       if err != nil {
+               klog.Errorf("marshal newSVC (%+v) failed: %v", newSVC, err)
+               return err
+       }
+       // build payload for patch method.
+       var patchBytes []byte
+       patchBytes, err = strategicpatch.CreateTwoWayMergePatch(oldData, 
newData,
+               &corev1.Service{})
+       if err != nil {
+               klog.Errorf("created merge patch for service %v failed: %v", 
utils.UniqueName(oldSVC), err)
+               return err
+       }
+       if _, err = 
kubeClient.CoreV1().Services(oldSVC.Namespace).Patch(context.Background(),
+               oldSVC.Name, types.StrategicMergePatchType, patchBytes, 
metav1.PatchOptions{}); err != nil {
+               klog.Errorf("patch service (%v) with (%v) failed: %v",
+                       utils.UniqueName(oldSVC), string(patchBytes), err)
+               return err
+       }
+       return nil
+}
diff --git 
a/deploy/kubernetes/operator/pkg/controller/util/kubernetes/serviceaccount.go 
b/deploy/kubernetes/operator/pkg/controller/util/kubernetes/serviceaccount.go
new file mode 100644
index 00000000..aa7b3315
--- /dev/null
+++ 
b/deploy/kubernetes/operator/pkg/controller/util/kubernetes/serviceaccount.go
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kubernetes
+
+import (
+       "context"
+       "encoding/json"
+
+       corev1 "k8s.io/api/core/v1"
+       apierrors "k8s.io/apimachinery/pkg/api/errors"
+       metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+       "k8s.io/apimachinery/pkg/types"
+       "k8s.io/apimachinery/pkg/util/strategicpatch"
+       "k8s.io/client-go/kubernetes"
+       "k8s.io/klog/v2"
+
+       
"github.com/apache/incubator-uniffle/deploy/kubernetes/operator/pkg/utils"
+)
+
+// SyncServiceAccount makes the serviceAccount in the cluster match sa: it creates sa when no
+// serviceAccount with the same namespace and name exists, otherwise it hands the existing one
+// and sa to PatchServiceAccount. Errors from the get and create calls are logged and returned.
+func SyncServiceAccount(kubeClient kubernetes.Interface, sa *corev1.ServiceAccount) error {
+       saClient := kubeClient.CoreV1().ServiceAccounts(sa.Namespace)
+       existing, err := saClient.Get(context.Background(), sa.Name, metav1.GetOptions{})
+       if err == nil {
+               // it already exists, so update it through patch method.
+               return PatchServiceAccount(kubeClient, existing, sa)
+       }
+       if !apierrors.IsNotFound(err) {
+               klog.Errorf("get service account (%v) failed: %v", utils.UniqueName(sa), err)
+               return err
+       }
+       // it does not exist yet, so try to create a new serviceAccount.
+       _, err = saClient.Create(context.Background(), sa, metav1.CreateOptions{})
+       if err != nil {
+               klog.Errorf("create service account (%v) failed: %v", utils.UniqueName(sa), err)
+               return err
+       }
+       return nil
+}
+
+// PatchServiceAccount patches the old serviceAccount to new serviceAccount. It computes a
+// two-way strategic merge patch from the JSON forms of oldSA and newSA and sends it to the
+// serviceAccount identified by oldSA's namespace and name.
+// Every failure (marshalling, patch creation or the patch call itself) is logged and returned.
+func PatchServiceAccount(kubeClient kubernetes.Interface,
+       oldSA, newSA *corev1.ServiceAccount) error {
+       var oldData []byte
+       var err error
+       oldData, err = json.Marshal(oldSA)
+       if err != nil {
+               klog.Errorf("marshal oldSA (%+v) failed: %v", oldSA, err)
+               return err
+       }
+       var newData []byte
+       newData, err = json.Marshal(newSA)
+       if err != nil {
+               klog.Errorf("marshal newSA (%+v) failed: %v", newSA, err)
+               return err
+       }
+       // build payload for patch method.
+       var patchBytes []byte
+       patchBytes, err = strategicpatch.CreateTwoWayMergePatch(oldData, newData,
+               &corev1.ServiceAccount{})
+       if err != nil {
+               klog.Errorf("created merge patch for serviceAccount %v failed: %v",
+                       utils.UniqueName(oldSA), err)
+               return err
+       }
+       if _, err = kubeClient.CoreV1().ServiceAccounts(oldSA.Namespace).Patch(context.Background(),
+               oldSA.Name, types.StrategicMergePatchType, patchBytes, metav1.PatchOptions{}); err != nil {
+               klog.Errorf("patch serviceAccount (%v) failed: %v", utils.UniqueName(oldSA), err)
+               // propagate the failure so callers do not treat a failed patch as a successful sync.
+               return err
+       }
+       return nil
+}
diff --git 
a/deploy/kubernetes/operator/pkg/controller/util/kubernetes/statefulset.go 
b/deploy/kubernetes/operator/pkg/controller/util/kubernetes/statefulset.go
new file mode 100644
index 00000000..f4c90908
--- /dev/null
+++ b/deploy/kubernetes/operator/pkg/controller/util/kubernetes/statefulset.go
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kubernetes
+
+import (
+       "context"
+       "encoding/json"
+
+       appsv1 "k8s.io/api/apps/v1"
+       apierrors "k8s.io/apimachinery/pkg/api/errors"
+       metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+       "k8s.io/apimachinery/pkg/types"
+       "k8s.io/apimachinery/pkg/util/strategicpatch"
+       "k8s.io/client-go/kubernetes"
+       "k8s.io/klog/v2"
+
+       
"github.com/apache/incubator-uniffle/deploy/kubernetes/operator/pkg/utils"
+)
+
+// SyncStatefulSet synchronizes statefulSet object and returns original and 
patched object.
+func SyncStatefulSet(kubeClient kubernetes.Interface, sts *appsv1.StatefulSet,
+       overwrite bool) (oldSts, newSts *appsv1.StatefulSet, err error) {
+       oldSts, err = kubeClient.AppsV1().StatefulSets(sts.Namespace).
+               Get(context.Background(), sts.Name, metav1.GetOptions{})
+       if err != nil {
+               if !apierrors.IsNotFound(err) {
+                       klog.Errorf("get statefulSet (%v) failed: %v", 
utils.UniqueName(sts), err)
+                       return nil, nil, err
+               }
+               var created *appsv1.StatefulSet
+               created, err = kubeClient.AppsV1().StatefulSets(sts.Namespace).
+                       Create(context.Background(), sts, 
metav1.CreateOptions{})
+               if err != nil {
+                       klog.Errorf("create statefulSet (%v) failed: %v", 
utils.UniqueName(sts), err)
+                       return nil, nil, err
+               }
+               return nil, created, nil
+       }
+       // determine if we need to force an update.
+       if !overwrite {
+               return oldSts, nil, nil
+       }
+       // if it already exists, try to update it through patch method.
+       newSts, err = PatchStatefulSet(kubeClient, oldSts, sts)
+       return oldSts, newSts, err
+}
+
+// NeedUpdateSts returns whether we need to update the statefulSet.
+func NeedUpdateSts(oldSts, newSts *appsv1.StatefulSet) ([]byte, bool, error) {
+       var oldData []byte
+       var err error
+       oldData, err = json.Marshal(oldSts)
+       if err != nil {
+               klog.Errorf("marshal oldSts (%+v) failed: %v", oldSts, err)
+               return nil, false, err
+       }
+       var newData []byte
+       newData, err = json.Marshal(newSts)
+       if err != nil {
+               klog.Errorf("marshal newSts (%+v) failed: %v", newSts, err)
+               return nil, false, err
+       }
+       var patchBytes []byte
+       patchBytes, err = strategicpatch.CreateTwoWayMergePatch(oldData, 
newData,
+               &appsv1.StatefulSet{})
+       if err != nil {
+               klog.Errorf("created merge patch for statefulSet %v failed: %v",
+                       utils.UniqueName(oldSts), err)
+               return nil, false, err
+       }
+       return patchBytes, string(patchBytes) != "{}", nil
+}
+
+// PatchStatefulSet patches the old statefulSet to new statefulSet.
+func PatchStatefulSet(kubeClient kubernetes.Interface,
+       oldSts, newSts *appsv1.StatefulSet) (*appsv1.StatefulSet, error) {
+       patchBytes, update, err := NeedUpdateSts(oldSts, newSts)
+       if err != nil {
+               return nil, err
+       }
+       if !update {
+               klog.V(4).Infof("do not need to patch statefulSet (%v)", 
utils.UniqueName(oldSts))
+               return nil, nil
+       }
+       klog.V(5).Infof("patch body (%v) to statefulSet (%v)",
+               string(patchBytes), utils.UniqueName(oldSts))
+       // build payload for patch method.
+       var patched *appsv1.StatefulSet
+       patched, err = 
kubeClient.AppsV1().StatefulSets(oldSts.Namespace).Patch(context.Background(),
+               oldSts.Name, types.StrategicMergePatchType, patchBytes,
+               metav1.PatchOptions{})
+       if err != nil {
+               klog.Errorf("patch statefulSet (%v) failed: %v", 
utils.UniqueName(oldSts), err)
+               return nil, err
+       }
+       return patched, nil
+}
diff --git 
a/deploy/kubernetes/operator/pkg/controller/util/properties/properties.go 
b/deploy/kubernetes/operator/pkg/controller/util/properties/properties.go
new file mode 100644
index 00000000..49ac5ccf
--- /dev/null
+++ b/deploy/kubernetes/operator/pkg/controller/util/properties/properties.go
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package properties
+
+import (
+       "sort"
+       "strings"
+
+       
"github.com/apache/incubator-uniffle/deploy/kubernetes/operator/pkg/controller/constants"
+)
+
+// UpdateProperties merges kv into the configuration text of coordinators or shuffle servers.
+// The text is parsed into key/value pairs, every entry of kv is added to them (replacing an
+// existing key), and the merged pairs are rendered back to text.
+func UpdateProperties(properties string, kv map[constants.PropertyKey]string) string {
+       merged := parseProperties(properties)
+       for key := range kv {
+               merged[key] = kv[key]
+       }
+       return generateProperties(merged)
+}
+
+// parseProperties converts configuration text made of "key value" lines into a map.
+//
+// Every line is split at its first space or tab: the text before it is the key, and the rest
+// of the line with surrounding whitespace removed is the value. Values may therefore contain
+// spaces, and key and value may be separated by any amount of spaces or tabs. Leading
+// indentation is ignored. Lines whose first non-blank character is '#' or '!' (the comment
+// markers of the Java properties format) and lines without any separator are skipped. When a
+// key appears more than once, the last occurrence wins.
+func parseProperties(properties string) map[constants.PropertyKey]string {
+       result := make(map[constants.PropertyKey]string)
+       for _, line := range strings.Split(properties, "\n") {
+               line = strings.TrimLeft(line, " \t")
+               if strings.HasPrefix(line, "#") || strings.HasPrefix(line, "!") {
+                       continue
+               }
+               sep := strings.IndexAny(line, " \t")
+               if sep < 0 {
+                       // blank line, or a key without any value.
+                       continue
+               }
+               // TrimSpace also removes a trailing "\r" left over from CRLF line endings.
+               result[constants.PropertyKey(line[:sep])] = strings.TrimSpace(line[sep+1:])
+       }
+       return result
+}
+
+// generateProperties converts a map to string in properties format used in 
Java.
+func generateProperties(kv map[constants.PropertyKey]string) string {
+       var lines []string
+       for k, v := range kv {
+               lines = append(lines, string(k)+" "+v)
+       }
+       sort.Strings(lines)
+       return strings.Join(lines, "\n")
+}
diff --git a/deploy/kubernetes/operator/pkg/controller/util/util.go 
b/deploy/kubernetes/operator/pkg/controller/util/util.go
new file mode 100644
index 00000000..6f588681
--- /dev/null
+++ b/deploy/kubernetes/operator/pkg/controller/util/util.go
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package util
+
+import (
+       "fmt"
+       "sort"
+       "strings"
+
+       corev1 "k8s.io/api/core/v1"
+       metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+       "k8s.io/client-go/kubernetes"
+       "k8s.io/client-go/kubernetes/scheme"
+       v1core "k8s.io/client-go/kubernetes/typed/core/v1"
+       "k8s.io/client-go/tools/record"
+       "k8s.io/utils/pointer"
+
+       "github.com/apache/incubator-uniffle/deploy/kubernetes/operator/api/uniffle/v1alpha1"
+       "github.com/apache/incubator-uniffle/deploy/kubernetes/operator/pkg/constants"
+       controllerconstants "github.com/apache/incubator-uniffle/deploy/kubernetes/operator/pkg/controller/constants"
+       "github.com/apache/incubator-uniffle/deploy/kubernetes/operator/pkg/utils"
+)
+
+// AddOwnerReference appends the owner reference built from the rss object to the
+// OwnerReferences already present in meta.
+func AddOwnerReference(meta *metav1.ObjectMeta, rss *v1alpha1.RemoteShuffleService) {
+       owners := generateOwnerReference(rss)
+       meta.OwnerReferences = append(meta.OwnerReferences, owners...)
+}
+
+// generateOwnerReference builds the single-element OwnerReference list that marks the given
+// rss object (identified by its name and UID) as the controller of another object.
+func generateOwnerReference(rss *v1alpha1.RemoteShuffleService) []metav1.OwnerReference {
+       isController := pointer.Bool(true)
+       owner := metav1.OwnerReference{
+               APIVersion: v1alpha1.SchemeGroupVersion.String(),
+               // NOTE(review): the Go type is RemoteShuffleService; confirm that "RSS" is the Kind
+               // registered for the custom resource.
+               Kind:       "RSS",
+               Name:       rss.Name,
+               UID:        rss.UID,
+               Controller: isController,
+       }
+       return []metav1.OwnerReference{owner}
+}
+
+// GenerateMainContainer generates the main container of coordinators or shuffle servers.
+//
+// Image, command, args and resources are taken from podSpec; ports and env are used as given and
+// the image pull policy is always PullAlways. The volume mounts are, in order: the configuration
+// volume mounted at configDir, initVolumeMounts, and whatever addVolumeMountsOfMainContainer
+// adds for podSpec.HostPathMounts and podSpec.VolumeMounts.
+func GenerateMainContainer(name, configDir string, podSpec *v1alpha1.RSSPodSpec,
+       ports []corev1.ContainerPort, env []corev1.EnvVar,
+       initVolumeMounts []corev1.VolumeMount) *corev1.Container {
+       configMount := corev1.VolumeMount{
+               Name:      controllerconstants.ConfigurationVolumeName,
+               MountPath: configDir,
+       }
+       container := &corev1.Container{
+               Name:            name,
+               Image:           podSpec.Image,
+               Command:         podSpec.Command,
+               Args:            podSpec.Args,
+               ImagePullPolicy: corev1.PullAlways,
+               Resources:       podSpec.Resources,
+               Ports:           ports,
+               Env:             env,
+               VolumeMounts:    append([]corev1.VolumeMount{configMount}, initVolumeMounts...),
+       }
+       addVolumeMountsOfMainContainer(container, podSpec.HostPathMounts, podSpec.VolumeMounts)
+       return container
+}
+
+func addVolumeMountsOfMainContainer(mainContainer *corev1.Container,
+       hostPathMounts map[string]string, volumeMounts []corev1.VolumeMount) {
+       var clearPathCMDs []string
+       mainContainer.VolumeMounts = append(mainContainer.VolumeMounts,
+               GenerateHostPathVolumeMounts(hostPathMounts)...)
+       for _, mountPath := range hostPathMounts {
+               clearPathCMDs = append(clearPathCMDs, fmt.Sprintf("rm -rf 
%v/*", strings.TrimSuffix(mountPath, "/")))
+       }
+       if len(clearPathCMDs) > 0 {
+               mainContainer.Lifecycle = &corev1.Lifecycle{
+                       PreStop: &corev1.Handler{
+                               Exec: &corev1.ExecAction{
+                                       Command: []string{"/bin/sh", "-c", 
strings.Join(clearPathCMDs, ";")},
+                               },
+                       },
+               }
+       }
+       mainContainer.VolumeMounts = append(mainContainer.VolumeMounts, 
volumeMounts...)
+}
+
+// GenerateHostPathVolumeMounts generates volume mounts for hostPaths configured in rss objects.
+//
+// hostPathMounts maps a host path to the path it is mounted at inside the container. Each
+// entry becomes one volume mount named after the host path (see GenerateHostPathVolumeName).
+// The mounts are returned sorted by host path so that the result does not depend on the random
+// iteration order of the map. The result is nil when hostPathMounts is empty.
+func GenerateHostPathVolumeMounts(hostPathMounts map[string]string) []corev1.VolumeMount {
+       hostPaths := make([]string, 0, len(hostPathMounts))
+       for hostPath := range hostPathMounts {
+               hostPaths = append(hostPaths, hostPath)
+       }
+       sort.Strings(hostPaths)
+       var volumeMounts []corev1.VolumeMount
+       for _, hostPath := range hostPaths {
+               volumeMounts = append(volumeMounts, corev1.VolumeMount{
+                       Name:      GenerateHostPathVolumeName(hostPath),
+                       MountPath: hostPathMounts[hostPath],
+               })
+       }
+       return volumeMounts
+}
+
+// GenerateHostPathVolumes generates one hostPath volume for every non-empty host path that is a
+// key of hostPathMounts. The host path equal to logHostPath is handled in a special way: subPath
+// is appended to the path of its volume. All other volumes point at the host path itself.
+//
+// The volumes are returned sorted by host path so that the result does not depend on the random
+// iteration order of the map. The result is never nil.
+func GenerateHostPathVolumes(hostPathMounts map[string]string, logHostPath, subPath string) []corev1.Volume {
+       hostPaths := make([]string, 0, len(hostPathMounts))
+       for hostPath := range hostPathMounts {
+               if len(hostPath) == 0 {
+                       continue
+               }
+               hostPaths = append(hostPaths, hostPath)
+       }
+       sort.Strings(hostPaths)
+       volumes := make([]corev1.Volume, 0, len(hostPaths))
+       for _, hostPath := range hostPaths {
+               volumeSubPath := ""
+               if hostPath == logHostPath {
+                       volumeSubPath = subPath
+               }
+               volumes = append(volumes, *GenerateHostPathVolume(hostPath, volumeSubPath))
+       }
+       return volumes
+}
+
+// GenerateHostPathVolume converts a host path to a hostPath volume of type DirectoryOrCreate.
+// The volume is named after hostPath (see GenerateHostPathVolumeName). Its path is hostPath, or
+// hostPath without one trailing "/" followed by "/" and subPath when subPath is not empty.
+func GenerateHostPathVolume(hostPath, subPath string) *corev1.Volume {
+       volumePath := hostPath
+       if subPath != "" {
+               volumePath = strings.TrimSuffix(hostPath, "/") + "/" + subPath
+       }
+       directoryOrCreate := corev1.HostPathDirectoryOrCreate
+       source := &corev1.HostPathVolumeSource{
+               Path: volumePath,
+               Type: &directoryOrCreate,
+       }
+       return &corev1.Volume{
+               Name:         GenerateHostPathVolumeName(hostPath),
+               VolumeSource: corev1.VolumeSource{HostPath: source},
+       }
+}
+
+// GenerateHostPathVolumeName converts a host path to a volume name: at most one leading and one
+// trailing "/" are removed, then every remaining "/" is replaced by "-".
+func GenerateHostPathVolumeName(hostPath string) string {
+       trimmed := strings.TrimSuffix(strings.TrimPrefix(hostPath, "/"), "/")
+       return strings.ReplaceAll(trimmed, "/", "-")
+}
+
+// GenerateInitContainers generates init containers for coordinators and 
shuffle servers.
+func GenerateInitContainers(rssPodSpec *v1alpha1.RSSPodSpec) 
[]corev1.Container {
+       var initContainers []corev1.Container
+       if rssPodSpec.SecurityContext == nil || 
rssPodSpec.SecurityContext.FSGroup == nil {
+               return initContainers
+       }
+       if len(rssPodSpec.HostPathMounts) > 0 {
+               image := rssPodSpec.InitContainerImage
+               if len(image) == 0 {
+                       image = constants.DefaultInitContainerImage
+               }
+               commands := generateMakeDataDirCommand(rssPodSpec)
+               initContainers = append(initContainers, corev1.Container{
+                       Name:            "init-data-dir",
+                       Image:           image,
+                       ImagePullPolicy: corev1.PullIfNotPresent,
+                       Command:         []string{"sh", "-c", 
strings.Join(commands, ";")},
+                       SecurityContext: &corev1.SecurityContext{
+                               RunAsUser:  pointer.Int64(0),
+                               Privileged: pointer.Bool(true),
+                       },
+                       VolumeMounts: 
GenerateHostPathVolumeMounts(rssPodSpec.HostPathMounts),
+               })
+               if len(rssPodSpec.LogHostPath) > 0 {
+                       initContainers = append(initContainers, 
corev1.Container{
+                               Name:            "init-log-dir",
+                               Image:           image,
+                               ImagePullPolicy: corev1.PullIfNotPresent,
+                               Command: []string{
+                                       "sh", "-c",
+                                       fmt.Sprintf("mkdir -p %v && chmod -R 
777 %v", rssPodSpec.LogHostPath,
+                                               rssPodSpec.LogHostPath),
+                               },
+                               SecurityContext: &corev1.SecurityContext{
+                                       RunAsUser:  pointer.Int64(0),
+                                       Privileged: pointer.Bool(true),
+                               },
+                               VolumeMounts: []corev1.VolumeMount{
+                                       *generateLogVolumeMount(rssPodSpec),
+                               },
+                       })
+               }
+       }
+       return initContainers
+}
+
+// generateLogVolumeMount builds the volume mount of the log directory: the volume named after
+// LogHostPath, mounted at the path that HostPathMounts maps LogHostPath to.
+func generateLogVolumeMount(rssPodSpec *v1alpha1.RSSPodSpec) *corev1.VolumeMount {
+       mount := new(corev1.VolumeMount)
+       mount.Name = GenerateHostPathVolumeName(rssPodSpec.LogHostPath)
+       mount.MountPath = rssPodSpec.HostPathMounts[rssPodSpec.LogHostPath]
+       return mount
+}
+
+// generateMakeDataDirCommand builds one "chown -R <fsGroup>:<fsGroup> <mountPath>" command for
+// every mount path in HostPathMounts, using SecurityContext.FSGroup as both user and group.
+//
+// The commands are sorted so that the result does not depend on the random iteration order of
+// the map. It returns nil when FSGroup is not configured or when there are no hostPath mounts.
+func generateMakeDataDirCommand(rssPodSpec *v1alpha1.RSSPodSpec) []string {
+       if rssPodSpec.SecurityContext == nil || rssPodSpec.SecurityContext.FSGroup == nil {
+               // without FSGroup there is no owner to hand the directories to.
+               return nil
+       }
+       var commands []string
+       fsGroup := *rssPodSpec.SecurityContext.FSGroup
+       for _, mountPath := range rssPodSpec.HostPathMounts {
+               commands = append(commands, fmt.Sprintf("chown -R %v:%v %v", fsGroup, fsGroup, mountPath))
+       }
+       sort.Strings(commands)
+       return commands
+}
+
+// CreateRecorder creates an event recorder for the given component. Recorded events are sent
+// through kubeClient to the events interface of the namespace returned by
+// utils.GetCurrentNamespace.
+func CreateRecorder(kubeClient kubernetes.Interface, component string) record.EventRecorder {
+       broadcaster := record.NewBroadcaster()
+       events := kubeClient.CoreV1().Events(utils.GetCurrentNamespace())
+       broadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: events})
+       source := corev1.EventSource{Component: component}
+       return broadcaster.NewRecorder(scheme.Scheme, source)
+}
+
+// GetConfigMapOwner returns the name of the rss object owning the configMap, read from the
+// configMap's owner label. It returns "" when the label is not set.
+func GetConfigMapOwner(cm *corev1.ConfigMap) string {
+       owner := cm.Labels[controllerconstants.OwnerLabel]
+       return owner
+}
diff --git a/deploy/kubernetes/operator/pkg/webhook/manager_test.go 
b/deploy/kubernetes/operator/pkg/webhook/manager_test.go
index 038e3566..114ad5d9 100644
--- a/deploy/kubernetes/operator/pkg/webhook/manager_test.go
+++ b/deploy/kubernetes/operator/pkg/webhook/manager_test.go
@@ -108,7 +108,7 @@ var _ = AfterSuite(func() {
        By("stopping admission manager")
        ctxCancel()
        By("tearing down the test environment")
-       _ = testEnv.Stop()
+       Expect(testEnv.Stop()).To(Succeed())
 })
 
 var _ = Describe("AdmissionManager", func() {


Reply via email to