This is an automated email from the ASF dual-hosted git repository.
roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new e6b4260a [ISSUE-48][FEATURE][FOLLOW UP] Add controller component (#214)
e6b4260a is described below
commit e6b4260aa54a9ffda30ec02fa050be1c108e8a9f
Author: jasonawang <[email protected]>
AuthorDate: Tue Sep 20 14:29:59 2022 +0800
[ISSUE-48][FEATURE][FOLLOW UP] Add controller component (#214)
### What changes were proposed in this pull request?
For issue #48
I added some code for the controller module and will add more unit tests in the
next PR.
### Why are the changes needed?
Support K8S
### Does this PR introduce _any_ user-facing change?
Yes; documentation will be added later.
### How was this patch tested?
Manual test
---
deploy/kubernetes/operator/cmd/controller/main.go | 56 ++
deploy/kubernetes/operator/go.mod | 14 +-
deploy/kubernetes/operator/go.sum | 56 +-
.../operator/pkg/controller/config/config.go | 51 ++
.../operator/pkg/controller/constants/constants.go | 83 ++
.../pkg/controller/controller/process_rss_test.go | 99 +++
.../operator/pkg/controller/controller/rss.go | 912 +++++++++++++++++++++
.../operator/pkg/controller/controller/rss_test.go | 290 +++++++
.../controller/controller/shuffle_server_test.go | 162 ++++
.../operator/pkg/controller/controller/test.go | 34 +
.../pkg/controller/sync/coordinator/coordinator.go | 297 +++++++
.../controller/sync/shuffleserver/shuffleserver.go | 346 ++++++++
.../pkg/controller/util/kubernetes/configmap.go | 86 ++
.../pkg/controller/util/kubernetes/deployment.go | 98 +++
.../pkg/controller/util/kubernetes/service.go | 111 +++
.../controller/util/kubernetes/serviceaccount.go | 86 ++
.../pkg/controller/util/kubernetes/statefulset.go | 112 +++
.../pkg/controller/util/properties/properties.go | 58 ++
.../operator/pkg/controller/util/util.go | 231 ++++++
.../operator/pkg/webhook/manager_test.go | 2 +-
20 files changed, 3157 insertions(+), 27 deletions(-)
diff --git a/deploy/kubernetes/operator/cmd/controller/main.go b/deploy/kubernetes/operator/cmd/controller/main.go
new file mode 100644
index 00000000..edebc133
--- /dev/null
+++ b/deploy/kubernetes/operator/cmd/controller/main.go
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package main
+
+import (
+ "flag"
+
+ "k8s.io/klog/v2"
+ ctrl "sigs.k8s.io/controller-runtime"
+
+
"github.com/apache/incubator-uniffle/deploy/kubernetes/operator/pkg/controller/config"
+
"github.com/apache/incubator-uniffle/deploy/kubernetes/operator/pkg/controller/controller"
+)
+
+// main parses flags, builds a manager with leader election and runs the rss controller in it.
+func main() {
+	klog.InitFlags(nil)
+	cfg := new(config.Config)
+	cfg.AddFlags()
+	flag.Parse()
+
+	cfg.Complete()
+	klog.Infof("run config: %+v", cfg)
+
+	// Leader election is enabled; the election lock is identified by cfg.LeaderElectionID().
+	mgr, err := ctrl.NewManager(cfg.RESTConfig, ctrl.Options{
+		LeaderElection:   true,
+		LeaderElectionID: cfg.LeaderElectionID(),
+	})
+	if err != nil {
+		klog.Fatal(err)
+	}
+	// Register the rss controller as a runnable of the manager.
+	if err := mgr.Add(controller.NewRSSController(cfg)); err != nil {
+		klog.Fatal(err)
+	}
+	// Start runs the registered runnables and blocks until cfg.RunCtx is cancelled.
+	if err := mgr.Start(cfg.RunCtx); err != nil {
+		klog.Fatal(err)
+	}
+}
diff --git a/deploy/kubernetes/operator/go.mod b/deploy/kubernetes/operator/go.mod
index 6121ae4c..853703b8 100644
--- a/deploy/kubernetes/operator/go.mod
+++ b/deploy/kubernetes/operator/go.mod
@@ -3,17 +3,19 @@ module github.com/apache/incubator-uniffle/deploy/kubernetes/operator
go 1.16
require (
+ github.com/fsnotify/fsnotify v1.5.1 // indirect
+ github.com/onsi/ginkgo v1.16.5 // indirect
github.com/onsi/ginkgo/v2 v2.1.4
- github.com/onsi/gomega v1.19.0
+ github.com/onsi/gomega v1.20.0
github.com/parnurzeal/gorequest v0.2.16
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c
gomodules.xyz/jsonpatch/v2 v2.2.0
- k8s.io/api v0.22.1
- k8s.io/apimachinery v0.22.1
- k8s.io/client-go v0.22.1
- k8s.io/code-generator v0.22.1
+ k8s.io/api v0.22.2
+ k8s.io/apimachinery v0.22.2
+ k8s.io/client-go v0.22.2
+ k8s.io/code-generator v0.22.2
k8s.io/klog/v2 v2.9.0
- k8s.io/utils v0.0.0-20210802155522-efc7438f0176
+ k8s.io/utils v0.0.0-20220210201930-3a6ce19ff2f9
moul.io/http2curl v1.0.0 // indirect
sigs.k8s.io/controller-runtime v0.10.0
)
diff --git a/deploy/kubernetes/operator/go.sum b/deploy/kubernetes/operator/go.sum
index a3e5c1ac..56a39259 100644
--- a/deploy/kubernetes/operator/go.sum
+++ b/deploy/kubernetes/operator/go.sum
@@ -51,6 +51,7 @@ github.com/armon/go-metrics
v0.0.0-20180917152333-f0300d1749da/go.mod h1:Q73ZrmV
github.com/armon/go-radix v0.0.0-20180808171621-7fddfc383310/go.mod
h1:ufUuZ+zHj4x4TnLV4JWEpy2hxWSpsRywHrMgIH9cCH8=
github.com/asaskevich/govalidator v0.0.0-20190424111038-f61b66f89f4a/go.mod
h1:lB+ZfQJz7igIIfQNfa7Ml4HSf2uFQQRzpGGRXenZAgY=
github.com/benbjohnson/clock v1.0.3/go.mod
h1:bGMdMPoPVvcYyt1gHDf4J2KE153Yf9BuiUKYMaxlTDM=
+github.com/benbjohnson/clock v1.1.0
h1:Q92kusRqC1XV2MjkWETPvjJVqKetz1OzxZB7mHJLju8=
github.com/benbjohnson/clock v1.1.0/go.mod
h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA=
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod
h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
github.com/beorn7/perks v1.0.0/go.mod
h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8=
@@ -92,6 +93,7 @@ github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod
h1:E3ru+11k8xSBh+hMPgOLZm
github.com/dgryski/go-sip13 v0.0.0-20181026042036-e10d5fee7954/go.mod
h1:vAd38F8PWV+bWy6jNmig1y/TA+kYO4g3RSRF0IAv0no=
github.com/docopt/docopt-go v0.0.0-20180111231733-ee0de3bc6815/go.mod
h1:WwZ+bS3ebgob9U8Nd0kOddGdZWjyMGR8Wziv+TBNwSE=
github.com/dustin/go-humanize v1.0.0/go.mod
h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk=
+github.com/elazarl/goproxy v0.0.0-20180725130230-947c36da3153
h1:yUdfgN0XgIJw7foRItutHYUIhlcKzcSf5vDpdhQAKTc=
github.com/elazarl/goproxy v0.0.0-20180725130230-947c36da3153/go.mod
h1:/Zj4wYkgs4iZTTu3o/KG3Itv/qCCa8VVMlb3i9OVuzc=
github.com/emicklei/go-restful v0.0.0-20170410110728-ff4f55a20633/go.mod
h1:otzb+WCGbkyDHkqmQmT5YD2WR4BBwUdeQoFo8l/7tVs=
github.com/emicklei/go-restful v2.9.5+incompatible
h1:spTtZBk5DYEvbxMVutUuTyh1Ao2r4iyvLdACqsl/Ljk=
@@ -110,8 +112,9 @@ github.com/felixge/httpsnoop v1.0.1/go.mod
h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSw
github.com/form3tech-oss/jwt-go v3.2.2+incompatible/go.mod
h1:pbq4aXjuKjdthFRnoDwaVPLA+WlJuPGy+QneDUgJi2k=
github.com/form3tech-oss/jwt-go v3.2.3+incompatible/go.mod
h1:pbq4aXjuKjdthFRnoDwaVPLA+WlJuPGy+QneDUgJi2k=
github.com/fsnotify/fsnotify v1.4.7/go.mod
h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
-github.com/fsnotify/fsnotify v1.4.9
h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWoS4=
github.com/fsnotify/fsnotify v1.4.9/go.mod
h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ=
+github.com/fsnotify/fsnotify v1.5.1
h1:mZcQUHVQUQWoPXXtuf9yuEXKudkV2sx1E06UadKWpgI=
+github.com/fsnotify/fsnotify v1.5.1/go.mod
h1:T3375wBYaZdLLcVNkcVbzGHY7f1l/uK5T5Ai1i3InKU=
github.com/getsentry/raven-go v0.2.0/go.mod
h1:KungGk8q33+aIAZUIVWZDr2OfAEBsO49PX4NzFV5kcQ=
github.com/ghodss/yaml v1.0.0/go.mod
h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04=
github.com/go-gl/glfw v0.0.0-20190409004039-e6da0acd62b1/go.mod
h1:vR7hzQXu2zJy9AVAgeJqvqgH9Q5CA+iKCZ2gyEVpxRU=
@@ -184,8 +187,9 @@ github.com/google/go-cmp v0.3.1/go.mod
h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMyw
github.com/google/go-cmp v0.4.0/go.mod
h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.0/go.mod
h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.4/go.mod
h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
-github.com/google/go-cmp v0.5.5 h1:Khx7svrCpmxxtHBq5j2mp/xVjsi8hQMfNLvJFAlrGgU=
github.com/google/go-cmp v0.5.5/go.mod
h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
+github.com/google/go-cmp v0.5.8 h1:e6P7q2lk1O+qJJb4BtCQXlK8vWEO8V1ZeuEdJNOqZyg=
+github.com/google/go-cmp v0.5.8/go.mod
h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/gofuzz v1.0.0/go.mod
h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
github.com/google/gofuzz v1.1.0 h1:Hsa8mG0dQ46ij8Sl2AYJDUv1oA9/d6Vk+3LG99Oe02g=
github.com/google/gofuzz v1.1.0/go.mod
h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
@@ -205,6 +209,7 @@ github.com/googleapis/gax-go/v2 v2.0.5/go.mod
h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5m
github.com/googleapis/gnostic v0.5.1/go.mod
h1:6U4PtQXGIEt/Z3h5MAT7FNofLnw9vXk2cUuW7uA/OeU=
github.com/googleapis/gnostic v0.5.5
h1:9fHAtK0uDfpveeqqo1hkEZJcFvYXAiCN3UutL8F9xHw=
github.com/googleapis/gnostic v0.5.5/go.mod
h1:7+EbHbldMins07ALC74bsA81Ovc97DwqyJO1AENw9kA=
+github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1
h1:EGx4pi6eqNxGaHF6qqu48+N2wcFQ5qg5FXgOdqsJ5d8=
github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod
h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY=
github.com/gorilla/websocket v1.4.2/go.mod
h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/gregjones/httpcache v0.0.0-20180305231024-9cad4c3443a7/go.mod
h1:FecbI9+v66THATjSRHfNgh1IVFe/9kFxbXtjV0ctIMA=
@@ -252,6 +257,7 @@ github.com/json-iterator/go v1.1.11
h1:uVUAXhF2To8cbw/3xN3pxj6kk7TYKs98NIrTqPlMW
github.com/json-iterator/go v1.1.11/go.mod
h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4=
github.com/jstemmer/go-junit-report v0.0.0-20190106144839-af01ea7f8024/go.mod
h1:6v2b51hI/fHJwM22ozAgKL4VKDeJcHhJFhtBdhmNjmU=
github.com/jstemmer/go-junit-report v0.9.1/go.mod
h1:Brl9GWCQeLvo8nXZwPNNblvFj/XSXhF0NWZEnDohbsk=
+github.com/jtolds/gls v4.20.0+incompatible
h1:xdiiI2gbIgH/gLH7ADydsJ1uDOEzR8yvV7C0MuV77Wo=
github.com/jtolds/gls v4.20.0+incompatible/go.mod
h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU=
github.com/julienschmidt/httprouter v1.2.0/go.mod
h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w=
github.com/julienschmidt/httprouter v1.3.0/go.mod
h1:JR6WtHb+2LUe8TCKY3cZOxFyyO8IZAc4RVcycCCAKdM=
@@ -310,19 +316,20 @@ github.com/onsi/ginkgo
v0.0.0-20170829012221-11459a886d9c/go.mod h1:lLunBs/Ym6LB
github.com/onsi/ginkgo v1.6.0/go.mod
h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/ginkgo v1.12.1/go.mod
h1:zj2OWP4+oCPe1qIXoGWkgMRwljMUYCdkwsT2108oapk=
github.com/onsi/ginkgo v1.14.0/go.mod
h1:iSB4RoI2tjJc9BBv4NKIKWKya62Rps+oPG/Lv9klQyY=
-github.com/onsi/ginkgo v1.16.4 h1:29JGrr5oVBm5ulCWet69zQkzWipVXIol6ygQUe/EzNc=
github.com/onsi/ginkgo v1.16.4/go.mod
h1:dX+/inL/fNMqNlz0e9LfyB9TswhZpCVdJM/Z6Vvnwo0=
+github.com/onsi/ginkgo v1.16.5 h1:8xi0RTUf59SOSfEtZMvwTvXYMzG4gV23XVHOZiXNtnE=
+github.com/onsi/ginkgo v1.16.5/go.mod
h1:+E8gABHa3K6zRBolWtd+ROzc/U5bkGt0FwiG042wbpU=
github.com/onsi/ginkgo/v2 v2.1.3/go.mod
h1:vw5CSIxN1JObi/U8gcbwft7ZxR2dgaR70JSE3/PpL4c=
github.com/onsi/ginkgo/v2 v2.1.4
h1:GNapqRSid3zijZ9H77KrgVG4/8KqiyRsxcSxe+7ApXY=
github.com/onsi/ginkgo/v2 v2.1.4/go.mod
h1:um6tUpWM/cxCK3/FK8BXqEiUMUwRgSM4JXG47RKZmLU=
github.com/onsi/gomega v0.0.0-20170829124025-dcabb60a477c/go.mod
h1:C1qb7wdrVGGVU+Z6iS04AVkA3Q65CEZX59MT0QO5uiA=
github.com/onsi/gomega v1.7.1/go.mod
h1:XdKZgCCFLUoM/7CFJVPcG8C1xQ1AJ0vpAezJrB7JYyY=
github.com/onsi/gomega v1.10.1/go.mod
h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1ybHNo=
-github.com/onsi/gomega v1.15.0 h1:WjP/FQ/sk43MRmnEcT+MlDw2TFvkrXlprrPST/IudjU=
github.com/onsi/gomega v1.15.0/go.mod
h1:cIuvLEne0aoVhAgh/O6ac0Op8WWw9H6eYCriF+tEHG0=
github.com/onsi/gomega v1.17.0/go.mod
h1:HnhC7FXeEQY45zxNK3PPoIUhzk/80Xly9PcubAlGdZY=
-github.com/onsi/gomega v1.19.0 h1:4ieX6qQjPP/BfC3mpsAtIGGlxTWPeA3Inl/7DtXw1tw=
github.com/onsi/gomega v1.19.0/go.mod
h1:LY+I3pBVzYsTBU1AnDwOSxaYi9WoWiqgwooUqq9yPro=
+github.com/onsi/gomega v1.20.0 h1:8W0cWlwFkflGPLltQvLRB7ZVD5HuP6ng320w2IS245Q=
+github.com/onsi/gomega v1.20.0/go.mod
h1:DtrZpjmvpn2mPm4YWQa0/ALMDj9v4YxLgojwPeREyVo=
github.com/opentracing/opentracing-go v1.1.0/go.mod
h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o=
github.com/parnurzeal/gorequest v0.2.16
h1:T/5x+/4BT+nj+3eSknXmCTnEVGSzFzPGdpqmUVVZXHQ=
github.com/parnurzeal/gorequest v0.2.16/go.mod
h1:3Kh2QUMJoqw3icWAecsyzkpY7UzRfDhbRdTjtNwNiUE=
@@ -373,7 +380,9 @@ github.com/sirupsen/logrus v1.4.2/go.mod
h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6Mwd
github.com/sirupsen/logrus v1.6.0/go.mod
h1:7uNnSEd1DgxDLC74fIahvMZmmYsHGZGEOFrfsX/uA88=
github.com/sirupsen/logrus v1.7.0/go.mod
h1:yWOB1SBYBC5VeMP7gHvWumXLIWorT60ONWic61uBYv0=
github.com/sirupsen/logrus v1.8.1/go.mod
h1:yWOB1SBYBC5VeMP7gHvWumXLIWorT60ONWic61uBYv0=
+github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d
h1:zE9ykElWQ6/NYmHa3jpm/yHnI4xSofP+UP6SpjHcSeM=
github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d/go.mod
h1:OnSkiWE9lh6wB0YB77sQom3nweQdgAjqCqsofrRNTgc=
+github.com/smartystreets/goconvey v1.6.4
h1:fv0U8FUIMPNf1L9lnHLvLhgicrIVChEkdzIKYqbNC9s=
github.com/smartystreets/goconvey v1.6.4/go.mod
h1:syvi0/a8iFYH4r/RixwvyeAJjdLS9QV7WQ/tjFTllLA=
github.com/soheilhy/cmux v0.1.4/go.mod
h1:IM3LyeVVIOuxMH7sFAkER9+bJ4dT7Ms6E4xg4kGIyLM=
github.com/soheilhy/cmux v0.1.5/go.mod
h1:T7TcVDs9LWfQgPlPsdngu6I6QIoyIFZDDC6sNE1GqG0=
@@ -434,6 +443,7 @@ go.opentelemetry.io/proto/otlp v0.7.0/go.mod
h1:PqfVotwruBrMGOCsRd/89rSnXhoiJIqe
go.uber.org/atomic v1.4.0/go.mod
h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
go.uber.org/atomic v1.7.0 h1:ADUqmZGgLDDfbSL9ZmPxKTybcoEYHgpYfELNoN+7hsw=
go.uber.org/atomic v1.7.0/go.mod
h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
+go.uber.org/goleak v1.1.10 h1:z+mqJhf6ss6BSfSM671tgKyZBFPTTJM+HLxnhPC3wu0=
go.uber.org/goleak v1.1.10/go.mod
h1:8a7PlsEVH3e/a/GLqe5IIrQx6GzcnRmZEufDUTk4A7A=
go.uber.org/multierr v1.1.0/go.mod
h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0=
go.uber.org/multierr v1.6.0 h1:y6IPFStTAIT5Ytl7/XYmHvzXQ7S3g/IeZW9hyZ5thw4=
@@ -474,6 +484,7 @@ golang.org/x/lint v0.0.0-20190930215403-16217165b5de/go.mod
h1:6SW0HCj/g11FgYtHl
golang.org/x/lint v0.0.0-20191125180803-fdd1cda4f05f/go.mod
h1:5qLYkcX4OjUUV8bRuDixDT3tpyyb+LUpUlRWLxfhWrs=
golang.org/x/lint v0.0.0-20200130185559-910be7a94367/go.mod
h1:3xt1FjdF8hUf6vQPIChWIBhFzV8gjjsPE/fR3IyQdNY=
golang.org/x/lint v0.0.0-20200302205851-738671d3881b/go.mod
h1:3xt1FjdF8hUf6vQPIChWIBhFzV8gjjsPE/fR3IyQdNY=
+golang.org/x/lint v0.0.0-20210508222113-6edffad5e616
h1:VLliZ0d+/avPrXXH+OakdXhpJuEoBZuwh1m2j7U6Iug=
golang.org/x/lint v0.0.0-20210508222113-6edffad5e616/go.mod
h1:3xt1FjdF8hUf6vQPIChWIBhFzV8gjjsPE/fR3IyQdNY=
golang.org/x/mobile v0.0.0-20190312151609-d3739f865fa6/go.mod
h1:z+o9i4GpDbdi3rU15maQ/Ox0txvL9dWGYEHz965HBQE=
golang.org/x/mobile v0.0.0-20190719004257-d2bd2a29d028/go.mod
h1:E/iHnbuqvinMTCcRqshq8CkpyQDoeVncDDYHnLhea+o=
@@ -483,7 +494,6 @@ golang.org/x/mod
v0.1.1-0.20191105210325-c90efee705ee/go.mod h1:QqPTAvyqsEbceGzB
golang.org/x/mod v0.1.1-0.20191107180719-034126e5016b/go.mod
h1:QqPTAvyqsEbceGzBzNggFXnrqF1CaUcvgkdR5Ot7KZg=
golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
-golang.org/x/mod v0.4.2 h1:Gz96sIWK3OalVv/I/qNygP42zyoKp3xptRVCWRFEBvo=
golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/mod v0.6.0-dev.0.20220106191415-9b9b3d81d5e3
h1:kQgndtyPBW/JIYERgdxfwMYh3AVStj88WQTlNDi2a+o=
golang.org/x/mod v0.6.0-dev.0.20220106191415-9b9b3d81d5e3/go.mod
h1:3p9vT2HGsQu2K1YbXdKPJLVgG5VJdoTa1poYQBtP1AY=
@@ -520,11 +530,11 @@ golang.org/x/net
v0.0.0-20201202161906-c7110b5ffcbb/go.mod h1:sp8m0HH+o8qH0wwXwY
golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod
h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod
h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM=
golang.org/x/net v0.0.0-20210428140749-89ef3d95e781/go.mod
h1:OJAsFXCWl8Ukc7SiCT/9KSuxbyM7479/AVlXFRxuMCk=
-golang.org/x/net v0.0.0-20210520170846-37e1c6afe023
h1:ADo5wSpq2gqaCGQWzk7S5vd//0iyyLeAratkEoG5dLE=
golang.org/x/net v0.0.0-20210520170846-37e1c6afe023/go.mod
h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/net v0.0.0-20211015210444-4f30a5c0130f/go.mod
h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
-golang.org/x/net v0.0.0-20220225172249-27dd8689420f
h1:oA4XRj0qtSt8Yo1Zms0CUlsT3KG69V2UGQWPBxujDmc=
golang.org/x/net v0.0.0-20220225172249-27dd8689420f/go.mod
h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk=
+golang.org/x/net v0.0.0-20220425223048-2871e0cb64e4
h1:HVyaeDAYux4pnY+D/SiwmLOR36ewZ4iGQIIrtnuCjFA=
+golang.org/x/net v0.0.0-20220425223048-2871e0cb64e4/go.mod
h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod
h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod
h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod
h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
@@ -588,15 +598,15 @@ golang.org/x/sys
v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBc
golang.org/x/sys v0.0.0-20210603081109-ebe580a85c40/go.mod
h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod
h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210616094352-59db8d763f22/go.mod
h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
-golang.org/x/sys v0.0.0-20210817190340-bfb29a6856f2
h1:c8PlLMqBbOHoqtjteWm5/kbe6rNY2pbRfbIMVnepueo=
+golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c/go.mod
h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210817190340-bfb29a6856f2/go.mod
h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20211019181941-9d821ace8654/go.mod
h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod
h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
-golang.org/x/sys v0.0.0-20220319134239-a9b59b0215f8
h1:OH54vjqzRWmbJ62fjuhxy7AxFFgoHN0/DPc/UrL8cAs=
golang.org/x/sys v0.0.0-20220319134239-a9b59b0215f8/go.mod
h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
+golang.org/x/sys v0.0.0-20220422013727-9388b58f7150
h1:xHms4gcpe1YE7A3yIllJXP16CMAGuqwO2lX1mTyyRRc=
+golang.org/x/sys v0.0.0-20220422013727-9388b58f7150/go.mod
h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod
h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod
h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
-golang.org/x/term v0.0.0-20210220032956-6a3ed077a48d
h1:SZxvLBoTP5yHO3Frd4z4vrF+DBX9vMVanchswa69toE=
golang.org/x/term v0.0.0-20210220032956-6a3ed077a48d/go.mod
h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211
h1:JGgROgKl9N8DuW20oFS5gxc+lE67/N3FcwmBPMe7ArY=
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod
h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
@@ -606,7 +616,6 @@ golang.org/x/text
v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3
golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.5/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
-golang.org/x/text v0.3.6 h1:aRYxNxv6iGQlyVaZmk6ZgYEDa+Jg18DxebPSrd6bg1M=
golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.7 h1:olpwvP2KacW1ZWvsR7uQhoyTYvKAupfQrRGBFM352Gk=
golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
@@ -656,7 +665,6 @@ golang.org/x/tools
v0.0.0-20200505023115-26f46d2f7ef8/go.mod h1:EkVYQZoAsY45+roY
golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod
h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE=
golang.org/x/tools v0.0.0-20201224043029-2b0845dc783e/go.mod
h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod
h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
-golang.org/x/tools v0.1.2 h1:kRBLX7v7Af8W7Gdbbc908OJcdgtK8bOz9Uaj8/F1ACA=
golang.org/x/tools v0.1.2/go.mod
h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk=
golang.org/x/tools v0.1.10 h1:QjFRCZxdOhBJ/UNgnBZLbNV13DlbnK0quyivTnXJM20=
golang.org/x/tools v0.1.10/go.mod
h1:Uh6Zz+xoGYZom868N8YTex3t7RhtHDBrE8Gzo9bV56E=
@@ -731,8 +739,9 @@ google.golang.org/protobuf
v1.23.1-0.20200526195155-81db48ad09cc/go.mod h1:EGpAD
google.golang.org/protobuf v1.24.0/go.mod
h1:r/3tXBNzIEhYS9I1OUVjXDlt8tc493IdKGjtUeSXeh4=
google.golang.org/protobuf v1.25.0/go.mod
h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlbajtzgsN7c=
google.golang.org/protobuf v1.26.0-rc.1/go.mod
h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
-google.golang.org/protobuf v1.26.0
h1:bxAC2xTBsZGibn2RTntX0oH50xLsqy1OxA9tTL3p/lk=
google.golang.org/protobuf v1.26.0/go.mod
h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
+google.golang.org/protobuf v1.28.0
h1:w43yiav+6bVFTBQFZX0r7ipe9JQ1QsbMgHwbBziscLw=
+google.golang.org/protobuf v1.28.0/go.mod
h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod
h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod
h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod
h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
@@ -761,8 +770,9 @@ gopkg.in/yaml.v2 v2.4.0
h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY=
gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod
h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.0-20200615113413-eeeca48fe776/go.mod
h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
-gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b
h1:h8qDotaEPuJATrMmW04NCwg7v22aHH28wwpauUhK9Oo=
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod
h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
+gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
+gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gotest.tools/v3 v3.0.2/go.mod h1:3SzNCllyD9/Y+b5r9JIKQ474KzkZyqLqEfYqMsX94Bk=
gotest.tools/v3 v3.0.3/go.mod h1:Z7Lb0S5l+klDB31fvDQX8ss/FlKDxtlFlw3Oa8Ymbl8=
honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod
h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
@@ -771,17 +781,21 @@ honnef.co/go/tools
v0.0.0-20190418001031-e561f6794a2a/go.mod h1:rf3lG4BRIbNafJWh
honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod
h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
honnef.co/go/tools v0.0.1-2019.2.3/go.mod
h1:a3bituU0lyd329TUQxRnasdCoJDkEUEAqEt0JzvZhAg=
honnef.co/go/tools v0.0.1-2020.1.3/go.mod
h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9vFzvIQ3k=
-k8s.io/api v0.22.1 h1:ISu3tD/jRhYfSW8jI/Q1e+lRxkR7w9UwQEZ7FgslrwY=
k8s.io/api v0.22.1/go.mod h1:bh13rkTp3F1XEaLGykbyRD2QaTTzPm0e/BMd8ptFONY=
+k8s.io/api v0.22.2 h1:M8ZzAD0V6725Fjg53fKeTJxGsJvRbk4TEm/fexHMtfw=
+k8s.io/api v0.22.2/go.mod h1:y3ydYpLJAaDI+BbSe2xmGcqxiWHmWjkEeIbiwHvnPR8=
k8s.io/apiextensions-apiserver v0.22.1
h1:YSJYzlFNFSfUle+yeEXX0lSQyLEoxoPJySRupepb0gE=
k8s.io/apiextensions-apiserver v0.22.1/go.mod
h1:HeGmorjtRmRLE+Q8dJu6AYRoZccvCMsghwS8XTUYb2c=
-k8s.io/apimachinery v0.22.1 h1:DTARnyzmdHMz7bFWFDDm22AM4pLWTQECMpRTFu2d2OM=
k8s.io/apimachinery v0.22.1/go.mod
h1:O3oNtNadZdeOMxHFVxOreoznohCpy0z6mocxbZr7oJ0=
+k8s.io/apimachinery v0.22.2 h1:ejz6y/zNma8clPVfNDLnPbleBo6MpoFy/HBiBqCouVk=
+k8s.io/apimachinery v0.22.2/go.mod
h1:O3oNtNadZdeOMxHFVxOreoznohCpy0z6mocxbZr7oJ0=
k8s.io/apiserver v0.22.1/go.mod h1:2mcM6dzSt+XndzVQJX21Gx0/Klo7Aen7i0Ai6tIa400=
-k8s.io/client-go v0.22.1 h1:jW0ZSHi8wW260FvcXHkIa0NLxFBQszTlhiAVsU5mopw=
k8s.io/client-go v0.22.1/go.mod h1:BquC5A4UOo4qVDUtoc04/+Nxp1MeHcVc1HJm1KmG8kk=
-k8s.io/code-generator v0.22.1 h1:zAcKpn+xe9Iyc4qtZlfg4tD0f+SO2h5+e/s4pZPOVhs=
+k8s.io/client-go v0.22.2 h1:DaSQgs02aCC1QcwUdkKZWOeaVsQjYvWv8ZazcZ6JcHc=
+k8s.io/client-go v0.22.2/go.mod h1:sAlhrkVDf50ZHx6z4K0S40wISNTarf1r800F+RlCF6U=
k8s.io/code-generator v0.22.1/go.mod
h1:eV77Y09IopzeXOJzndrDyCI88UBok2h6WxAlBwpxa+o=
+k8s.io/code-generator v0.22.2 h1:+bUv9lpTnAWABtPkvO4x0kfz7j/kDEchVt0P/wXU3jQ=
+k8s.io/code-generator v0.22.2/go.mod
h1:eV77Y09IopzeXOJzndrDyCI88UBok2h6WxAlBwpxa+o=
k8s.io/component-base v0.22.1 h1:SFqIXsEN3v3Kkr1bS6rstrs1wd45StJqbtgbQ4nRQdo=
k8s.io/component-base v0.22.1/go.mod
h1:0D+Bl8rrnsPN9v0dyYvkqFfBeAd4u7n77ze+p8CMiPo=
k8s.io/gengo v0.0.0-20200413195148-3a45101e95ac/go.mod
h1:ezvh/TsK7cY6rbqRK0oQQ8IAqLxYwwyPxAX1Pzy0ii0=
@@ -794,8 +808,10 @@ k8s.io/klog/v2 v2.9.0/go.mod
h1:hy9LJ/NvuK+iVyP4Ehqva4HxZG/oXyIS3n3Jmire4Ec=
k8s.io/kube-openapi v0.0.0-20210421082810-95288971da7e
h1:KLHHjkdQFomZy8+06csTWZ0m1343QqxZhR2LJ1OxCYM=
k8s.io/kube-openapi v0.0.0-20210421082810-95288971da7e/go.mod
h1:vHXdDvt9+2spS2Rx9ql3I8tycm3H9FDfdUoIuKCefvw=
k8s.io/utils v0.0.0-20210707171843-4b05e18ac7d9/go.mod
h1:jPW/WVKK9YHAvNhRxK0md/EJ228hCsBRufyofKtW8HA=
-k8s.io/utils v0.0.0-20210802155522-efc7438f0176
h1:Mx0aa+SUAcNRQbs5jUzV8lkDlGFU8laZsY9jrcVX5SY=
k8s.io/utils v0.0.0-20210802155522-efc7438f0176/go.mod
h1:jPW/WVKK9YHAvNhRxK0md/EJ228hCsBRufyofKtW8HA=
+k8s.io/utils v0.0.0-20210819203725-bdf08cb9a70a/go.mod
h1:jPW/WVKK9YHAvNhRxK0md/EJ228hCsBRufyofKtW8HA=
+k8s.io/utils v0.0.0-20220210201930-3a6ce19ff2f9
h1:HNSDgDCrr/6Ly3WEGKZftiE7IY19Vz2GdbOCyI4qqhc=
+k8s.io/utils v0.0.0-20220210201930-3a6ce19ff2f9/go.mod
h1:jPW/WVKK9YHAvNhRxK0md/EJ228hCsBRufyofKtW8HA=
moul.io/http2curl v1.0.0 h1:6XwpyZOYsgZJrU8exnG87ncVkU1FVCcTRpwzOkTDUi8=
moul.io/http2curl v1.0.0/go.mod h1:f6cULg+e4Md/oW1cYmwW4IWQOVl2lGbmCNGOHvzX2kE=
rsc.io/binaryregexp v0.2.0/go.mod
h1:qTv7/COck+e2FymRvadv62gMdZztPaShugOCi3I+8D8=
diff --git a/deploy/kubernetes/operator/pkg/controller/config/config.go b/deploy/kubernetes/operator/pkg/controller/config/config.go
new file mode 100644
index 00000000..73f19f5a
--- /dev/null
+++ b/deploy/kubernetes/operator/pkg/controller/config/config.go
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package config
+
+import (
+ "flag"
+
+
"github.com/apache/incubator-uniffle/deploy/kubernetes/operator/pkg/constants"
+
"github.com/apache/incubator-uniffle/deploy/kubernetes/operator/pkg/utils"
+)
+
+const (
+	flagWorkers = "workers" // command-line flag name backing Config.Workers
+)
+
+// Config contains all configurations of the rss-controller.
+type Config struct {
+	Workers int // concurrency of the rss controller, set by the "workers" flag (default 1)
+	utils.GenericConfig
+}
+
+// LeaderElectionID returns the leader election ID of the rss-controller.
+func (c *Config) LeaderElectionID() string {
+	return "rss-controller-" + constants.LeaderIDSuffix
+}
+
+// AddFlags registers all configurations with the global flag set; call it before flag.Parse.
+func (c *Config) AddFlags() {
+	flag.IntVar(&c.Workers, flagWorkers, 1, "Concurrency of the rss controller.")
+	c.GenericConfig.AddFlags()
+}
+
+// Complete delegates to GenericConfig.Complete; it is called after flags are parsed and before the rss-controller runs.
+func (c *Config) Complete() {
+	c.GenericConfig.Complete()
+}
diff --git a/deploy/kubernetes/operator/pkg/controller/constants/constants.go b/deploy/kubernetes/operator/pkg/controller/constants/constants.go
new file mode 100644
index 00000000..7c74d8bd
--- /dev/null
+++ b/deploy/kubernetes/operator/pkg/controller/constants/constants.go
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package constants
+
+const (
+	// ContainerShuffleServerRPCPort is the rpc port used in shuffle server containers.
+	ContainerShuffleServerRPCPort int32 = 19997
+	// ContainerShuffleServerHTTPPort is the http port used in shuffle server containers.
+	ContainerShuffleServerHTTPPort int32 = 19996
+	// ContainerCoordinatorRPCPort is the rpc port used in coordinator containers.
+	ContainerCoordinatorRPCPort int32 = 19997
+	// ContainerCoordinatorHTTPPort is the http port used in coordinator containers.
+	ContainerCoordinatorHTTPPort int32 = 19996
+
+	// ShuffleServerRPCPortEnv is the name of the environment variable holding the rpc port of shuffle servers.
+	ShuffleServerRPCPortEnv = "SERVER_RPC_PORT"
+	// ShuffleServerHTTPPortEnv is the name of the environment variable holding the http port of shuffle servers.
+	ShuffleServerHTTPPortEnv = "SERVER_HTTP_PORT"
+	// CoordinatorRPCPortEnv is the name of the environment variable holding the rpc port of coordinators.
+	CoordinatorRPCPortEnv = "COORDINATOR_RPC_PORT"
+	// CoordinatorHTTPPortEnv is the name of the environment variable holding the http port of coordinators.
+	CoordinatorHTTPPortEnv = "COORDINATOR_HTTP_PORT"
+	// RSSCoordinatorQuorumEnv is the name of the environment variable holding the coordinator quorum used by shuffle servers.
+	RSSCoordinatorQuorumEnv = "RSS_COORDINATOR_QUORUM"
+	// XmxSizeEnv is the name of the environment variable holding the xmx size of coordinators or shuffle servers.
+	XmxSizeEnv = "XMX_SIZE"
+	// ServiceNameEnv is the name of the environment variable holding the service name of coordinators or shuffle servers.
+	ServiceNameEnv = "SERVICE_NAME"
+	// NodeNameEnv is the name of the environment variable holding the physical node name of coordinators or shuffle servers.
+	NodeNameEnv = "NODE_NAME"
+	// RssIPEnv is the name of the environment variable holding the ip address of shuffle servers.
+	RssIPEnv = "RSS_IP"
+
+	// CoordinatorServiceName is the value of the "SERVICE_NAME" environment variable used by coordinators.
+	CoordinatorServiceName = "coordinator"
+	// ShuffleServerServiceName is the value of the "SERVICE_NAME" environment variable used by shuffle servers.
+	ShuffleServerServiceName = "server"
+
+	// ExcludeNodesFile is the volume mount name of the exclude nodes file.
+	ExcludeNodesFile = "exclude-nodes-file"
+
+	// UpdateStatusError is the reason used when updating the status of an rss fails.
+	UpdateStatusError = "UpdateStatusError"
+
+	// OwnerLabel is the label that records the owner of a configMap.
+	OwnerLabel = "uniffle.apache.org/owner-label"
+
+	// ConfigurationVolumeName is the name of the configMap volume holding the configuration of coordinators or shuffle servers.
+	ConfigurationVolumeName = "configuration"
+)
+
+// PropertyKey defines a property key in the configuration of coordinators or shuffle servers.
+type PropertyKey string
+
+const (
+	// RPCServerPort represents the rpc port property in the configuration of coordinators or shuffle servers.
+	RPCServerPort PropertyKey = "rss.rpc.server.port"
+	// JettyHTTPPort represents the http port property in the configuration of coordinators or shuffle servers.
+	JettyHTTPPort PropertyKey = "rss.jetty.http.port"
+
+	// CoordinatorQuorum represents the coordinator quorum property in the configuration of shuffle servers.
+	CoordinatorQuorum PropertyKey = "rss.coordinator.quorum"
+	// StorageBasePath represents the storage base path property in the configuration of shuffle servers.
+	StorageBasePath PropertyKey = "rss.storage.basePath"
+
+	// CoordinatorExcludeNodesPath represents the exclude nodes file path property in the configuration of coordinators.
+	CoordinatorExcludeNodesPath PropertyKey = "rss.coordinator.exclude.nodes.file.path"
+)
diff --git a/deploy/kubernetes/operator/pkg/controller/controller/process_rss_test.go b/deploy/kubernetes/operator/pkg/controller/controller/process_rss_test.go
new file mode 100644
index 00000000..8eba2abe
--- /dev/null
+++ b/deploy/kubernetes/operator/pkg/controller/controller/process_rss_test.go
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package controller
+
+import (
+ "context"
+ "reflect"
+ "testing"
+
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+ kubefake "k8s.io/client-go/kubernetes/fake"
+
+ unifflev1alpha1
"github.com/apache/incubator-uniffle/deploy/kubernetes/operator/api/uniffle/v1alpha1"
+
"github.com/apache/incubator-uniffle/deploy/kubernetes/operator/pkg/controller/config"
+
"github.com/apache/incubator-uniffle/deploy/kubernetes/operator/pkg/generated/clientset/versioned/fake"
+
"github.com/apache/incubator-uniffle/deploy/kubernetes/operator/pkg/utils"
+)
+
+func buildEmptyPhaseRssObj() *unifflev1alpha1.RemoteShuffleService {
+ return &unifflev1alpha1.RemoteShuffleService{
+ ObjectMeta: metav1.ObjectMeta{
+ Name: testRssName,
+ Namespace: testNamespace,
+ ResourceVersion: "test",
+ },
+ Spec: unifflev1alpha1.RemoteShuffleServiceSpec{
+ Coordinator: &unifflev1alpha1.CoordinatorConfig{
+ ExcludeNodesFilePath: "/exclude_nodes",
+ },
+ },
+ Status: unifflev1alpha1.RemoteShuffleServiceStatus{},
+ }
+}
+
+func TestProcessEmptyPhaseRss(t *testing.T) {
+ rss := buildEmptyPhaseRssObj()
+
+ rssClient := fake.NewSimpleClientset(rss)
+ kubeClient := kubefake.NewSimpleClientset()
+
+ rc := newRSSController(&config.Config{
+ GenericConfig: utils.GenericConfig{
+ KubeClient: kubeClient,
+ RSSClient: rssClient,
+ },
+ })
+
+ for _, tt := range []struct {
+ name string
+ expectedRssStatus unifflev1alpha1.RemoteShuffleServiceStatus
+ expectedNeedRetry bool
+ expectedError error
+ }{
+ {
+ name: "process rss object which has just been created,
and whose status phase is empty",
+ expectedRssStatus:
unifflev1alpha1.RemoteShuffleServiceStatus{
+ Phase: unifflev1alpha1.RSSPending,
+ },
+ expectedNeedRetry: false,
+ },
+ } {
+ needRetry, err := rc.processNormal(rss)
+ if err != nil {
+ t.Errorf("process rss object failed: %v", err)
+ return
+ }
+ if needRetry != tt.expectedNeedRetry {
+ t.Errorf("unexpected result indicates whether to
retrys: %v, expected: %v",
+ needRetry, tt.expectedNeedRetry)
+ return
+ }
+ updatedRss, getErr :=
rssClient.UniffleV1alpha1().RemoteShuffleServices(rss.Namespace).
+ Get(context.TODO(), rss.Name, metav1.GetOptions{})
+ if getErr != nil {
+ t.Errorf("get updated rss object failed: %v", err)
+ return
+ }
+ if !reflect.DeepEqual(updatedRss.Status, tt.expectedRssStatus) {
+ t.Errorf("unexpected status of updated rss object: %+v,
expected: %+v",
+ updatedRss.Status, tt.expectedRssStatus)
+ return
+ }
+ }
+}
diff --git a/deploy/kubernetes/operator/pkg/controller/controller/rss.go
b/deploy/kubernetes/operator/pkg/controller/controller/rss.go
new file mode 100644
index 00000000..2e21e04b
--- /dev/null
+++ b/deploy/kubernetes/operator/pkg/controller/controller/rss.go
@@ -0,0 +1,912 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package controller
+
+import (
+ "context"
+ "fmt"
+ "strings"
+ "sync"
+ "time"
+
+ appsv1 "k8s.io/api/apps/v1"
+ corev1 "k8s.io/api/core/v1"
+ apierrors "k8s.io/apimachinery/pkg/api/errors"
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+ "k8s.io/apimachinery/pkg/labels"
+ "k8s.io/apimachinery/pkg/util/sets"
+ "k8s.io/apimachinery/pkg/util/wait"
+ "k8s.io/client-go/informers"
+ "k8s.io/client-go/kubernetes"
+ appslister "k8s.io/client-go/listers/apps/v1"
+ corelisters "k8s.io/client-go/listers/core/v1"
+ "k8s.io/client-go/tools/cache"
+ "k8s.io/client-go/tools/record"
+ "k8s.io/client-go/util/retry"
+ "k8s.io/client-go/util/workqueue"
+ "k8s.io/klog/v2"
+ "k8s.io/utils/pointer"
+ "sigs.k8s.io/controller-runtime/pkg/manager"
+
+ unifflev1alpha1
"github.com/apache/incubator-uniffle/deploy/kubernetes/operator/api/uniffle/v1alpha1"
+
"github.com/apache/incubator-uniffle/deploy/kubernetes/operator/pkg/constants"
+
"github.com/apache/incubator-uniffle/deploy/kubernetes/operator/pkg/controller/config"
+ controllerconstants
"github.com/apache/incubator-uniffle/deploy/kubernetes/operator/pkg/controller/constants"
+
"github.com/apache/incubator-uniffle/deploy/kubernetes/operator/pkg/controller/sync/coordinator"
+
"github.com/apache/incubator-uniffle/deploy/kubernetes/operator/pkg/controller/sync/shuffleserver"
+
"github.com/apache/incubator-uniffle/deploy/kubernetes/operator/pkg/controller/util"
+ kubeutil
"github.com/apache/incubator-uniffle/deploy/kubernetes/operator/pkg/controller/util/kubernetes"
+ propertiestutil
"github.com/apache/incubator-uniffle/deploy/kubernetes/operator/pkg/controller/util/properties"
+
"github.com/apache/incubator-uniffle/deploy/kubernetes/operator/pkg/generated/clientset/versioned"
+
"github.com/apache/incubator-uniffle/deploy/kubernetes/operator/pkg/generated/informers/externalversions"
+
"github.com/apache/incubator-uniffle/deploy/kubernetes/operator/pkg/generated/listers/uniffle/v1alpha1"
+
"github.com/apache/incubator-uniffle/deploy/kubernetes/operator/pkg/utils"
+)
+
+var _ RSSController = &rssController{}
+
+const (
+ controllerName = "rss-controller"
+ appNameIndexer = "AppNameIndexer"
+)
+
+// RSSController is responsible for synchronizing rss objects.
+type RSSController interface {
+ manager.Runnable
+}
+
+// NewRSSController creates a RSSController.
+func NewRSSController(cfg *config.Config) RSSController {
+ return newRSSController(cfg)
+}
+
+// newRSSController creates a rssController.
+func newRSSController(cfg *config.Config) *rssController {
+ rc := &rssController{
+ workers: cfg.Workers,
+ kubeClient: cfg.KubeClient,
+ rssClient: cfg.RSSClient,
+ rssInformerFactory:
externalversions.NewSharedInformerFactory(cfg.RSSClient, 0),
+ shuffleServerInformerFactory:
utils.BuildShuffleServerInformerFactory(cfg.KubeClient),
+ coordinatorInformerFactory:
utils.BuildCoordinatorInformerFactory(cfg.KubeClient),
+ rssQueue:
workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(),
+ "rss-queue"),
+ shuffleServerQueue:
workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(),
+ "pod-queue"),
+ eventRecorder: util.CreateRecorder(cfg.KubeClient,
"rss-controller"),
+ }
+
+ rssInformer :=
rc.rssInformerFactory.Uniffle().V1alpha1().RemoteShuffleServices().Informer()
+ rssInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
+ AddFunc: rc.addRSS,
+ UpdateFunc: rc.updateRSS,
+ })
+ rc.rssInformer = rssInformer
+ rc.rssLister =
rc.rssInformerFactory.Uniffle().V1alpha1().RemoteShuffleServices().Lister()
+
+ rc.stsInformer =
rc.shuffleServerInformerFactory.Apps().V1().StatefulSets().Informer()
+ rc.stsLister =
rc.shuffleServerInformerFactory.Apps().V1().StatefulSets().Lister()
+
+ rc.podInformer =
rc.shuffleServerInformerFactory.Core().V1().Pods().Informer()
+ rc.podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
+ DeleteFunc: rc.deleteShuffleServer,
+ })
+ if err := rc.podInformer.AddIndexers(cache.Indexers{appNameIndexer:
getAppName}); err != nil {
+ klog.Fatalf("add app name indexer failed: %v", err)
+ }
+ rc.podIndexer = rc.podInformer.GetIndexer()
+ rc.podLister =
rc.shuffleServerInformerFactory.Core().V1().Pods().Lister()
+
+ rc.cmInformer =
rc.coordinatorInformerFactory.Core().V1().ConfigMaps().Informer()
+ rc.cmLister =
rc.coordinatorInformerFactory.Core().V1().ConfigMaps().Lister()
+
+ return rc
+}
+
+// rssController implements the RSSController interface.
+type rssController struct {
+ workers int
+ kubeClient kubernetes.Interface
+ rssClient versioned.Interface
+ rssInformerFactory externalversions.SharedInformerFactory
+ rssInformer cache.SharedIndexInformer
+ rssLister v1alpha1.RemoteShuffleServiceLister
+ shuffleServerInformerFactory informers.SharedInformerFactory
+ stsInformer cache.SharedIndexInformer
+ stsLister appslister.StatefulSetLister
+ podInformer cache.SharedIndexInformer
+ podIndexer cache.Indexer
+ podLister corelisters.PodLister
+ coordinatorInformerFactory informers.SharedInformerFactory
+ cmInformer cache.SharedIndexInformer
+ cmLister corelisters.ConfigMapLister
+ rssQueue workqueue.RateLimitingInterface
+ shuffleServerQueue workqueue.RateLimitingInterface
+ eventRecorder record.EventRecorder
+}
+
+// Start starts the RSSController.
+func (r *rssController) Start(ctx context.Context) error {
+ klog.V(2).Infof("%v is starting", controllerName)
+ r.rssInformerFactory.Start(ctx.Done())
+ r.shuffleServerInformerFactory.Start(ctx.Done())
+ r.coordinatorInformerFactory.Start(ctx.Done())
+ if !cache.WaitForCacheSync(ctx.Done(), r.rssInformer.HasSynced,
r.stsInformer.HasSynced,
+ r.podInformer.HasSynced, r.cmInformer.HasSynced) {
+ return fmt.Errorf("wait for cache synced failed")
+ }
+ klog.V(2).Infof("%v started", controllerName)
+ for i := 0; i < r.workers; i++ {
+ go wait.Until(r.runRssWorker, time.Second, ctx.Done())
+ }
+ for i := 0; i < r.workers; i++ {
+ go wait.Until(r.runShuffleServerWorker, time.Second, ctx.Done())
+ }
+ <-ctx.Done()
+ return nil
+}
+
+// runRssWorker runs a thread that dequeues rss objects, handles them, and
marks them done.
+func (r *rssController) runRssWorker() {
+ for r.processNextWorkRss() {
+ }
+}
+
+// processNextWorkRss deals with one rss key off the rssQueue, it returns
false when it's time to quit.
+func (r *rssController) processNextWorkRss() bool {
+ rssKey, quit := r.rssQueue.Get()
+ if quit {
+ return false
+ }
+ defer r.rssQueue.Done(rssKey)
+
+ rssNamespace, rssName, err :=
cache.SplitMetaNamespaceKey(rssKey.(string))
+ if err != nil {
+ klog.Errorf("parsed rss key (%v) failed: %v", rssKey, err)
+ return true
+ }
+
+ var retryChecking bool
+ retryChecking, err = r.processRss(rssNamespace, rssName)
+ if err != nil {
+ klog.Errorf("processed rss %v failed : %v", rssKey, err)
+ r.rssQueue.AddRateLimited(rssKey)
+ } else if retryChecking {
+ klog.V(4).Infof("we will retry checking rss (%v) for upgrading
or terminating", rssKey)
+ r.rssQueue.AddAfter(rssKey, time.Second*3)
+ } else {
+ r.rssQueue.Forget(rssKey)
+ }
+ return true
+}
+
+// runShuffleServerWorker runs a thread that dequeues shuffle server pods,
handles them, and marks them done.
+func (r *rssController) runShuffleServerWorker() {
+ for r.processNextWorkShuffleServer() {
+ }
+}
+
+// processNextWorkShuffleServer deals with one shuffle server pod key off the
shuffleServerQueue,
+// it returns false when it's time to quit.
+func (r *rssController) processNextWorkShuffleServer() bool {
+ key, quit := r.shuffleServerQueue.Get()
+ if quit {
+ return false
+ }
+ defer r.shuffleServerQueue.Done(key)
+
+ err := r.processShuffleServer(key.(string))
+ if err == nil {
+ r.shuffleServerQueue.Forget(key)
+ return true
+ }
+
+ klog.Errorf("processed shuffle server %v failed : %v", key, err)
+ r.shuffleServerQueue.AddRateLimited(key)
+ return true
+}
+
+// processShuffleServer process current shuffle server pod key.
+func (r *rssController) processShuffleServer(key string) error {
+ klog.V(4).Infof("processing shuffle server (%v)", key)
+ namespace, rssName, currentKey := parsePodCacheKey(key)
+ rss, err := r.rssLister.RemoteShuffleServices(namespace).Get(rssName)
+ if err != nil {
+ klog.Errorf("get rss by key (%v) failed: %v", key, err)
+ if !apierrors.IsNotFound(err) {
+ return err
+ }
+ return nil
+ }
+
+ return r.updateTargetAndDeletedKeys(rss, currentKey)
+}
+
+// updateTargetAndDeletedKeys updates target and deleted keys in status of rss
objects.
+func (r *rssController) updateTargetAndDeletedKeys(rss
*unifflev1alpha1.RemoteShuffleService,
+ shuffleServerKey string) error {
+ deletedKeys := sets.NewString(rss.Status.DeletedKeys...)
+ if deletedKeys.Has(shuffleServerKey) {
+ klog.V(3).Infof("shuffle server (%v) has already been deleted
for rss (%v)",
+ shuffleServerKey, utils.UniqueName(rss))
+ return nil
+ }
+
+ deletedKeys.Insert(shuffleServerKey)
+ oldTargetKeys := sets.NewString(rss.Status.TargetKeys...)
+ newTargetKeys := oldTargetKeys.Difference(deletedKeys)
+
+ cm, err := r.getExcludeNodesCM(rss)
+ if err != nil {
+ klog.Errorf("get exclude nodes in configMap for rss (%v)
failed: %v",
+ utils.UniqueName(rss), err)
+ return err
+ }
+
+ // calculate new exclude nodes.
+ excludeNodesFileKey := utils.GetExcludeNodesConfigMapKey(rss)
+ oldExcludeNodes := sets.NewString(strings.Split(
+ cm.Data[excludeNodesFileKey], "\n")...)
+ newExcludeNodes := utils.ConvertShuffleServerKeysToNodes(newTargetKeys)
+ if !newExcludeNodes.Equal(oldExcludeNodes) {
+ cmCopy := cm.DeepCopy()
+ cmCopy.Data[excludeNodesFileKey] = strings.Join(
+ utils.GetSortedList(newExcludeNodes), "\n")
+ if _, err = r.kubeClient.CoreV1().ConfigMaps(cmCopy.Namespace).
+ Update(context.Background(), cmCopy,
metav1.UpdateOptions{}); err != nil {
+ klog.Errorf("update exclude nodes in configMap (%v) for
rss (%v) failed: %v",
+ cmCopy.Name, utils.UniqueName(rss), err)
+ return err
+ }
+ }
+
+ if err = r.updateDeletedKeys(rss, shuffleServerKey); err != nil {
+ klog.Errorf("add (%v) to deleted keys of rss (%v) failed: %v",
+ shuffleServerKey, utils.UniqueName(rss), err)
+ return err
+ }
+ return nil
+}
+
+func (r *rssController) clearExcludeNodes(rss
*unifflev1alpha1.RemoteShuffleService) error {
+ cm, err := r.getExcludeNodesCM(rss)
+ if err != nil {
+ klog.Errorf("get exclude nodes in configMap for rss (%v)
failed: %v",
+ utils.UniqueName(rss), err)
+ return err
+ }
+ cmCopy := cm.DeepCopy()
+ cmCopy.Data[utils.GetExcludeNodesConfigMapKey(rss)] = ""
+ if _, err = r.kubeClient.CoreV1().ConfigMaps(cmCopy.Namespace).
+ Update(context.Background(), cmCopy, metav1.UpdateOptions{});
err != nil {
+ klog.Errorf("update exclude nodes in configMap (%v) for rss
(%v) failed: %v",
+ cmCopy.Name, utils.UniqueName(rss), err)
+ return err
+ }
+ return nil
+}
+
+// getExcludeNodesCM returns configMap records exclude nodes.
+func (r *rssController) getExcludeNodesCM(rss
*unifflev1alpha1.RemoteShuffleService) (
+ *corev1.ConfigMap, error) {
+ namespace := rss.Namespace
+ name := utils.GenerateCoordinatorName(rss)
+ cm, err := r.cmLister.ConfigMaps(namespace).Get(name)
+ if err != nil {
+ klog.Errorf("get configMap (%v) for rss failed: %v", name,
utils.UniqueName(rss), err)
+ return nil, err
+ }
+ return cm, nil
+}
+
+// updateDeletedKeys updates deleted keys in status of rss.
+func (r *rssController) updateDeletedKeys(rss
*unifflev1alpha1.RemoteShuffleService,
+ key string) error {
+ if rss.Status.Phase != unifflev1alpha1.RSSUpgrading && rss.Status.Phase
!= unifflev1alpha1.RSSTerminating {
+ return nil
+ }
+ rssStatus := rss.Status.DeepCopy()
+ rssStatus.DeletedKeys =
utils.GetSortedList(sets.NewString(rssStatus.DeletedKeys...).Insert(key))
+ return r.updateRssStatus(rss, rssStatus)
+}
+
+// processRss process current rss by its namespace and name, and returns if we
need to try again to check upgrading.
+func (r *rssController) processRss(namespace, name string) (bool, error) {
+ klog.V(4).Infof("processing rss (%v/%v)", namespace, name)
+ startTime := time.Now()
+ defer func() {
+ klog.V(4).Infof("finished processing rss (%v/%v) cost %v",
+ namespace, name, time.Since(startTime))
+ }()
+
+ rss, err := r.rssLister.RemoteShuffleServices(namespace).Get(name)
+ if err != nil {
+ if apierrors.IsNotFound(err) {
+ klog.V(5).Infof("ignored deleted rss (%v/%v)",
namespace, name)
+ return false, nil
+ }
+ klog.Errorf("get rss (%v/%v) failed: %v", namespace, name, err)
+ return false, err
+ }
+
+ if rss.DeletionTimestamp != nil {
+ return r.processDeleting(rss)
+ }
+ return r.processNormal(rss)
+}
+
+// processDeleting process the deleting rss.
+func (r *rssController) processDeleting(rss
*unifflev1alpha1.RemoteShuffleService) (bool, error) {
+ klog.V(4).Infof("process rss (%v) to be deleted in %v phase",
+ utils.UniqueName(rss), rss.Status.Phase)
+ if rss.Status.Phase == unifflev1alpha1.RSSRunning || rss.Status.Phase
== unifflev1alpha1.RSSPending {
+ return false, r.updateRssStatus(rss,
&unifflev1alpha1.RemoteShuffleServiceStatus{
+ Phase: unifflev1alpha1.RSSTerminating,
+ })
+ } else if rss.Status.Phase == unifflev1alpha1.RSSTerminating {
+ if ok, err := r.canDeleteRss(rss); err == nil && ok {
+ return false, r.removeFinalizer(rss)
+ }
+ return r.scaleDownShuffleServer(rss)
+ }
+ return false, nil
+}
+
+func (r *rssController) canDeleteRss(rss
*unifflev1alpha1.RemoteShuffleService) (bool, error) {
+ objs, err := r.podIndexer.ByIndex(appNameIndexer,
utils.GenerateShuffleServerName(rss))
+ if err != nil {
+ klog.Errorf("get objects by indexer (%v) failed: %v",
appNameIndexer, err)
+ return false, err
+ }
+ lastPods := 0
+ for _, obj := range objs {
+ pod, ok := obj.(*corev1.Pod)
+ if ok && pod.DeletionTimestamp == nil {
+ lastPods++
+ }
+ }
+ return lastPods == 0, nil
+}
+
+func (r *rssController) scaleDownShuffleServer(rss
*unifflev1alpha1.RemoteShuffleService) (
+ bool, error) {
+ namespace := rss.Namespace
+ stsName := utils.GenerateShuffleServerName(rss)
+ sts, err := r.stsLister.StatefulSets(namespace).Get(stsName)
+ if err != nil {
+ if apierrors.IsNotFound(err) {
+ klog.V(5).Infof("ignored deleted statefulSet (%v/%v)",
namespace, stsName)
+ return false, nil
+ }
+ klog.Errorf("get statefulSet (%v) for rss (%v) failed: %v",
+ stsName, utils.UniqueName(rss), err)
+ return false, err
+ }
+ if *sts.Spec.Replicas == 0 {
+ return true, nil
+ }
+ stsCopy := sts.DeepCopy()
+ stsCopy.Spec.Replicas = pointer.Int32(0)
+ _, err = kubeutil.PatchStatefulSet(r.kubeClient, sts, stsCopy)
+ return true, err
+}
+
+// processNormal process the normal rss, and determines whether we need to
check again.
+func (r *rssController) processNormal(rss
*unifflev1alpha1.RemoteShuffleService) (bool, error) {
+ klog.V(4).Infof("process rss (%v) in %v phase", utils.UniqueName(rss),
rss.Status.Phase)
+ switch rss.Status.Phase {
+ case unifflev1alpha1.RSSPending:
+ return false, r.processPendingRSS(rss)
+ case unifflev1alpha1.RSSRunning:
+ return false, r.processRunningRSS(rss)
+ case unifflev1alpha1.RSSUpgrading:
+ return r.processUpgradingRSS(rss)
+ case unifflev1alpha1.RSSFailed:
+ klog.Errorf("failed to process rss (%v): %v",
utils.UniqueName(rss), rss.Status.Reason)
+ default:
+ return false, r.updateRssStatus(rss,
+ &unifflev1alpha1.RemoteShuffleServiceStatus{Phase:
unifflev1alpha1.RSSPending})
+ }
+ return false, nil
+}
+
+// processPendingRSS processes the pending rss by synchronizing generated
objects of coordinators and shuffle servers.
+func (r *rssController) processPendingRSS(rss
*unifflev1alpha1.RemoteShuffleService) error {
+ if err := r.syncObjects(rss); err != nil {
+ klog.Errorf("sync objects for rss (%v) failed: %v",
utils.UniqueName(rss), err)
+ return err
+ }
+ return r.updateRssStatus(rss,
+ &unifflev1alpha1.RemoteShuffleServiceStatus{Phase:
unifflev1alpha1.RSSRunning})
+}
+
+// processRunningRSS processes the running rss.
+func (r *rssController) processRunningRSS(rss
*unifflev1alpha1.RemoteShuffleService) error {
+ if err := r.syncObjects(rss); err != nil {
+ klog.Errorf("sync objects for rss (%v) failed: %v",
utils.UniqueName(rss), err)
+ return err
+ }
+ if *rss.Spec.ShuffleServer.Sync {
+ return r.prepareForUpgrading(rss)
+ }
+ return nil
+}
+
+// syncObjects synchronizes objects related to coordinators and shuffle
servers.
+func (r *rssController) syncObjects(rss *unifflev1alpha1.RemoteShuffleService)
error {
+ if err := r.syncConfigMap(rss); err != nil {
+ klog.Errorf("sync configMap for rss (%v) failed: %v",
utils.UniqueName(rss), err)
+ return err
+ }
+ if err := r.syncCoordinator(rss); err != nil {
+ klog.Errorf("sync coordinators for rss (%v) failed: %v",
utils.UniqueName(rss), err)
+ return err
+ }
+ if err := r.syncShuffleServer(rss); err != nil {
+ klog.Errorf("sync shuffle servers for rss (%v) failed: %v",
utils.UniqueName(rss), err)
+ return err
+ }
+ return nil
+}
+
+func (r *rssController) syncConfigMap(rss
*unifflev1alpha1.RemoteShuffleService) error {
+ cm, err := r.kubeClient.CoreV1().ConfigMaps(rss.Namespace).
+ Get(context.Background(), rss.Spec.ConfigMapName,
metav1.GetOptions{})
+ if err != nil {
+ klog.Errorf("get configMap of rss (%v) failed: %v",
utils.UniqueName(rss), err)
+ return err
+ }
+ if owner := util.GetConfigMapOwner(cm); len(owner) > 0 && owner !=
rss.Name {
+ return fmt.Errorf("conflict configMap name of rss (%v <> %v)",
owner, rss.Name)
+ }
+ if _, ok := cm.Data[constants.CoordinatorConfigKey]; !ok {
+ return fmt.Errorf("%v not exist",
constants.CoordinatorConfigKey)
+ }
+ if _, ok := cm.Data[constants.ShuffleServerConfigKey]; !ok {
+ return fmt.Errorf("%v not exist",
constants.ShuffleServerConfigKey)
+ }
+ if _, ok := cm.Data[constants.Log4jPropertiesKey]; !ok {
+ return fmt.Errorf("%v not exist", constants.Log4jPropertiesKey)
+ }
+
+ cm.Data[constants.CoordinatorConfigKey] =
propertiestutil.UpdateProperties(
+ cm.Data[constants.CoordinatorConfigKey],
coordinator.GenerateProperties(rss))
+ cm.Data[constants.ShuffleServerConfigKey] =
propertiestutil.UpdateProperties(
+ cm.Data[constants.ShuffleServerConfigKey],
shuffleserver.GenerateProperties(rss))
+ if cm.Labels == nil {
+ cm.Labels = make(map[string]string)
+ }
+ cm.Labels[controllerconstants.OwnerLabel] = rss.Name
+ if _, err = r.kubeClient.CoreV1().ConfigMaps(cm.Namespace).
+ Update(context.Background(), cm, metav1.UpdateOptions{}); err
!= nil {
+ klog.Errorf("update configMap (%v) of rss (%v) failed: %v",
rss.Spec.ConfigMapName,
+ utils.UniqueName(rss), err)
+ }
+ return err
+}
+
+// processUpgradingRSS processes the upgrading rss, and determines whether we
need to check again.
+func (r *rssController) processUpgradingRSS(rss
*unifflev1alpha1.RemoteShuffleService) (bool, error) {
+ switch rss.Spec.ShuffleServer.UpgradeStrategy.Type {
+ case unifflev1alpha1.FullUpgrade:
+ return r.processFullUpgrade(rss)
+ case unifflev1alpha1.PartitionUpgrade:
+ return r.processPartitionUpgrade(rss)
+ case unifflev1alpha1.SpecificUpgrade:
+ return r.processSpecificUpgrade(rss)
+ case unifflev1alpha1.FullRestart:
+ return r.processFullRestart(rss)
+ }
+ return false, nil
+}
+
+// processFullUpgrade process the rss needs full upgrade, and determines
whether we need to check again.
+func (r *rssController) processFullUpgrade(rss
*unifflev1alpha1.RemoteShuffleService) (bool, error) {
+ return r.processRollingUpgrade(rss, 0)
+}
+
+// processPartitionUpgrade process the rss needs partition upgrade, and
determines whether we need to check again.
+func (r *rssController) processPartitionUpgrade(
+ rss *unifflev1alpha1.RemoteShuffleService) (bool, error) {
+ return r.processRollingUpgrade(rss,
*rss.Spec.ShuffleServer.UpgradeStrategy.Partition)
+}
+
+// // processSpecificUpgrade process the rss needs specific upgrade, and
determines whether we need to check again.
+func (r *rssController) processSpecificUpgrade(rss
*unifflev1alpha1.RemoteShuffleService) (
+ bool, error) {
+ if err := r.serZeroPartition(rss, 0); err != nil {
+ klog.Errorf("update partition of statefulSet of rss (%v)
failed: %v",
+ utils.UniqueName(rss), err)
+ return false, err
+ }
+
+ targetNames := getNamesNeedToBeUpgraded(rss)
+ return r.handleTargetPods(targetNames, rss)
+}
+
+// handleTargetPods handles all target pods need to be upgraded or restarted,
+// and determines whether we need to check again.
+func (r *rssController) handleTargetPods(targetNames []string, rss
*unifflev1alpha1.RemoteShuffleService) (
+ bool, error) {
+ if len(targetNames) == 0 {
+ if err := r.clearExcludeNodes(rss); err != nil {
+ klog.Errorf("clear exclude nodes for finished upgrade
of rss (%v) failed: %v",
+ utils.UniqueName(rss))
+ return true, err
+ }
+ return false, r.updateRssStatus(rss,
+ &unifflev1alpha1.RemoteShuffleServiceStatus{Phase:
unifflev1alpha1.RSSRunning})
+ }
+ r.deleteTargetPods(targetNames, rss)
+ return true, nil
+}
+
+// deleteTargetPods tries to delete all target pods for upgrading or
restarting.
+func (r *rssController) deleteTargetPods(targetNames []string, rss
*unifflev1alpha1.RemoteShuffleService) {
+ wg := sync.WaitGroup{}
+ wg.Add(len(targetNames))
+ for i := range targetNames {
+ go func(podName string) {
+ defer wg.Done()
+ klog.V(5).Infof("try to delete shuffler server %v/%v",
rss.Namespace, podName)
+ if err :=
r.kubeClient.CoreV1().Pods(rss.Namespace).Delete(context.Background(),
+ podName, metav1.DeleteOptions{}); err != nil {
+ klog.V(5).Infof("deleted shuffler server %v/%v
failed: %v",
+ rss.Namespace, podName, err)
+ }
+ }(targetNames[i])
+ }
+ wg.Wait()
+}
+
+// // processSpecificUpgrade process the rss needs full restart, and
determines whether it has finished restarting.
+func (r *rssController) processFullRestart(rss
*unifflev1alpha1.RemoteShuffleService) (
+ bool, error) {
+ targetNames, err := getNamesNeedToBeRestarted(r.podLister, rss)
+ if err != nil {
+ klog.Errorf("get all pod names of rss (%v) to be restarted
failed: %v", utils.UniqueName(rss), err)
+ return false, err
+ }
+ return r.handleTargetPods(targetNames, rss)
+}
+
+// syncCoordinator synchronizes objects related to coordinators.
+func (r *rssController) syncCoordinator(rss
*unifflev1alpha1.RemoteShuffleService) error {
+ if !*rss.Spec.Coordinator.Sync {
+ return nil
+ }
+ serviceAccount, configMap, services, deployments :=
coordinator.GenerateCoordinators(rss)
+ if err := kubeutil.SyncServiceAccount(r.kubeClient, serviceAccount);
err != nil {
+ klog.Errorf("sync serviceAccount (%v) for rss (%v) failed: %v",
+ utils.UniqueName(serviceAccount),
utils.UniqueName(rss), err)
+ return err
+ }
+ if err := kubeutil.SyncConfigMap(r.kubeClient, configMap); err != nil {
+ klog.Errorf("sync configMap (%v) for rss (%v) failed: %v",
+ utils.UniqueName(serviceAccount),
utils.UniqueName(rss), err)
+ return err
+ }
+ if err := kubeutil.SyncServices(r.kubeClient, services); err != nil {
+ klog.Errorf("sync services for rss (%v) failed: %v",
utils.UniqueName(rss), err)
+ return err
+ }
+ if err := kubeutil.SyncDeployments(r.kubeClient, deployments); err !=
nil {
+ klog.Errorf("sync deployments for rss (%v) failed: %v",
utils.UniqueName(rss), err)
+ return err
+ }
+ return nil
+}
+
+// syncShuffleServer synchronizes objects related to shuffle servers.
+func (r *rssController) syncShuffleServer(rss
*unifflev1alpha1.RemoteShuffleService) error {
+ if rss.Status.Phase == unifflev1alpha1.RSSRunning &&
!*rss.Spec.ShuffleServer.Sync {
+ return nil
+ }
+ serviceAccount, services, statefulSet :=
shuffleserver.GenerateShuffleServers(rss)
+ if err := kubeutil.SyncServiceAccount(r.kubeClient, serviceAccount);
err != nil {
+ klog.Errorf("sync SA (%v) for rss (%v) failed: %v",
+ utils.UniqueName(serviceAccount),
utils.UniqueName(rss), err)
+ return err
+ }
+ if err := kubeutil.SyncServices(r.kubeClient, services); err != nil {
+ klog.Errorf("sync SVCs for rss (%v) failed: %v",
utils.UniqueName(rss), err)
+ return err
+ }
+ if _, _, err := kubeutil.SyncStatefulSet(r.kubeClient, statefulSet,
true); err != nil {
+ klog.Errorf("sync StatefulSet for rss (%v) failed: %v",
utils.UniqueName(rss), err)
+ return err
+ }
+ return nil
+}
+
+// prepareForUpgrading prepares for the upgrade by disable syncing shuffle
servers.
+func (r *rssController) prepareForUpgrading(rss
*unifflev1alpha1.RemoteShuffleService) error {
+ updated, err := r.disableSyncingShuffleServer(rss)
+ if err != nil {
+ klog.Errorf("disabled syncing shuffle server failed: %v", err)
+ return err
+ }
+ err = r.updateRssStatus(updated,
+ &unifflev1alpha1.RemoteShuffleServiceStatus{Phase:
unifflev1alpha1.RSSUpgrading})
+ if err != nil {
+ r.eventRecorder.Event(rss, corev1.EventTypeWarning,
controllerconstants.UpdateStatusError,
+ fmt.Sprintf("set %v phase failed: %v",
unifflev1alpha1.RSSUpgrading, err))
+ }
+ return err
+}
+
+// disableSyncingShuffleServer disables syncing of shuffle servers before
upgrading.
+func (r *rssController) disableSyncingShuffleServer(rss
*unifflev1alpha1.RemoteShuffleService) (
+ *unifflev1alpha1.RemoteShuffleService, error) {
+ rssCopy := rss.DeepCopy()
+ rssCopy.Spec.ShuffleServer.Sync = pointer.Bool(false)
+ updated, err :=
r.rssClient.UniffleV1alpha1().RemoteShuffleServices(rssCopy.Namespace).
+ Update(context.Background(), rssCopy, metav1.UpdateOptions{})
+ if err != nil {
+ klog.Errorf("updated .spec.shuffleServer.sync of rss (%v)
failed: %v",
+ utils.UniqueName(rssCopy), err)
+ return nil, err
+ }
+ return updated, nil
+}
+
+// updateRssStatus updates status of rss.
+func (r *rssController) updateRssStatus(rss
*unifflev1alpha1.RemoteShuffleService,
+ status *unifflev1alpha1.RemoteShuffleServiceStatus) error {
+ rssCopy := rss.DeepCopy()
+ rssCopy.Status = *status
+ _, err :=
r.rssClient.UniffleV1alpha1().RemoteShuffleServices(rssCopy.Namespace).
+ UpdateStatus(context.Background(), rssCopy,
metav1.UpdateOptions{})
+ if err == nil {
+ return nil
+ }
+ klog.Errorf("updated status of rss (%v) failed: %v",
utils.UniqueName(rssCopy), err)
+ if !apierrors.IsConflict(err) {
+ return err
+ }
+ return retry.RetryOnConflict(retry.DefaultRetry, func() error {
+ latestRss, getErr :=
r.rssClient.UniffleV1alpha1().RemoteShuffleServices(rss.Namespace).
+ Get(context.Background(), rss.Name, metav1.GetOptions{})
+ if getErr != nil {
+ klog.Errorf("get rss (%v) failed: %v",
utils.UniqueName(rss), getErr)
+ return getErr
+ }
+ latestRss.Status = *status
+ _, updateErr :=
r.rssClient.UniffleV1alpha1().RemoteShuffleServices(rss.Namespace).
+ UpdateStatus(context.Background(), latestRss,
metav1.UpdateOptions{})
+ if updateErr != nil {
+ klog.Errorf("retry updating status of rss (%v) failed:
%v",
+ utils.UniqueName(latestRss), updateErr)
+ }
+ return updateErr
+ })
+}
+
+// addRSS handles the added rss.
+func (r *rssController) addRSS(obj interface{}) {
+ r.enqueueRss(obj)
+}
+
+// updateRSS handles the updated rss.
+func (r *rssController) updateRSS(_, newObj interface{}) {
+ r.enqueueRss(newObj)
+}
+
+// deleteShuffleServer handles the deleted shuffler server pod.
+func (r *rssController) deleteShuffleServer(obj interface{}) {
+ r.enqueueShuffleServer(obj)
+}
+
+// enqueueRss enqueues a rss.
+func (r *rssController) enqueueRss(obj interface{}) {
+ rss, ok := obj.(*unifflev1alpha1.RemoteShuffleService)
+ if !ok {
+ klog.Errorf("object is not a rss: %+v", obj)
+ return
+ }
+
+ key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(rss)
+ if err != nil {
+ klog.Errorf("can not get key of rss: %v", utils.UniqueName(rss))
+ return
+ }
+ r.rssQueue.Add(key)
+}
+
+// enqueueShuffleServer enqueues a shuffle server pod.
+func (r *rssController) enqueueShuffleServer(obj interface{}) {
+ var pod *corev1.Pod
+ switch t := obj.(type) {
+ case *corev1.Pod:
+ pod = t
+ case cache.DeletedFinalStateUnknown:
+ if p, ok := t.Obj.(*corev1.Pod); ok {
+ pod = p
+ }
+ }
+ if pod == nil {
+ klog.Errorf("object is not a Pod object: %+v", obj)
+ return
+ }
+ rssName := utils.GetRssNameByPod(pod)
+ if len(rssName) != 0 {
+ key := buildPodCacheKey(pod.Namespace, rssName,
utils.BuildShuffleServerKey(pod))
+ r.shuffleServerQueue.Add(key)
+ }
+}
+
+// removeFinalizer removes the finalizer of rss for deleting it.
+func (r *rssController) removeFinalizer(rss
*unifflev1alpha1.RemoteShuffleService) error {
+ rssCopy := rss.DeepCopy()
+ var finalizers []string
+ for _, f := range finalizers {
+ if f == constants.RSSFinalizerName {
+ continue
+ }
+ finalizers = append(finalizers, f)
+ }
+ rssCopy.Finalizers = finalizers
+ _, err :=
r.rssClient.UniffleV1alpha1().RemoteShuffleServices(rssCopy.Namespace).
+ Update(context.Background(), rssCopy, metav1.UpdateOptions{})
+ if err != nil {
+ klog.Errorf("removed finalizer from rss (%v) failed: %v",
utils.UniqueName(rssCopy), err)
+ return err
+ }
+ klog.V(3).Infof("removed finalizer from rss (%v)",
utils.UniqueName(rssCopy))
+ return nil
+}
+
+// processRollingUpgrade processes full or partition upgrade in rolling mode.
+func (r *rssController) processRollingUpgrade(rss
*unifflev1alpha1.RemoteShuffleService,
+ minPartition int32) (bool, error) {
+ finished, oldSts, err := r.checkUpgradeFinished(rss, minPartition)
+ if err != nil {
+ klog.Errorf("checked whether the current upgrade of rss (%v)
has been finished failed: %v",
+ utils.UniqueName(rss), err)
+ return false, err
+ }
+ if finished {
+ if err = r.clearExcludeNodes(rss); err != nil {
+ klog.Errorf("clear exclude nodes for finished upgrade
of rss (%v) failed: %v",
+ utils.UniqueName(rss))
+ return true, err
+ }
+ return false, r.updateRssStatus(rss,
+ &unifflev1alpha1.RemoteShuffleServiceStatus{Phase:
unifflev1alpha1.RSSRunning})
+ }
+
+ return true, r.decreasePartition(oldSts, minPartition)
+}
+
+// checkUpgradeFinished checks whether we have finished the current upgrade.
+func (r *rssController) checkUpgradeFinished(rss
*unifflev1alpha1.RemoteShuffleService,
+ minPartition int32) (bool, *appsv1.StatefulSet, error) {
+ stsName := shuffleserver.GenerateName(rss)
+ oldSts, err := r.stsLister.StatefulSets(rss.Namespace).Get(stsName)
+ if err != nil {
+ klog.Errorf("get StatefulSet (%v/%v) failed: %v",
rss.Namespace, stsName, err)
+ return false, nil, err
+ }
+ if oldSts.Status.CurrentRevision == oldSts.Status.UpdateRevision {
+ minPartition = 0
+ }
+ klog.V(4).Infof("current updatedReplicas: %v, replicas: %v,
minPartition: %v",
+ oldSts.Status.UpdatedReplicas, *oldSts.Spec.Replicas,
minPartition)
+ if oldSts.Status.UpdatedReplicas >= *oldSts.Spec.Replicas-minPartition
&&
+ *oldSts.Spec.Replicas == oldSts.Status.ReadyReplicas {
+ klog.V(4).Infof("do not need to update partition of statefulSet
(%v)",
+ utils.UniqueName(oldSts))
+ return true, nil, nil
+ }
+ return false, oldSts, nil
+}
+
+// decreasePartition decrease the partition value of statefulSet used by
shuffle server if we do net finish the upgrade.
+func (r *rssController) decreasePartition(oldSts *appsv1.StatefulSet,
minPartition int32) error {
+ newSts := oldSts.DeepCopy()
+ targetPartition := *newSts.Spec.Replicas -
oldSts.Status.UpdatedReplicas - 1
+ if targetPartition < minPartition {
+ targetPartition = minPartition
+ }
+ klog.V(4).Infof("set partition of statefulSet (%v) to %v >= %v",
+ utils.UniqueName(newSts), targetPartition, minPartition)
+ newSts.Spec.UpdateStrategy.RollingUpdate.Partition = &targetPartition
+ _, err := kubeutil.PatchStatefulSet(r.kubeClient, oldSts, newSts)
+ return err
+}
+
+// serZeroPartition sets zero value for partition of statefulSet used by
shuffle servers for specify upgrade,
+// because we need to
+func (r *rssController) serZeroPartition(rss
*unifflev1alpha1.RemoteShuffleService,
+ partition int32) error {
+ stsName := shuffleserver.GenerateName(rss)
+ oldSts, err := r.stsLister.StatefulSets(rss.Namespace).Get(stsName)
+ if err != nil {
+ klog.Errorf("get StatefulSet (%v/%v) failed: %v",
rss.Namespace, stsName, err)
+ return err
+ }
+ newSts := oldSts.DeepCopy()
+ newSts.Spec.UpdateStrategy.RollingUpdate.Partition = &partition
+ _, err = kubeutil.PatchStatefulSet(r.kubeClient, oldSts, newSts)
+ return err
+}
+
// buildPodCacheKey joins namespace, rss name and shuffle server key with "|" into the key
// stored in the shuffle server pod queue; parsePodCacheKey performs the inverse split.
func buildPodCacheKey(namespace, rssName, shuffleServerKey string) string {
	const separator = "|"
	return strings.Join([]string{namespace, rssName, shuffleServerKey}, separator)
}
+
// parsePodCacheKey splits a key produced by buildPodCacheKey back into its namespace, rss name
// and shuffle server key. Any key that does not consist of exactly three "|"-separated parts
// yields three empty strings.
func parsePodCacheKey(key string) (namespace, rssName, shuffleServerKey string) {
	parts := strings.Split(key, "|")
	if len(parts) != 3 {
		return "", "", ""
	}
	return parts[0], parts[1], parts[2]
}
+
+// getNamesNeedToBeDeleted returns the names of shuffle server pods need to be
deleted currently.
+func getNamesNeedToBeDeleted(targetNames []string, rss
*unifflev1alpha1.RemoteShuffleService) []string {
+ deletedKeys := rss.Status.DeletedKeys
+ var waitingNames []string
+ for _, target := range targetNames {
+ found := false
+ for _, key := range deletedKeys {
+ _, podName, _ := utils.ParseShuffleServerKey(key)
+ if podName == target {
+ found = true
+ break
+ }
+ }
+ if !found {
+ waitingNames = append(waitingNames, target)
+ }
+ }
+ return waitingNames
+}
+
+// getNamesNeedToBeUpgraded returns the names of shuffle server pods need to
be upgraded currently.
+func getNamesNeedToBeUpgraded(rss *unifflev1alpha1.RemoteShuffleService)
[]string {
+ specificNames :=
sets.NewString(rss.Spec.ShuffleServer.UpgradeStrategy.SpecificNames...).List()
+ return getNamesNeedToBeDeleted(specificNames, rss)
+}
+
+// getNamesNeedToBeRestarted returns the names of shuffle server pods need to
be restarted currently.
+func getNamesNeedToBeRestarted(podLister corelisters.PodLister, rss
*unifflev1alpha1.RemoteShuffleService) (
+ []string, error) {
+ pods, err :=
podLister.Pods(rss.Namespace).List(labels.SelectorFromSet(utils.GenerateShuffleServerLabels(rss)))
+ if err != nil {
+ klog.Errorf("get pods of rss (%v) failed: %v",
utils.UniqueName(rss), err)
+ return nil, err
+ }
+ targetNames := sets.NewString()
+ for i := 0; i < len(pods); i++ {
+ targetNames.Insert(pods[i].Name)
+ }
+ return getNamesNeedToBeDeleted(targetNames.List(), rss), nil
+}
+
+func getAppName(obj interface{}) ([]string, error) {
+ pod, ok := obj.(*corev1.Pod)
+ if !ok {
+ return nil, fmt.Errorf("not a pod")
+ }
+ return []string{pod.Labels["app"]}, nil
+}
diff --git a/deploy/kubernetes/operator/pkg/controller/controller/rss_test.go
b/deploy/kubernetes/operator/pkg/controller/controller/rss_test.go
new file mode 100644
index 00000000..a72c7c10
--- /dev/null
+++ b/deploy/kubernetes/operator/pkg/controller/controller/rss_test.go
@@ -0,0 +1,290 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package controller
+
+import (
+ "context"
+ "path/filepath"
+ "testing"
+ "time"
+
+ . "github.com/onsi/ginkgo/v2"
+ . "github.com/onsi/gomega"
+ appsv1 "k8s.io/api/apps/v1"
+ corev1 "k8s.io/api/core/v1"
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+ "k8s.io/apimachinery/pkg/util/wait"
+ "k8s.io/client-go/kubernetes"
+ "k8s.io/client-go/kubernetes/scheme"
+ "k8s.io/utils/pointer"
+ "sigs.k8s.io/controller-runtime/pkg/envtest"
+ logf "sigs.k8s.io/controller-runtime/pkg/log"
+ "sigs.k8s.io/controller-runtime/pkg/log/zap"
+
+ unifflev1alpha1
"github.com/apache/incubator-uniffle/deploy/kubernetes/operator/api/uniffle/v1alpha1"
+
"github.com/apache/incubator-uniffle/deploy/kubernetes/operator/pkg/constants"
+
"github.com/apache/incubator-uniffle/deploy/kubernetes/operator/pkg/controller/config"
+
"github.com/apache/incubator-uniffle/deploy/kubernetes/operator/pkg/controller/sync/coordinator"
+
"github.com/apache/incubator-uniffle/deploy/kubernetes/operator/pkg/controller/sync/shuffleserver"
+
"github.com/apache/incubator-uniffle/deploy/kubernetes/operator/pkg/generated/clientset/versioned"
+
"github.com/apache/incubator-uniffle/deploy/kubernetes/operator/pkg/utils"
+)
+
+var (
+ testEnv *envtest.Environment
+ testKubeClient kubernetes.Interface
+ testRssClient versioned.Interface
+ testCM *corev1.ConfigMap
+ testRSS *unifflev1alpha1.RemoteShuffleService
+ stopCtx context.Context
+ ctxCancel context.CancelFunc
+)
+
+func TestRssController(t *testing.T) {
+ RegisterFailHandler(Fail)
+ suiteCfg, reporterCfg := GinkgoConfiguration()
+ reporterCfg.VeryVerbose = true
+ reporterCfg.FullTrace = true
+ RunSpecs(t, "rss controller suite", suiteCfg, reporterCfg)
+}
+
+var _ = BeforeSuite(
+ func() {
+ logf.SetLogger(zap.New(zap.WriteTo(GinkgoWriter),
zap.UseDevMode(true)))
+ By("bootstrapping test environment")
+ testEnv = &envtest.Environment{
+ CRDDirectoryPaths: []string{filepath.Join("../../..",
"config", "crd", "bases")},
+ }
+ restConfig, err := testEnv.Start()
+ Expect(err).To(BeNil())
+ Expect(restConfig).ToNot(BeNil())
+
+ testCM, testRSS = initTestRss()
+
+ testKubeClient, err = kubernetes.NewForConfig(restConfig)
+ Expect(err).ToNot(HaveOccurred())
+ Expect(testKubeClient).ToNot(BeNil())
+
+ testRssClient, err = versioned.NewForConfig(restConfig)
+ Expect(err).ToNot(HaveOccurred())
+ Expect(testRssClient).ToNot(BeNil())
+
+ err = unifflev1alpha1.AddToScheme(scheme.Scheme)
+ Expect(err).NotTo(HaveOccurred())
+
+ // +kubebuilder:scaffold:scheme
+
+ cfg := &config.Config{
+ Workers: 1,
+ GenericConfig: utils.GenericConfig{
+ RESTConfig: restConfig,
+ KubeClient: testKubeClient,
+ RSSClient: testRssClient,
+ },
+ }
+ rc := newRSSController(cfg)
+ stopCtx, ctxCancel = context.WithCancel(context.TODO())
+ go func() {
+ err = rc.Start(stopCtx)
+ Expect(err).ToNot(HaveOccurred())
+ }()
+ },
+)
+
+var _ = AfterSuite(func() {
+ By("stopping rss controller")
+ ctxCancel()
+ By("tearing down the test environment")
+ Expect(testEnv.Stop()).To(Succeed())
+})
+
+// At present, it is not possible to simulate the real operation of Workload
through EnvTest.
+// TODO: more detailed tests will be added in the future.
+var _ = Describe("RssController", func() {
+ Context("Handle rss objects", func() {
+ It("Create a rss object", func() {
+ By("Create test configMap")
+ _, err :=
testKubeClient.CoreV1().ConfigMaps(testCM.Namespace).
+ Create(context.TODO(), testCM,
metav1.CreateOptions{})
+ Expect(err).ToNot(HaveOccurred())
+
+ By("Create test rss object")
+ _, err =
testRssClient.UniffleV1alpha1().RemoteShuffleServices(corev1.NamespaceDefault).
+ Create(context.TODO(), testRSS,
metav1.CreateOptions{})
+ Expect(err).ToNot(HaveOccurred())
+
+ By("Wait rss object running")
+ err = wait.Poll(time.Second, time.Second*5, func()
(bool, error) {
+ curRss, getErr :=
testRssClient.UniffleV1alpha1().RemoteShuffleServices(testNamespace).
+ Get(context.TODO(), testRssName,
metav1.GetOptions{})
+ if getErr != nil {
+ return false, getErr
+ }
+
+ if curRss.Status.Phase !=
unifflev1alpha1.RSSRunning || *curRss.Spec.ShuffleServer.Sync {
+ return false, nil
+ }
+ return true, nil
+ })
+ Expect(err).ToNot(HaveOccurred())
+
+ By("Check coordinator 0")
+ coordinatorName0 :=
coordinator.GenerateNameByIndex(testRSS, 0)
+ _, err =
testKubeClient.AppsV1().Deployments(corev1.NamespaceDefault).
+ Get(context.TODO(), coordinatorName0,
metav1.GetOptions{})
+ Expect(err).ToNot(HaveOccurred())
+
+ By("Check coordinator 1")
+ coordinatorName1 :=
coordinator.GenerateNameByIndex(testRSS, 1)
+ _, err =
testKubeClient.AppsV1().Deployments(corev1.NamespaceDefault).
+ Get(context.TODO(), coordinatorName1,
metav1.GetOptions{})
+ Expect(err).ToNot(HaveOccurred())
+
+ By("Check shuffle server")
+ shuffleServerName := shuffleserver.GenerateName(testRSS)
+ _, err =
testKubeClient.AppsV1().StatefulSets(corev1.NamespaceDefault).
+ Get(context.TODO(), shuffleServerName,
metav1.GetOptions{})
+ Expect(err).ToNot(HaveOccurred())
+ })
+ It("Update the rss object", func() {
+ By("Get current rss object")
+ curRSS, err :=
testRssClient.UniffleV1alpha1().RemoteShuffleServices(corev1.NamespaceDefault).
+ Get(context.TODO(), testRssName,
metav1.GetOptions{})
+ Expect(err).ToNot(HaveOccurred())
+ Expect(curRSS).ToNot(BeNil())
+ Expect(*curRSS.Spec.ShuffleServer.Sync, false)
+
+ shuffleServerName := shuffleserver.GenerateName(testRSS)
+ var sts *appsv1.StatefulSet
+ sts, err =
testKubeClient.AppsV1().StatefulSets(corev1.NamespaceDefault).
+ Get(context.TODO(), shuffleServerName,
metav1.GetOptions{})
+ Expect(err).ToNot(HaveOccurred())
+ Expect(sts).ToNot(BeNil())
+ Expect(*sts.Spec.Replicas).To(Equal(int32(1)))
+
+ coordinatorName0 :=
coordinator.GenerateNameByIndex(testRSS, 0)
+ var coordinator0 *appsv1.Deployment
+ coordinator0, err =
testKubeClient.AppsV1().Deployments(corev1.NamespaceDefault).
+ Get(context.TODO(), coordinatorName0,
metav1.GetOptions{})
+ Expect(err).ToNot(HaveOccurred())
+ Expect(coordinator0).ToNot(BeNil())
+
Expect(coordinator0.Spec.Template.Spec.Containers[0].Image).To(Equal(testCoordinatorImage1))
+
+ coordinatorName1 :=
coordinator.GenerateNameByIndex(testRSS, 0)
+ var coordinator1 *appsv1.Deployment
+ coordinator1, err =
testKubeClient.AppsV1().Deployments(corev1.NamespaceDefault).
+ Get(context.TODO(), coordinatorName1,
metav1.GetOptions{})
+ Expect(err).ToNot(HaveOccurred())
+ Expect(coordinator1).ToNot(BeNil())
+
Expect(coordinator1.Spec.Template.Spec.Containers[0].Image).To(Equal(testCoordinatorImage1))
+
+ By("Update test rss object")
+ curRSS.Spec.Coordinator.Image = testCoordinatorImage2
+ curRSS.Spec.ShuffleServer.Sync = pointer.Bool(true)
+ curRSS.Spec.ShuffleServer.Replicas = pointer.Int32(3)
+ _, err =
testRssClient.UniffleV1alpha1().RemoteShuffleServices(corev1.NamespaceDefault).
+ Update(context.TODO(), curRSS,
metav1.UpdateOptions{})
+ Expect(err).ToNot(HaveOccurred())
+
+ By("Wait rss object upgrading")
+ err = wait.Poll(time.Second, time.Second*5, func()
(bool, error) {
+ curRss, getErr :=
testRssClient.UniffleV1alpha1().RemoteShuffleServices(testNamespace).
+ Get(context.TODO(), testRssName,
metav1.GetOptions{})
+ if getErr != nil {
+ return false, getErr
+ }
+ if curRss.Status.Phase !=
unifflev1alpha1.RSSUpgrading {
+ return false, nil
+ }
+ return true, nil
+ })
+ Expect(err).ToNot(HaveOccurred())
+
+ By("Check coordinator 0")
+ coordinator0, err =
testKubeClient.AppsV1().Deployments(corev1.NamespaceDefault).
+ Get(context.TODO(), coordinatorName0,
metav1.GetOptions{})
+ Expect(err).ToNot(HaveOccurred())
+ Expect(coordinator0).ToNot(BeNil())
+
Expect(coordinator0.Spec.Template.Spec.Containers[0].Image).To(Equal(testCoordinatorImage2))
+
+ By("Check coordinator 1")
+ coordinator1, err =
testKubeClient.AppsV1().Deployments(corev1.NamespaceDefault).
+ Get(context.TODO(), coordinatorName1,
metav1.GetOptions{})
+ Expect(err).ToNot(HaveOccurred())
+ Expect(coordinator1).ToNot(BeNil())
+
Expect(coordinator1.Spec.Template.Spec.Containers[0].Image).To(Equal(testCoordinatorImage2))
+
+ By("Check shuffle server")
+ sts, err =
testKubeClient.AppsV1().StatefulSets(corev1.NamespaceDefault).
+ Get(context.TODO(), shuffleServerName,
metav1.GetOptions{})
+ Expect(err).ToNot(HaveOccurred())
+ Expect(sts).ToNot(BeNil())
+ Expect(*sts.Spec.Replicas).To(Equal(int32(3)))
+ })
+ })
+})
+
+func initTestRss() (*corev1.ConfigMap, *unifflev1alpha1.RemoteShuffleService) {
+ cm := &corev1.ConfigMap{
+ ObjectMeta: metav1.ObjectMeta{
+ Name: testRssName,
+ Namespace: testNamespace,
+ },
+ Data: map[string]string{
+ constants.CoordinatorConfigKey: "",
+ constants.ShuffleServerConfigKey: "",
+ constants.Log4jPropertiesKey: "",
+ },
+ }
+ rss := &unifflev1alpha1.RemoteShuffleService{
+ ObjectMeta: metav1.ObjectMeta{
+ Name: testRssName,
+ Namespace: testNamespace,
+ },
+ Spec: unifflev1alpha1.RemoteShuffleServiceSpec{
+ ConfigMapName: testRssName,
+ Coordinator: &unifflev1alpha1.CoordinatorConfig{
+ HTTPNodePort: []int32{30001, 30011},
+ RPCNodePort: []int32{30002, 30012},
+ CommonConfig: &unifflev1alpha1.CommonConfig{
+ ConfigDir: "/app/config",
+ RSSPodSpec: &unifflev1alpha1.RSSPodSpec{
+ MainContainer:
&unifflev1alpha1.MainContainer{
+ Image:
testCoordinatorImage1,
+ },
+ },
+ },
+ },
+ ShuffleServer: &unifflev1alpha1.ShuffleServerConfig{
+ CommonConfig: &unifflev1alpha1.CommonConfig{
+ ConfigDir: "/app/config",
+ RSSPodSpec: &unifflev1alpha1.RSSPodSpec{
+ MainContainer:
&unifflev1alpha1.MainContainer{
+ Image:
"rss-shuffleserver:latest",
+ },
+ },
+ XmxSize: "10G",
+ },
+ UpgradeStrategy:
&unifflev1alpha1.ShuffleServerUpgradeStrategy{
+ Type: unifflev1alpha1.FullUpgrade,
+ },
+ },
+ },
+ }
+ return cm, rss
+}
diff --git
a/deploy/kubernetes/operator/pkg/controller/controller/shuffle_server_test.go
b/deploy/kubernetes/operator/pkg/controller/controller/shuffle_server_test.go
new file mode 100644
index 00000000..85cb5c98
--- /dev/null
+++
b/deploy/kubernetes/operator/pkg/controller/controller/shuffle_server_test.go
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package controller
+
+import (
+ "context"
+ "sort"
+ "testing"
+
+ appsv1 "k8s.io/api/apps/v1"
+ corev1 "k8s.io/api/core/v1"
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+ kubefake "k8s.io/client-go/kubernetes/fake"
+
+ unifflev1alpha1
"github.com/apache/incubator-uniffle/deploy/kubernetes/operator/api/uniffle/v1alpha1"
+
"github.com/apache/incubator-uniffle/deploy/kubernetes/operator/pkg/constants"
+
"github.com/apache/incubator-uniffle/deploy/kubernetes/operator/pkg/controller/config"
+
"github.com/apache/incubator-uniffle/deploy/kubernetes/operator/pkg/generated/clientset/versioned/fake"
+
"github.com/apache/incubator-uniffle/deploy/kubernetes/operator/pkg/utils"
+)
+
+func buildUpgradingRssObjWithTargetKeys(shuffleServerKey string)
*unifflev1alpha1.RemoteShuffleService {
+ return &unifflev1alpha1.RemoteShuffleService{
+ ObjectMeta: metav1.ObjectMeta{
+ Name: testRssName,
+ Namespace: testNamespace,
+ ResourceVersion: "test",
+ },
+ Spec: unifflev1alpha1.RemoteShuffleServiceSpec{
+ Coordinator: &unifflev1alpha1.CoordinatorConfig{
+ ExcludeNodesFilePath: "/exclude_nodes",
+ },
+ },
+ Status: unifflev1alpha1.RemoteShuffleServiceStatus{
+ Phase: unifflev1alpha1.RSSUpgrading,
+ TargetKeys: []string{shuffleServerKey},
+ },
+ }
+}
+
+func buildTestShuffleServerPod() *corev1.Pod {
+ return &corev1.Pod{
+ ObjectMeta: metav1.ObjectMeta{
+ Name: testShuffleServerPodName,
+ Namespace: testNamespace,
+ Annotations: map[string]string{
+ constants.AnnotationShuffleServerPort: "8080",
+ },
+ Labels: map[string]string{
+ appsv1.ControllerRevisionHashLabelKey:
"test-revision-1",
+ },
+ },
+ Status: corev1.PodStatus{
+ PodIP: "10.0.0.1",
+ },
+ }
+}
+
+func buildTestExcludeNodeConfigMap(rss *unifflev1alpha1.RemoteShuffleService,
+ shuffleServerKey string) *corev1.ConfigMap {
+ return &corev1.ConfigMap{
+ ObjectMeta: metav1.ObjectMeta{
+ Name: utils.GenerateCoordinatorName(rss),
+ Namespace: testNamespace,
+ },
+ Data: map[string]string{
+ testExcludeNodeFileKey: shuffleServerKey,
+ },
+ }
+}
+
+func TestUpdateTargetAndDeletedKeys(t *testing.T) {
+ testShuffleServerPod := buildTestShuffleServerPod()
+ shuffleServerKey := utils.BuildShuffleServerKey(testShuffleServerPod)
+ rss := buildUpgradingRssObjWithTargetKeys(shuffleServerKey)
+ cm := buildTestExcludeNodeConfigMap(rss, shuffleServerKey)
+
+ rssClient := fake.NewSimpleClientset(rss)
+ kubeClient := kubefake.NewSimpleClientset(cm)
+
+ rc := newRSSController(&config.Config{
+ GenericConfig: utils.GenericConfig{
+ KubeClient: kubeClient,
+ RSSClient: rssClient,
+ },
+ })
+ if err := rc.cmInformer.GetIndexer().Add(cm); err != nil {
+ t.Fatalf("update fake cm lister failed: %v", err)
+ }
+
+ for _, tt := range []struct {
+ name string
+ expectedDeletedKeys []string
+ expectedExcludeNodes string
+ }{
+ {
+ name: "update target and deleted keys
when a shuffle server pod has been deleted",
+ expectedDeletedKeys: []string{shuffleServerKey},
+ expectedExcludeNodes: "",
+ },
+ } {
+ t.Run(tt.name, func(tc *testing.T) {
+ if err := rc.updateTargetAndDeletedKeys(rss,
shuffleServerKey); err != nil {
+ t.Errorf("update target and deleted keys
failed: %v", err)
+ return
+ }
+
+ curRss, err :=
rssClient.UniffleV1alpha1().RemoteShuffleServices(rss.Namespace).Get(context.TODO(),
+ rss.Name, metav1.GetOptions{})
+ if err != nil {
+ t.Errorf("get updated rss object failed: %v",
err)
+ return
+ }
+ if !stringSliceIsEqual(curRss.Status.DeletedKeys,
tt.expectedDeletedKeys) {
+ t.Errorf("unexpected deleted keys: %+v,
expected: %+v", curRss.Status.DeletedKeys,
+ tt.expectedDeletedKeys)
+ return
+ }
+
+ var curCM *corev1.ConfigMap
+ curCM, err =
kubeClient.CoreV1().ConfigMaps(cm.Namespace).Get(context.TODO(), cm.Name,
metav1.GetOptions{})
+ if err != nil {
+ t.Errorf("get updated rss object failed: %v",
err)
+ return
+ }
+ if curCM.Data[testExcludeNodeFileKey] !=
tt.expectedExcludeNodes {
+ t.Errorf("unexpected deleted keys: %v,
expected: %v", curCM.Data[testExcludeNodeFileKey],
+ tt.expectedExcludeNodes)
+ return
+ }
+ })
+ }
+}
+
// stringSliceIsEqual reports whether current and target contain the same strings with the
// same multiplicities, ignoring order. The inputs are not modified: sorting is done on
// copies, because callers pass slices taken from objects that are inspected afterwards.
func stringSliceIsEqual(current []string, target []string) bool {
	if len(current) != len(target) {
		return false
	}
	sortedCurrent := append([]string(nil), current...)
	sortedTarget := append([]string(nil), target...)
	sort.Strings(sortedCurrent)
	sort.Strings(sortedTarget)
	for i := range sortedCurrent {
		if sortedCurrent[i] != sortedTarget[i] {
			return false
		}
	}
	return true
}
diff --git a/deploy/kubernetes/operator/pkg/controller/controller/test.go
b/deploy/kubernetes/operator/pkg/controller/controller/test.go
new file mode 100644
index 00000000..3058176e
--- /dev/null
+++ b/deploy/kubernetes/operator/pkg/controller/controller/test.go
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package controller
+
+import (
+ corev1 "k8s.io/api/core/v1"
+
+
"github.com/apache/incubator-uniffle/deploy/kubernetes/operator/pkg/constants"
+)
+
+const (
+ testRssName = "test"
+ testNamespace = corev1.NamespaceDefault
+ testCoordinatorImage1 = "rss-coordinator:test1"
+ testCoordinatorImage2 = "rss-coordinator:test2"
+
+ testShuffleServerPodName = constants.RSSShuffleServer + "-" +
testRssName + "-0"
+ testExcludeNodeFileKey = "exclude_nodes"
+)
diff --git
a/deploy/kubernetes/operator/pkg/controller/sync/coordinator/coordinator.go
b/deploy/kubernetes/operator/pkg/controller/sync/coordinator/coordinator.go
new file mode 100644
index 00000000..d31aba65
--- /dev/null
+++ b/deploy/kubernetes/operator/pkg/controller/sync/coordinator/coordinator.go
@@ -0,0 +1,297 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package coordinator
+
+import (
+ "fmt"
+ "strconv"
+ "strings"
+
+ appsv1 "k8s.io/api/apps/v1"
+ corev1 "k8s.io/api/core/v1"
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+ "k8s.io/apimachinery/pkg/util/intstr"
+ "k8s.io/apimachinery/pkg/util/sets"
+ "k8s.io/utils/pointer"
+
+ unifflev1alpha1
"github.com/apache/incubator-uniffle/deploy/kubernetes/operator/api/uniffle/v1alpha1"
+
"github.com/apache/incubator-uniffle/deploy/kubernetes/operator/pkg/constants"
+ controllerconstants
"github.com/apache/incubator-uniffle/deploy/kubernetes/operator/pkg/controller/constants"
+
"github.com/apache/incubator-uniffle/deploy/kubernetes/operator/pkg/controller/util"
+
"github.com/apache/incubator-uniffle/deploy/kubernetes/operator/pkg/utils"
+)
+
+var defaultENVs sets.String
+
+func init() {
+ defaultENVs = sets.NewString()
+ defaultENVs.Insert(controllerconstants.CoordinatorRPCPortEnv,
+ controllerconstants.CoordinatorHTTPPortEnv,
+ controllerconstants.XmxSizeEnv,
+ controllerconstants.ServiceNameEnv)
+}
+
+// GenerateCoordinators generates objects related to coordinators
+func GenerateCoordinators(rss *unifflev1alpha1.RemoteShuffleService) (
+ *corev1.ServiceAccount, *corev1.ConfigMap, []*corev1.Service,
[]*appsv1.Deployment) {
+ sa := GenerateSA(rss)
+ cm := GenerateCM(rss)
+ count := *rss.Spec.Coordinator.Count
+ services := make([]*corev1.Service, count)
+ deployments := make([]*appsv1.Deployment, count)
+ for i := 0; i < int(count); i++ {
+ svc := GenerateSvc(rss, i)
+ deploy := GenerateDeploy(rss, i)
+ services[i] = svc
+ deployments[i] = deploy
+ }
+ return sa, cm, services, deployments
+}
+
+// GenerateSA generates service account of coordinator.
+func GenerateSA(rss *unifflev1alpha1.RemoteShuffleService)
*corev1.ServiceAccount {
+ sa := &corev1.ServiceAccount{
+ ObjectMeta: metav1.ObjectMeta{
+ Name: utils.GenerateCoordinatorName(rss),
+ Namespace: rss.Namespace,
+ },
+ }
+ util.AddOwnerReference(&sa.ObjectMeta, rss)
+ return sa
+}
+
+// GenerateCM generates configMap used by coordinators.
+func GenerateCM(rss *unifflev1alpha1.RemoteShuffleService) *corev1.ConfigMap {
+ cm := &corev1.ConfigMap{
+ ObjectMeta: metav1.ObjectMeta{
+ Name: utils.GenerateCoordinatorName(rss),
+ Namespace: rss.Namespace,
+ Labels: map[string]string{
+ constants.LabelCoordinator: "true",
+ },
+ },
+ Data: map[string]string{
+ utils.GetExcludeNodesConfigMapKey(rss): "",
+ },
+ }
+ util.AddOwnerReference(&cm.ObjectMeta, rss)
+ return cm
+}
+
+// GenerateSvc generates service used by specific coordinator.
+func GenerateSvc(rss *unifflev1alpha1.RemoteShuffleService, index int)
*corev1.Service {
+ name := GenerateNameByIndex(rss, index)
+ svc := &corev1.Service{
+ ObjectMeta: metav1.ObjectMeta{
+ Name: name,
+ Namespace: rss.Namespace,
+ },
+ Spec: corev1.ServiceSpec{
+ Type: corev1.ServiceTypeNodePort,
+ Selector: map[string]string{
+ "app": name,
+ },
+ Ports: []corev1.ServicePort{
+ {
+ Name: "rpc",
+ Protocol: corev1.ProtocolTCP,
+ Port:
controllerconstants.ContainerCoordinatorRPCPort,
+ TargetPort:
intstr.FromInt(int(*rss.Spec.Coordinator.RPCPort)),
+ NodePort:
rss.Spec.Coordinator.RPCNodePort[index],
+ },
+ {
+ Name: "http",
+ Protocol: corev1.ProtocolTCP,
+ Port:
controllerconstants.ContainerCoordinatorHTTPPort,
+ TargetPort:
intstr.FromInt(int(*rss.Spec.Coordinator.HTTPPort)),
+ NodePort:
rss.Spec.Coordinator.HTTPNodePort[index],
+ },
+ },
+ },
+ }
+ util.AddOwnerReference(&svc.ObjectMeta, rss)
+ return svc
+}
+
// GenerateDeploy generates the Deployment that runs the coordinator with the given index.
//
// Pod-level settings:
//   - HostNetwork comes from *rss.Spec.Coordinator.HostNetwork (dereferenced, so it must be
//     set); when enabled, DNSPolicy is switched to ClusterFirstWithHostNet.
//   - The pod runs under the coordinator ServiceAccount (utils.GenerateCoordinatorName).
//   - It tolerates NoSchedule taints with key "node-role.kubernetes.io/master".
//   - Volumes are appended in this order: the user configMap (rss.Spec.ConfigMapName), the
//     coordinator configMap that holds the exclude-nodes file, then host-path volumes.
//
// Containers: the generated main container comes first, followed by any sidecars. Init
// containers come from util.GenerateInitContainers. Deployment, selector and pod template
// all carry the label app=<name>. rss is attached as owner via util.AddOwnerReference.
func GenerateDeploy(rss *unifflev1alpha1.RemoteShuffleService, index int) *appsv1.Deployment {
	name := GenerateNameByIndex(rss, index)

	podSpec := corev1.PodSpec{
		HostNetwork:        *rss.Spec.Coordinator.HostNetwork,
		ServiceAccountName: utils.GenerateCoordinatorName(rss),
		Tolerations: []corev1.Toleration{
			{
				Effect: corev1.TaintEffectNoSchedule,
				Key:    "node-role.kubernetes.io/master",
			},
		},
		Volumes: []corev1.Volume{
			{
				// User-provided configuration files (rss.Spec.ConfigMapName).
				Name: controllerconstants.ConfigurationVolumeName,
				VolumeSource: corev1.VolumeSource{
					ConfigMap: &corev1.ConfigMapVolumeSource{
						LocalObjectReference: corev1.LocalObjectReference{
							Name: rss.Spec.ConfigMapName,
						},
						DefaultMode: pointer.Int32(0777),
					},
				},
			},
		},
		NodeSelector: rss.Spec.Coordinator.NodeSelector,
	}
	// With host networking, ClusterFirstWithHostNet keeps cluster DNS resolution available.
	if podSpec.HostNetwork {
		podSpec.DNSPolicy = corev1.DNSClusterFirstWithHostNet
	}

	deploy := &appsv1.Deployment{
		ObjectMeta: metav1.ObjectMeta{
			Name:      name,
			Namespace: rss.Namespace,
			Labels: map[string]string{
				"app": name,
			},
		},
		Spec: appsv1.DeploymentSpec{
			Selector: &metav1.LabelSelector{
				MatchLabels: map[string]string{
					"app": name,
				},
			},
			Replicas: rss.Spec.Coordinator.Replicas,
			Template: corev1.PodTemplateSpec{
				ObjectMeta: metav1.ObjectMeta{
					Labels: map[string]string{
						"app": name,
					},
				},
				Spec: podSpec,
			},
		},
	}

	// add init containers, the main container and other containers.
	deploy.Spec.Template.Spec.InitContainers = util.GenerateInitContainers(rss.Spec.Coordinator.RSSPodSpec)
	containers := []corev1.Container{*generateMainContainer(rss)}
	containers = append(containers, rss.Spec.Coordinator.SidecarContainers...)
	deploy.Spec.Template.Spec.Containers = containers

	// add configMap volume to save exclude nodes; the main container mounts it by the
	// volume name controllerconstants.ExcludeNodesFile.
	configMapVolume := corev1.Volume{
		Name: controllerconstants.ExcludeNodesFile,
		VolumeSource: corev1.VolumeSource{
			ConfigMap: &corev1.ConfigMapVolumeSource{
				LocalObjectReference: corev1.LocalObjectReference{
					Name: utils.GenerateCoordinatorName(rss),
				},
				DefaultMode: pointer.Int32(0777),
			},
		},
	}
	deploy.Spec.Template.Spec.Volumes = append(deploy.Spec.Template.Spec.Volumes, configMapVolume)
	// add hostPath volumes for coordinators.
	hostPathMounts := rss.Spec.Coordinator.HostPathMounts
	logHostPath := rss.Spec.Coordinator.LogHostPath
	deploy.Spec.Template.Spec.Volumes = append(deploy.Spec.Template.Spec.Volumes,
		util.GenerateHostPathVolumes(hostPathMounts, logHostPath, name)...)

	util.AddOwnerReference(&deploy.ObjectMeta, rss)
	return deploy
}
+
+// GenerateNameByIndex returns workload or service name of coordinator by
index.
+func GenerateNameByIndex(rss *unifflev1alpha1.RemoteShuffleService, index int)
string {
+ return fmt.Sprintf("%v-%v-%v", constants.RSSCoordinator, rss.Name,
index)
+}
+
+// GenerateAddresses returns addresses of coordinators accessed by shuffle
servers.
+func GenerateAddresses(rss *unifflev1alpha1.RemoteShuffleService) string {
+ var names []string
+ for i := 0; i < int(*rss.Spec.Coordinator.Count); i++ {
+ current := fmt.Sprintf("%v:%v", GenerateNameByIndex(rss, i),
+ controllerconstants.ContainerShuffleServerRPCPort)
+ names = append(names, current)
+ }
+ return strings.Join(names, ",")
+}
+
+// GenerateProperties generates configuration properties of coordinators.
+func GenerateProperties(rss *unifflev1alpha1.RemoteShuffleService)
map[controllerconstants.PropertyKey]string {
+ result := make(map[controllerconstants.PropertyKey]string)
+ result[controllerconstants.RPCServerPort] = fmt.Sprintf("%v",
*rss.Spec.Coordinator.RPCPort)
+ result[controllerconstants.JettyHTTPPort] = fmt.Sprintf("%v",
*rss.Spec.Coordinator.HTTPPort)
+ result[controllerconstants.CoordinatorExcludeNodesPath] =
utils.GetExcludeNodesMountPath(rss)
+ return result
+}
+
+// generateMainContainer generates main container of coordinators.
+func generateMainContainer(rss *unifflev1alpha1.RemoteShuffleService)
*corev1.Container {
+ return util.GenerateMainContainer(constants.RSSCoordinator,
rss.Spec.Coordinator.ConfigDir,
+ rss.Spec.Coordinator.RSSPodSpec.DeepCopy(),
generateMainContainerPorts(rss),
+ generateMainContainerENV(rss), []corev1.VolumeMount{
+ {
+ Name: controllerconstants.ExcludeNodesFile,
+ MountPath: utils.GetExcludeNodesMountPath(rss),
+ },
+ })
+}
+
+// generateMainContainerPorts generates ports of main container of
coordinators.
+func generateMainContainerPorts(rss *unifflev1alpha1.RemoteShuffleService)
[]corev1.ContainerPort {
+ ports := []corev1.ContainerPort{
+ {
+ ContainerPort:
controllerconstants.ContainerCoordinatorRPCPort,
+ Protocol: corev1.ProtocolTCP,
+ },
+ {
+ ContainerPort:
controllerconstants.ContainerCoordinatorHTTPPort,
+ Protocol: corev1.ProtocolTCP,
+ },
+ }
+ ports = append(ports, rss.Spec.Coordinator.Ports...)
+ return ports
+}
+
+// generateMainContainerENV generates environment variables of main container
of coordinators.
+func generateMainContainerENV(rss *unifflev1alpha1.RemoteShuffleService)
[]corev1.EnvVar {
+ env := []corev1.EnvVar{
+ {
+ Name: controllerconstants.CoordinatorRPCPortEnv,
+ Value:
strconv.FormatInt(int64(controllerconstants.ContainerCoordinatorRPCPort), 10),
+ },
+ {
+ Name: controllerconstants.CoordinatorHTTPPortEnv,
+ Value:
strconv.FormatInt(int64(controllerconstants.ContainerCoordinatorHTTPPort), 10),
+ },
+ {
+ Name: controllerconstants.XmxSizeEnv,
+ Value: rss.Spec.Coordinator.XmxSize,
+ },
+ {
+ Name: controllerconstants.ServiceNameEnv,
+ Value: controllerconstants.CoordinatorServiceName,
+ },
+ }
+ for _, e := range rss.Spec.Coordinator.Env {
+ if !defaultENVs.Has(e.Name) {
+ env = append(env, e)
+ }
+ }
+ return env
+}
diff --git
a/deploy/kubernetes/operator/pkg/controller/sync/shuffleserver/shuffleserver.go
b/deploy/kubernetes/operator/pkg/controller/sync/shuffleserver/shuffleserver.go
new file mode 100644
index 00000000..9a65842d
--- /dev/null
+++
b/deploy/kubernetes/operator/pkg/controller/sync/shuffleserver/shuffleserver.go
@@ -0,0 +1,346 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package shuffleserver
+
+import (
+ "fmt"
+ "sort"
+ "strconv"
+ "strings"
+
+ appsv1 "k8s.io/api/apps/v1"
+ corev1 "k8s.io/api/core/v1"
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+ "k8s.io/apimachinery/pkg/util/intstr"
+ "k8s.io/utils/pointer"
+
+ unifflev1alpha1
"github.com/apache/incubator-uniffle/deploy/kubernetes/operator/api/uniffle/v1alpha1"
+
"github.com/apache/incubator-uniffle/deploy/kubernetes/operator/pkg/constants"
+ controllerconstants
"github.com/apache/incubator-uniffle/deploy/kubernetes/operator/pkg/controller/constants"
+
"github.com/apache/incubator-uniffle/deploy/kubernetes/operator/pkg/controller/sync/coordinator"
+
"github.com/apache/incubator-uniffle/deploy/kubernetes/operator/pkg/controller/util"
+
"github.com/apache/incubator-uniffle/deploy/kubernetes/operator/pkg/utils"
+)
+
+// GenerateShuffleServers generates objects related to shuffle servers.
+func GenerateShuffleServers(rss *unifflev1alpha1.RemoteShuffleService) (
+ *corev1.ServiceAccount, []*corev1.Service, *appsv1.StatefulSet) {
+ sa := GenerateSA(rss)
+ var services []*corev1.Service
+ if needGenerateHeadlessSVC(rss) {
+ services = append(services, GenerateHeadlessSVC(rss))
+ }
+ if needGenerateNodePortSVC(rss) {
+ services = append(services, GenerateNodePortSVC(rss))
+ }
+ sts := GenerateSts(rss)
+ return sa, services, sts
+}
+
+// GenerateSA generates service account of shuffle servers.
+func GenerateSA(rss *unifflev1alpha1.RemoteShuffleService)
*corev1.ServiceAccount {
+ sa := &corev1.ServiceAccount{
+ ObjectMeta: metav1.ObjectMeta{
+ Name: GenerateName(rss),
+ Namespace: rss.Namespace,
+ },
+ }
+ util.AddOwnerReference(&sa.ObjectMeta, rss)
+ return sa
+}
+
+// GenerateHeadlessSVC generates headless service used by shuffle servers.
+func GenerateHeadlessSVC(rss *unifflev1alpha1.RemoteShuffleService)
*corev1.Service {
+ name := generateHeadlessSVCName(rss)
+ svc := &corev1.Service{
+ ObjectMeta: metav1.ObjectMeta{
+ Name: name,
+ Namespace: rss.Namespace,
+ },
+ Spec: corev1.ServiceSpec{
+ ClusterIP: corev1.ClusterIPNone,
+ Selector: map[string]string{
+ "app": GenerateName(rss),
+ },
+ },
+ }
+ if rss.Spec.ShuffleServer.RPCPort != nil {
+ svc.Spec.Ports = append(svc.Spec.Ports, corev1.ServicePort{
+ Name: "rpc",
+ Protocol: corev1.ProtocolTCP,
+ Port:
controllerconstants.ContainerShuffleServerRPCPort,
+ TargetPort:
intstr.FromInt(int(*rss.Spec.ShuffleServer.RPCPort)),
+ })
+ }
+ if rss.Spec.ShuffleServer.HTTPPort != nil {
+ svc.Spec.Ports = append(svc.Spec.Ports, corev1.ServicePort{
+ Name: "http",
+ Protocol: corev1.ProtocolTCP,
+ Port:
controllerconstants.ContainerShuffleServerHTTPPort,
+ TargetPort:
intstr.FromInt(int(*rss.Spec.ShuffleServer.HTTPPort)),
+ })
+ }
+ util.AddOwnerReference(&svc.ObjectMeta, rss)
+ return svc
+}
+
+// GenerateNodePortSVC generates nodePort service used by shuffle servers.
+func GenerateNodePortSVC(rss *unifflev1alpha1.RemoteShuffleService)
*corev1.Service {
+ name := GenerateName(rss)
+ svc := &corev1.Service{
+ ObjectMeta: metav1.ObjectMeta{
+ Name: name,
+ Namespace: rss.Namespace,
+ },
+ Spec: corev1.ServiceSpec{
+ Type: corev1.ServiceTypeNodePort,
+ Selector: map[string]string{
+ "app": name,
+ },
+ },
+ }
+ if needNodePortForRPC(rss) {
+ svc.Spec.Ports = append(svc.Spec.Ports, corev1.ServicePort{
+ Protocol: corev1.ProtocolTCP,
+ Port:
controllerconstants.ContainerShuffleServerRPCPort,
+ TargetPort:
intstr.FromInt(int(*rss.Spec.ShuffleServer.RPCPort)),
+ NodePort: *rss.Spec.ShuffleServer.RPCNodePort,
+ })
+ }
+ if needNodePortForHTTP(rss) {
+ svc.Spec.Ports = append(svc.Spec.Ports, corev1.ServicePort{
+ Protocol: corev1.ProtocolTCP,
+ Port:
controllerconstants.ContainerShuffleServerHTTPPort,
+ TargetPort:
intstr.FromInt(int(*rss.Spec.ShuffleServer.HTTPPort)),
+ NodePort: *rss.Spec.ShuffleServer.HTTPNodePort,
+ })
+ }
+ util.AddOwnerReference(&svc.ObjectMeta, rss)
+ return svc
+}
+
+// getReplicas returns replicas of shuffle servers.
+func getReplicas(rss *unifflev1alpha1.RemoteShuffleService) *int32 {
+ // TODO: we will support hpa for rss object, and when we enable hpa, wo
should not return replicas in .spec.shuffleServer field.
+ return rss.Spec.ShuffleServer.Replicas
+}
+
+// GenerateSts generates statefulSet of shuffle servers.
+func GenerateSts(rss *unifflev1alpha1.RemoteShuffleService)
*appsv1.StatefulSet {
+ name := GenerateName(rss)
+ replicas := getReplicas(rss)
+
+ podSpec := corev1.PodSpec{
+ SecurityContext: rss.Spec.ShuffleServer.SecurityContext,
+ HostNetwork: *rss.Spec.ShuffleServer.HostNetwork,
+ ServiceAccountName: GenerateName(rss),
+ Tolerations: []corev1.Toleration{
+ {
+ Effect: corev1.TaintEffectNoSchedule,
+ Key: "node-role.kubernetes.io/master",
+ },
+ },
+ Volumes: []corev1.Volume{
+ {
+ Name:
controllerconstants.ConfigurationVolumeName,
+ VolumeSource: corev1.VolumeSource{
+ ConfigMap:
&corev1.ConfigMapVolumeSource{
+ LocalObjectReference:
corev1.LocalObjectReference{
+ Name:
rss.Spec.ConfigMapName,
+ },
+ DefaultMode:
pointer.Int32(0777),
+ },
+ },
+ },
+ },
+ NodeSelector: rss.Spec.ShuffleServer.NodeSelector,
+ }
+ if podSpec.HostNetwork {
+ podSpec.DNSPolicy = corev1.DNSClusterFirstWithHostNet
+ }
+
+ sts := &appsv1.StatefulSet{
+ ObjectMeta: metav1.ObjectMeta{
+ Name: name,
+ Namespace: rss.Namespace,
+ Labels: utils.GenerateShuffleServerLabels(rss),
+ },
+ Spec: appsv1.StatefulSetSpec{
+ Selector: &metav1.LabelSelector{
+ MatchLabels:
utils.GenerateShuffleServerLabels(rss),
+ },
+ UpdateStrategy: appsv1.StatefulSetUpdateStrategy{
+ Type:
appsv1.RollingUpdateStatefulSetStrategyType,
+ RollingUpdate:
&appsv1.RollingUpdateStatefulSetStrategy{
+ Partition: replicas,
+ },
+ },
+ ServiceName: generateHeadlessSVCName(rss),
+ Replicas: replicas,
+ Template: corev1.PodTemplateSpec{
+ ObjectMeta: metav1.ObjectMeta{
+ Labels:
utils.GenerateShuffleServerLabels(rss),
+ Annotations: map[string]string{
+ constants.AnnotationRssName:
rss.Name,
+ constants.AnnotationRssUID:
string(rss.UID),
+
constants.AnnotationMetricsServerPort: fmt.Sprintf("%v",
+
controllerconstants.ContainerShuffleServerHTTPPort),
+
constants.AnnotationShuffleServerPort: fmt.Sprintf("%v",
+
controllerconstants.ContainerShuffleServerRPCPort),
+ },
+ },
+ Spec: podSpec,
+ },
+ },
+ }
+
+ // add init containers, the main container and other containers.
+ sts.Spec.Template.Spec.InitContainers =
util.GenerateInitContainers(rss.Spec.ShuffleServer.RSSPodSpec)
+ containers := []corev1.Container{*generateMainContainer(rss)}
+ containers = append(containers,
rss.Spec.ShuffleServer.SidecarContainers...)
+ sts.Spec.Template.Spec.Containers = containers
+
+ // add hostPath volumes for shuffle servers.
+ hostPathMounts := rss.Spec.ShuffleServer.HostPathMounts
+ logHostPath := rss.Spec.ShuffleServer.LogHostPath
+ sts.Spec.Template.Spec.Volumes = append(sts.Spec.Template.Spec.Volumes,
+ util.GenerateHostPathVolumes(hostPathMounts, logHostPath,
name)...)
+
+ util.AddOwnerReference(&sts.ObjectMeta, rss)
+ return sts
+}
+
+// GenerateName returns workload or nodePort service name of shuffle server.
+func GenerateName(rss *unifflev1alpha1.RemoteShuffleService) string {
+ return utils.GenerateShuffleServerName(rss)
+}
+
+// GenerateProperties generates configuration properties of shuffle servers.
+func GenerateProperties(rss *unifflev1alpha1.RemoteShuffleService)
map[controllerconstants.PropertyKey]string {
+ result := make(map[controllerconstants.PropertyKey]string)
+ result[controllerconstants.RPCServerPort] = fmt.Sprintf("%v",
*rss.Spec.ShuffleServer.RPCPort)
+ result[controllerconstants.JettyHTTPPort] = fmt.Sprintf("%v",
*rss.Spec.ShuffleServer.HTTPPort)
+ result[controllerconstants.CoordinatorQuorum] =
coordinator.GenerateAddresses(rss)
+ result[controllerconstants.StorageBasePath] =
generateStorageBasePath(rss)
+ return result
+}
+
+// generateStorageBasePath generates storage base path in shuffle server's
configuration.
+func generateStorageBasePath(rss *unifflev1alpha1.RemoteShuffleService) string
{
+ var paths []string
+ for k, v := range rss.Spec.ShuffleServer.HostPathMounts {
+ if k == rss.Spec.ShuffleServer.LogHostPath {
+ continue
+ }
+ paths = append(paths, strings.TrimSuffix(v, "/")+"/rssdata")
+ }
+ sort.Strings(paths)
+ return strings.Join(paths, ",")
+}
+
+// generateHeadlessSVCName returns name of shuffle servers' headless service.
+func generateHeadlessSVCName(rss *unifflev1alpha1.RemoteShuffleService) string
{
+ return GenerateName(rss) + "-headless"
+}
+
+// generateMainContainer generates main container of shuffle servers.
+func generateMainContainer(rss *unifflev1alpha1.RemoteShuffleService)
*corev1.Container {
+ return util.GenerateMainContainer(constants.RSSShuffleServer,
rss.Spec.ShuffleServer.ConfigDir,
+ rss.Spec.ShuffleServer.RSSPodSpec.DeepCopy(),
generateMainContainerPorts(rss),
+ generateMainContainerENV(rss), nil)
+}
+
+// generateMainContainerPorts generates ports of main container of shuffle
servers.
+func generateMainContainerPorts(rss *unifflev1alpha1.RemoteShuffleService)
[]corev1.ContainerPort {
+ ports := []corev1.ContainerPort{
+ {
+ ContainerPort:
controllerconstants.ContainerShuffleServerRPCPort,
+ Protocol: corev1.ProtocolTCP,
+ },
+ {
+ ContainerPort:
controllerconstants.ContainerShuffleServerHTTPPort,
+ Protocol: corev1.ProtocolTCP,
+ },
+ }
+ ports = append(ports, rss.Spec.ShuffleServer.Ports...)
+ return ports
+}
+
+// generateMainContainerENV generates environment variables of main container
of shuffle servers.
+func generateMainContainerENV(rss *unifflev1alpha1.RemoteShuffleService)
[]corev1.EnvVar {
+ return []corev1.EnvVar{
+ {
+ Name: controllerconstants.ShuffleServerRPCPortEnv,
+ Value:
strconv.FormatInt(int64(controllerconstants.ContainerShuffleServerRPCPort), 10),
+ },
+ {
+ Name: controllerconstants.ShuffleServerHTTPPortEnv,
+ Value:
strconv.FormatInt(int64(controllerconstants.ContainerShuffleServerHTTPPort),
10),
+ },
+ {
+ Name: controllerconstants.RSSCoordinatorQuorumEnv,
+ Value: coordinator.GenerateAddresses(rss),
+ },
+ {
+ Name: controllerconstants.XmxSizeEnv,
+ Value: rss.Spec.ShuffleServer.XmxSize,
+ },
+ {
+ Name: controllerconstants.ServiceNameEnv,
+ Value: controllerconstants.ShuffleServerServiceName,
+ },
+ {
+ Name: controllerconstants.NodeNameEnv,
+ ValueFrom: &corev1.EnvVarSource{
+ FieldRef: &corev1.ObjectFieldSelector{
+ APIVersion: "v1",
+ FieldPath: "spec.nodeName",
+ },
+ },
+ },
+ {
+ Name: controllerconstants.RssIPEnv,
+ ValueFrom: &corev1.EnvVarSource{
+ FieldRef: &corev1.ObjectFieldSelector{
+ APIVersion: "v1",
+ FieldPath: "status.podIP",
+ },
+ },
+ },
+ }
+}
+
+// needGenerateNodePortSVC returns whether we need node port service for
shuffle servers.
+func needGenerateNodePortSVC(rss *unifflev1alpha1.RemoteShuffleService) bool {
+ return needNodePortForRPC(rss) || needNodePortForHTTP(rss)
+}
+
+// needGenerateHeadlessSVC returns whether we need headless service for
shuffle servers.
+func needGenerateHeadlessSVC(rss *unifflev1alpha1.RemoteShuffleService) bool {
+ return rss.Spec.ShuffleServer.RPCPort != nil ||
rss.Spec.ShuffleServer.HTTPPort != nil
+}
+
+// needNodePortForRPC returns whether we need node port service for rpc
service of shuffle servers.
+func needNodePortForRPC(rss *unifflev1alpha1.RemoteShuffleService) bool {
+ return rss.Spec.ShuffleServer.RPCPort != nil &&
rss.Spec.ShuffleServer.RPCNodePort != nil
+}
+
+// needNodePortForRPC returns whether we need node port service for http
service of shuffle servers.
+func needNodePortForHTTP(rss *unifflev1alpha1.RemoteShuffleService) bool {
+ return rss.Spec.ShuffleServer.HTTPPort != nil &&
rss.Spec.ShuffleServer.HTTPNodePort != nil
+}
diff --git
a/deploy/kubernetes/operator/pkg/controller/util/kubernetes/configmap.go
b/deploy/kubernetes/operator/pkg/controller/util/kubernetes/configmap.go
new file mode 100644
index 00000000..66a7c973
--- /dev/null
+++ b/deploy/kubernetes/operator/pkg/controller/util/kubernetes/configmap.go
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kubernetes
+
+import (
+ "context"
+ "encoding/json"
+
+ corev1 "k8s.io/api/core/v1"
+ apierrors "k8s.io/apimachinery/pkg/api/errors"
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+ "k8s.io/apimachinery/pkg/types"
+ "k8s.io/apimachinery/pkg/util/strategicpatch"
+ "k8s.io/client-go/kubernetes"
+ "k8s.io/klog/v2"
+
+
"github.com/apache/incubator-uniffle/deploy/kubernetes/operator/pkg/utils"
+)
+
+// SyncConfigMap synchronizes configMap object.
+func SyncConfigMap(kubeClient kubernetes.Interface, cm *corev1.ConfigMap)
error {
+ oldCM, err := kubeClient.CoreV1().ConfigMaps(cm.Namespace).
+ Get(context.Background(), cm.Name, metav1.GetOptions{})
+ if err != nil {
+ if !apierrors.IsNotFound(err) {
+ klog.Errorf("get configMap (%v) failed: %v",
utils.UniqueName(cm), err)
+ return err
+ }
+ // try to create a new configMap.
+ if _, err = kubeClient.CoreV1().ConfigMaps(cm.Namespace).
+ Create(context.Background(), cm,
metav1.CreateOptions{}); err != nil {
+ klog.Errorf("create configMap (%v) failed: %v",
utils.UniqueName(cm), err)
+ return err
+ }
+ return nil
+ }
+ // if it already exists, try to update it through patch method.
+ return PatchConfigMap(kubeClient, oldCM, cm)
+}
+
+// PatchConfigMap patches the old configMap to new configMap.
+func PatchConfigMap(kubeClient kubernetes.Interface,
+ oldCM, newCM *corev1.ConfigMap) error {
+ var oldData []byte
+ var err error
+ oldData, err = json.Marshal(oldCM)
+ if err != nil {
+ klog.Errorf("marshal oldCM (%+v) failed: %v", oldCM, err)
+ return err
+ }
+ var newData []byte
+ newData, err = json.Marshal(newCM)
+ if err != nil {
+ klog.Errorf("marshal newCM (%+v) failed: %v", newCM, err)
+ return err
+ }
+ // build payload for patch method.
+ var patchBytes []byte
+ patchBytes, err = strategicpatch.CreateTwoWayMergePatch(oldData,
newData,
+ &corev1.ConfigMap{})
+ if err != nil {
+ klog.Errorf("created merge patch for configMap %v failed: %v",
utils.UniqueName(oldCM), err)
+ return err
+ }
+ if _, err =
kubeClient.CoreV1().ConfigMaps(oldCM.Namespace).Patch(context.Background(),
+ oldCM.Name, types.StrategicMergePatchType, patchBytes,
metav1.PatchOptions{}); err != nil {
+ klog.Errorf("patch configMap (%v) failed: %v",
utils.UniqueName(oldCM), err)
+ return err
+ }
+ return nil
+}
diff --git
a/deploy/kubernetes/operator/pkg/controller/util/kubernetes/deployment.go
b/deploy/kubernetes/operator/pkg/controller/util/kubernetes/deployment.go
new file mode 100644
index 00000000..f0bc7aae
--- /dev/null
+++ b/deploy/kubernetes/operator/pkg/controller/util/kubernetes/deployment.go
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kubernetes
+
+import (
+ "context"
+ "encoding/json"
+
+ appsv1 "k8s.io/api/apps/v1"
+ apierrors "k8s.io/apimachinery/pkg/api/errors"
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+ "k8s.io/apimachinery/pkg/types"
+ "k8s.io/apimachinery/pkg/util/strategicpatch"
+ "k8s.io/client-go/kubernetes"
+ "k8s.io/klog/v2"
+
+
"github.com/apache/incubator-uniffle/deploy/kubernetes/operator/pkg/utils"
+)
+
+// SyncDeployments synchronizes deployment objects.
+func SyncDeployments(kubeClient kubernetes.Interface, deployments
[]*appsv1.Deployment) error {
+ for i := range deployments {
+ if err := SyncDeployment(kubeClient, deployments[i]); err !=
nil {
+ return err
+ }
+ }
+ return nil
+}
+
+// SyncDeployment synchronizes deployment object.
+func SyncDeployment(kubeClient kubernetes.Interface, deploy
*appsv1.Deployment) error {
+ oldDeploy, err := kubeClient.AppsV1().Deployments(deploy.Namespace).
+ Get(context.Background(), deploy.Name, metav1.GetOptions{})
+ if err != nil {
+ if !apierrors.IsNotFound(err) {
+ klog.Errorf("get deployment (%v) failed: %v",
utils.UniqueName(deploy), err)
+ return err
+ }
+ // try to create a new deployment.
+ if _, err = kubeClient.AppsV1().Deployments(deploy.Namespace).
+ Create(context.Background(), deploy,
metav1.CreateOptions{}); err != nil {
+ klog.Errorf("create deployment (%v) failed: %v",
utils.UniqueName(deploy), err)
+ return err
+ }
+ return nil
+ }
+ // if it already exists, try to update it through patch method.
+ return PatchDeployment(kubeClient, oldDeploy, deploy)
+}
+
+// PatchDeployment patches the old deployment to new deployment.
+func PatchDeployment(kubeClient kubernetes.Interface,
+ oldDeploy, newDeploy *appsv1.Deployment) error {
+ var oldData []byte
+ var err error
+ oldData, err = json.Marshal(oldDeploy)
+ if err != nil {
+ klog.Errorf("marshal oldDeploy (%+v) failed: %v", oldDeploy,
err)
+ return err
+ }
+ var newData []byte
+ newData, err = json.Marshal(newDeploy)
+ if err != nil {
+ klog.Errorf("marshal newDeploy (%+v) failed: %v", newDeploy,
err)
+ return err
+ }
+ // build payload for patch method.
+ var patchBytes []byte
+ patchBytes, err = strategicpatch.CreateTwoWayMergePatch(oldData,
newData,
+ &appsv1.Deployment{})
+ if err != nil {
+ klog.Errorf("created merge patch for deployment %v failed: %v",
+ utils.UniqueName(oldDeploy), err)
+ return err
+ }
+ if _, err =
kubeClient.AppsV1().Deployments(oldDeploy.Namespace).Patch(context.Background(),
+ oldDeploy.Name, types.StrategicMergePatchType, patchBytes,
+ metav1.PatchOptions{}); err != nil {
+ klog.Errorf("patch deployment (%v) failed: %v",
utils.UniqueName(oldDeploy), err)
+ return err
+ }
+ return nil
+}
diff --git
a/deploy/kubernetes/operator/pkg/controller/util/kubernetes/service.go
b/deploy/kubernetes/operator/pkg/controller/util/kubernetes/service.go
new file mode 100644
index 00000000..c3540226
--- /dev/null
+++ b/deploy/kubernetes/operator/pkg/controller/util/kubernetes/service.go
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kubernetes
+
+import (
+ "context"
+ "encoding/json"
+
+ corev1 "k8s.io/api/core/v1"
+ apierrors "k8s.io/apimachinery/pkg/api/errors"
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+ "k8s.io/apimachinery/pkg/types"
+ "k8s.io/apimachinery/pkg/util/strategicpatch"
+ "k8s.io/client-go/kubernetes"
+ "k8s.io/klog/v2"
+
+
"github.com/apache/incubator-uniffle/deploy/kubernetes/operator/pkg/utils"
+)
+
+// SyncServices synchronizes service objects.
+func SyncServices(kubeClient kubernetes.Interface, services []*corev1.Service)
error {
+ for i := range services {
+ if err := SyncService(kubeClient, services[i]); err != nil {
+ return err
+ }
+ }
+ return nil
+}
+
+// SyncService synchronizes service object.
+func SyncService(kubeClient kubernetes.Interface, svc *corev1.Service) error {
+ oldSVC, err := kubeClient.CoreV1().Services(svc.Namespace).
+ Get(context.Background(), svc.Name, metav1.GetOptions{})
+ if err != nil {
+ if !apierrors.IsNotFound(err) {
+ klog.Errorf("get service (%v) failed: %v",
utils.UniqueName(svc), err)
+ return err
+ }
+ // try to create a new service.
+ if _, err = kubeClient.CoreV1().Services(svc.Namespace).
+ Create(context.Background(), svc,
metav1.CreateOptions{}); err != nil {
+ klog.Errorf("create service (%v) failed: %v",
utils.UniqueName(svc), err)
+ return err
+ }
+ return nil
+ }
+ // if it already exists, try to update it through patch method.
+ return PatchService(kubeClient, oldSVC, svc)
+}
+
+// mergeSVC merges old and new service, and generates a service to be updated.
+// Service objects are special, because we can only update the following
fields and if we merge
+// original and new object directly, it may clear the '.spec.ClusterIP field',
which is forbidden.
+func mergeSVC(oldSVC, newSVC *corev1.Service) *corev1.Service {
+ updatedSVC := oldSVC.DeepCopy()
+ updatedSVC.Labels = newSVC.Labels
+ updatedSVC.Annotations = newSVC.Annotations
+ updatedSVC.OwnerReferences = newSVC.OwnerReferences
+ updatedSVC.Spec.Type = newSVC.Spec.Type
+ updatedSVC.Spec.Selector = newSVC.Spec.Selector
+ updatedSVC.Spec.Ports = newSVC.Spec.Ports
+ return updatedSVC
+}
+
+// PatchService patches the old service to new service.
+func PatchService(kubeClient kubernetes.Interface,
+ oldSVC, newSVC *corev1.Service) error {
+ var oldData []byte
+ var err error
+ oldData, err = json.Marshal(oldSVC)
+ if err != nil {
+ klog.Errorf("marshal oldSVC (%+v) failed: %v", oldSVC, err)
+ return err
+ }
+ var newData []byte
+ newData, err = json.Marshal(mergeSVC(oldSVC, newSVC))
+ if err != nil {
+ klog.Errorf("marshal newSVC (%+v) failed: %v", newSVC, err)
+ return err
+ }
+ // build payload for patch method.
+ var patchBytes []byte
+ patchBytes, err = strategicpatch.CreateTwoWayMergePatch(oldData,
newData,
+ &corev1.Service{})
+ if err != nil {
+ klog.Errorf("created merge patch for service %v failed: %v",
utils.UniqueName(oldSVC), err)
+ return err
+ }
+ if _, err =
kubeClient.CoreV1().Services(oldSVC.Namespace).Patch(context.Background(),
+ oldSVC.Name, types.StrategicMergePatchType, patchBytes,
metav1.PatchOptions{}); err != nil {
+ klog.Errorf("patch service (%v) with (%v) failed: %v",
+ utils.UniqueName(oldSVC), string(patchBytes), err)
+ return err
+ }
+ return nil
+}
diff --git
a/deploy/kubernetes/operator/pkg/controller/util/kubernetes/serviceaccount.go
b/deploy/kubernetes/operator/pkg/controller/util/kubernetes/serviceaccount.go
new file mode 100644
index 00000000..aa7b3315
--- /dev/null
+++
b/deploy/kubernetes/operator/pkg/controller/util/kubernetes/serviceaccount.go
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kubernetes
+
+import (
+ "context"
+ "encoding/json"
+
+ corev1 "k8s.io/api/core/v1"
+ apierrors "k8s.io/apimachinery/pkg/api/errors"
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+ "k8s.io/apimachinery/pkg/types"
+ "k8s.io/apimachinery/pkg/util/strategicpatch"
+ "k8s.io/client-go/kubernetes"
+ "k8s.io/klog/v2"
+
+
"github.com/apache/incubator-uniffle/deploy/kubernetes/operator/pkg/utils"
+)
+
+// SyncServiceAccount synchronizes serviceAccount object.
+func SyncServiceAccount(kubeClient kubernetes.Interface, sa
*corev1.ServiceAccount) error {
+ oldSA, err := kubeClient.CoreV1().ServiceAccounts(sa.Namespace).
+ Get(context.Background(), sa.Name, metav1.GetOptions{})
+ if err != nil {
+ if !apierrors.IsNotFound(err) {
+ klog.Errorf("get service account (%v) failed: %v",
utils.UniqueName(sa), err)
+ return err
+ }
+ // try to create a new serviceAccount.
+ if _, err = kubeClient.CoreV1().ServiceAccounts(sa.Namespace).
+ Create(context.Background(), sa,
metav1.CreateOptions{}); err != nil {
+ klog.Errorf("create service account (%v) failed: %v",
utils.UniqueName(sa), err)
+ return err
+ }
+ return nil
+ }
+ // if it already exists, try to update it through patch method.
+ return PatchServiceAccount(kubeClient, oldSA, sa)
+}
+
+// PatchServiceAccount patches the old serviceAccount to new serviceAccount.
+func PatchServiceAccount(kubeClient kubernetes.Interface,
+ oldSA, newSA *corev1.ServiceAccount) error {
+ var oldData []byte
+ var err error
+ oldData, err = json.Marshal(oldSA)
+ if err != nil {
+ klog.Errorf("marshal oldSA (%+v) failed: %v", oldSA, err)
+ return err
+ }
+ var newData []byte
+ newData, err = json.Marshal(newSA)
+ if err != nil {
+ klog.Errorf("marshal newSA (%+v) failed: %v", newSA, err)
+ return err
+ }
+ // build payload for patch method.
+ var patchBytes []byte
+ patchBytes, err = strategicpatch.CreateTwoWayMergePatch(oldData,
newData,
+ &corev1.ServiceAccount{})
+ if err != nil {
+ klog.Errorf("created merge patch for serviceAccount %v failed:
%v", utils.UniqueName(oldSA),
+ err)
+ return err
+ }
+ if _, err =
kubeClient.CoreV1().ServiceAccounts(oldSA.Namespace).Patch(context.Background(),
+ oldSA.Name, types.StrategicMergePatchType, patchBytes,
metav1.PatchOptions{}); err != nil {
+ klog.Errorf("patch serviceAccount (%v) failed: %v",
utils.UniqueName(oldSA), err)
+ }
+ return nil
+}
diff --git
a/deploy/kubernetes/operator/pkg/controller/util/kubernetes/statefulset.go
b/deploy/kubernetes/operator/pkg/controller/util/kubernetes/statefulset.go
new file mode 100644
index 00000000..f4c90908
--- /dev/null
+++ b/deploy/kubernetes/operator/pkg/controller/util/kubernetes/statefulset.go
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kubernetes
+
+import (
+ "context"
+ "encoding/json"
+
+ appsv1 "k8s.io/api/apps/v1"
+ apierrors "k8s.io/apimachinery/pkg/api/errors"
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+ "k8s.io/apimachinery/pkg/types"
+ "k8s.io/apimachinery/pkg/util/strategicpatch"
+ "k8s.io/client-go/kubernetes"
+ "k8s.io/klog/v2"
+
+
"github.com/apache/incubator-uniffle/deploy/kubernetes/operator/pkg/utils"
+)
+
+// SyncStatefulSet synchronizes statefulSet object and returns original and
patched object.
+func SyncStatefulSet(kubeClient kubernetes.Interface, sts *appsv1.StatefulSet,
+ overwrite bool) (oldSts, newSts *appsv1.StatefulSet, err error) {
+ oldSts, err = kubeClient.AppsV1().StatefulSets(sts.Namespace).
+ Get(context.Background(), sts.Name, metav1.GetOptions{})
+ if err != nil {
+ if !apierrors.IsNotFound(err) {
+ klog.Errorf("get statefulSet (%v) failed: %v",
utils.UniqueName(sts), err)
+ return nil, nil, err
+ }
+ var created *appsv1.StatefulSet
+ created, err = kubeClient.AppsV1().StatefulSets(sts.Namespace).
+ Create(context.Background(), sts,
metav1.CreateOptions{})
+ if err != nil {
+ klog.Errorf("create statefulSet (%v) failed: %v",
utils.UniqueName(sts), err)
+ return nil, nil, err
+ }
+ return nil, created, nil
+ }
+ // determine if we need to force an update.
+ if !overwrite {
+ return oldSts, nil, nil
+ }
+ // if it already exists, try to update it through patch method.
+ newSts, err = PatchStatefulSet(kubeClient, oldSts, sts)
+ return oldSts, newSts, err
+}
+
+// NeedUpdateSts returns whether we need to update the statefulSet.
+func NeedUpdateSts(oldSts, newSts *appsv1.StatefulSet) ([]byte, bool, error) {
+ var oldData []byte
+ var err error
+ oldData, err = json.Marshal(oldSts)
+ if err != nil {
+ klog.Errorf("marshal oldSts (%+v) failed: %v", oldSts, err)
+ return nil, false, err
+ }
+ var newData []byte
+ newData, err = json.Marshal(newSts)
+ if err != nil {
+ klog.Errorf("marshal newSts (%+v) failed: %v", newSts, err)
+ return nil, false, err
+ }
+ var patchBytes []byte
+ patchBytes, err = strategicpatch.CreateTwoWayMergePatch(oldData,
newData,
+ &appsv1.StatefulSet{})
+ if err != nil {
+ klog.Errorf("created merge patch for statefulSet %v failed: %v",
+ utils.UniqueName(oldSts), err)
+ return nil, false, err
+ }
+ return patchBytes, string(patchBytes) != "{}", nil
+}
+
+// PatchStatefulSet patches the old statefulSet to new statefulSet.
+func PatchStatefulSet(kubeClient kubernetes.Interface,
+ oldSts, newSts *appsv1.StatefulSet) (*appsv1.StatefulSet, error) {
+ patchBytes, update, err := NeedUpdateSts(oldSts, newSts)
+ if err != nil {
+ return nil, err
+ }
+ if !update {
+ klog.V(4).Infof("do not need to patch statefulSet (%v)",
utils.UniqueName(oldSts))
+ return nil, nil
+ }
+ klog.V(5).Infof("patch body (%v) to statefulSet (%v)",
+ string(patchBytes), utils.UniqueName(oldSts))
+ // build payload for patch method.
+ var patched *appsv1.StatefulSet
+ patched, err =
kubeClient.AppsV1().StatefulSets(oldSts.Namespace).Patch(context.Background(),
+ oldSts.Name, types.StrategicMergePatchType, patchBytes,
+ metav1.PatchOptions{})
+ if err != nil {
+ klog.Errorf("patch statefulSet (%v) failed: %v",
utils.UniqueName(oldSts), err)
+ return nil, err
+ }
+ return patched, nil
+}
diff --git
a/deploy/kubernetes/operator/pkg/controller/util/properties/properties.go
b/deploy/kubernetes/operator/pkg/controller/util/properties/properties.go
new file mode 100644
index 00000000..49ac5ccf
--- /dev/null
+++ b/deploy/kubernetes/operator/pkg/controller/util/properties/properties.go
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package properties
+
+import (
+ "sort"
+ "strings"
+
+
"github.com/apache/incubator-uniffle/deploy/kubernetes/operator/pkg/controller/constants"
+)
+
+// UpdateProperties updates configuration of coordinators or shuffle servers.
+func UpdateProperties(properties string, kv map[constants.PropertyKey]string)
string {
+ oldKv := parseProperties(properties)
+ for k, v := range kv {
+ oldKv[k] = v
+ }
+ return generateProperties(oldKv)
+}
+
+// parseProperties converts string in properties format used in Java to a map.
+func parseProperties(properties string) map[constants.PropertyKey]string {
+ result := make(map[constants.PropertyKey]string)
+ lines := strings.Split(properties, "\n")
+ for _, line := range lines {
+ kv := strings.Split(line, " ")
+ if len(kv) < 2 {
+ continue
+ }
+ result[constants.PropertyKey(kv[0])] = kv[1]
+ }
+ return result
+}
+
+// generateProperties converts a map to string in properties format used in
Java.
+func generateProperties(kv map[constants.PropertyKey]string) string {
+ var lines []string
+ for k, v := range kv {
+ lines = append(lines, string(k)+" "+v)
+ }
+ sort.Strings(lines)
+ return strings.Join(lines, "\n")
+}
diff --git a/deploy/kubernetes/operator/pkg/controller/util/util.go
b/deploy/kubernetes/operator/pkg/controller/util/util.go
new file mode 100644
index 00000000..6f588681
--- /dev/null
+++ b/deploy/kubernetes/operator/pkg/controller/util/util.go
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package util
+
import (
	"fmt"
	"sort"
	"strings"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/kubernetes/scheme"
	v1core "k8s.io/client-go/kubernetes/typed/core/v1"
	"k8s.io/client-go/tools/record"
	"k8s.io/utils/pointer"

	"github.com/apache/incubator-uniffle/deploy/kubernetes/operator/api/uniffle/v1alpha1"
	"github.com/apache/incubator-uniffle/deploy/kubernetes/operator/pkg/constants"
	controllerconstants "github.com/apache/incubator-uniffle/deploy/kubernetes/operator/pkg/controller/constants"
	"github.com/apache/incubator-uniffle/deploy/kubernetes/operator/pkg/utils"
)
+
+// AddOwnerReference add OwnerReference to an object.
+func AddOwnerReference(meta *metav1.ObjectMeta, rss
*v1alpha1.RemoteShuffleService) {
+ meta.OwnerReferences = append(meta.OwnerReferences,
generateOwnerReference(rss)...)
+}
+
+// generateOwnerReference builds an OwnerReference for rss objects.
+func generateOwnerReference(rss *v1alpha1.RemoteShuffleService)
[]metav1.OwnerReference {
+ return []metav1.OwnerReference{
+ {
+ APIVersion: v1alpha1.SchemeGroupVersion.String(),
+ Kind: "RSS",
+ Name: rss.Name,
+ UID: rss.UID,
+ Controller: pointer.Bool(true),
+ },
+ }
+}
+
+// GenerateMainContainer generate main container of coordinators or shuffle
servers.
+func GenerateMainContainer(name, configDir string, podSpec
*v1alpha1.RSSPodSpec,
+ ports []corev1.ContainerPort, env []corev1.EnvVar,
+ initVolumeMounts []corev1.VolumeMount) *corev1.Container {
+ mainContainer := &corev1.Container{
+ Name: name,
+ Image: podSpec.Image,
+ Command: podSpec.Command,
+ Args: podSpec.Args,
+ ImagePullPolicy: corev1.PullAlways,
+ Resources: podSpec.Resources,
+ Ports: ports,
+ Env: env,
+ VolumeMounts: []corev1.VolumeMount{
+ {
+ Name:
controllerconstants.ConfigurationVolumeName,
+ MountPath: configDir,
+ },
+ },
+ }
+ mainContainer.VolumeMounts = append(mainContainer.VolumeMounts,
initVolumeMounts...)
+ addVolumeMountsOfMainContainer(mainContainer, podSpec.HostPathMounts,
podSpec.VolumeMounts)
+ return mainContainer
+}
+
+func addVolumeMountsOfMainContainer(mainContainer *corev1.Container,
+ hostPathMounts map[string]string, volumeMounts []corev1.VolumeMount) {
+ var clearPathCMDs []string
+ mainContainer.VolumeMounts = append(mainContainer.VolumeMounts,
+ GenerateHostPathVolumeMounts(hostPathMounts)...)
+ for _, mountPath := range hostPathMounts {
+ clearPathCMDs = append(clearPathCMDs, fmt.Sprintf("rm -rf
%v/*", strings.TrimSuffix(mountPath, "/")))
+ }
+ if len(clearPathCMDs) > 0 {
+ mainContainer.Lifecycle = &corev1.Lifecycle{
+ PreStop: &corev1.Handler{
+ Exec: &corev1.ExecAction{
+ Command: []string{"/bin/sh", "-c",
strings.Join(clearPathCMDs, ";")},
+ },
+ },
+ }
+ }
+ mainContainer.VolumeMounts = append(mainContainer.VolumeMounts,
volumeMounts...)
+}
+
+// GenerateHostPathVolumeMounts generates volume mounts for hostPaths
configured in rss objects.
+func GenerateHostPathVolumeMounts(hostPathMounts map[string]string)
[]corev1.VolumeMount {
+ var volumeMounts []corev1.VolumeMount
+ for hostPath, mountPath := range hostPathMounts {
+ volumeMounts = append(volumeMounts, corev1.VolumeMount{
+ Name: GenerateHostPathVolumeName(hostPath),
+ MountPath: mountPath,
+ })
+ }
+ return volumeMounts
+}
+
+// GenerateHostPathVolumes generates all hostPath volumes. If logHostPath is
not empty, we need to handle it in a
+// special way by adding the subPath to the hostPath which is the same as
logHostPath.
+func GenerateHostPathVolumes(hostPathMounts map[string]string, logHostPath,
subPath string) []corev1.Volume {
+ volumes := make([]corev1.Volume, 0)
+ for hostPath := range hostPathMounts {
+ if len(hostPath) == 0 {
+ continue
+ }
+ if hostPath == logHostPath {
+ volumes = append(volumes,
*GenerateHostPathVolume(hostPath, subPath))
+ } else {
+ volumes = append(volumes,
*GenerateHostPathVolume(hostPath, ""))
+ }
+ }
+ return volumes
+}
+
+// GenerateHostPathVolume convert host path to hostPath volume.
+func GenerateHostPathVolume(hostPath, subPath string) *corev1.Volume {
+ path := hostPath
+ if len(subPath) > 0 {
+ path = fmt.Sprintf("%v/%v", strings.TrimSuffix(path, "/"),
subPath)
+ }
+ hostPathType := corev1.HostPathDirectoryOrCreate
+ return &corev1.Volume{
+ Name: GenerateHostPathVolumeName(hostPath),
+ VolumeSource: corev1.VolumeSource{
+ HostPath: &corev1.HostPathVolumeSource{
+ Path: path,
+ Type: &hostPathType,
+ },
+ },
+ }
+}
+
// GenerateHostPathVolumeName converts a host path into a volume name.
//
// Volume names must be RFC 1123 labels: lowercase letters, digits and '-',
// starting and ending with an alphanumeric character. Path separators and any
// other disallowed character become '-', uppercase letters are lowercased, and
// leading or trailing '-' is trimmed. For example, "/data1/rssdata" becomes
// "data1-rssdata", and "/Data_1/rss.log" becomes "data-1-rss-log". Names that
// the original slash-to-dash scheme already made valid are unchanged. Length
// (at most 63 characters) is not enforced here.
func GenerateHostPathVolumeName(hostPath string) string {
	name := strings.Map(func(r rune) rune {
		switch {
		case r >= 'a' && r <= 'z', r >= '0' && r <= '9':
			return r
		case r >= 'A' && r <= 'Z':
			return r + ('a' - 'A')
		default:
			return '-'
		}
	}, hostPath)
	return strings.Trim(name, "-")
}
+
+// GenerateInitContainers generates init containers for coordinators and
shuffle servers.
+func GenerateInitContainers(rssPodSpec *v1alpha1.RSSPodSpec)
[]corev1.Container {
+ var initContainers []corev1.Container
+ if rssPodSpec.SecurityContext == nil ||
rssPodSpec.SecurityContext.FSGroup == nil {
+ return initContainers
+ }
+ if len(rssPodSpec.HostPathMounts) > 0 {
+ image := rssPodSpec.InitContainerImage
+ if len(image) == 0 {
+ image = constants.DefaultInitContainerImage
+ }
+ commands := generateMakeDataDirCommand(rssPodSpec)
+ initContainers = append(initContainers, corev1.Container{
+ Name: "init-data-dir",
+ Image: image,
+ ImagePullPolicy: corev1.PullIfNotPresent,
+ Command: []string{"sh", "-c",
strings.Join(commands, ";")},
+ SecurityContext: &corev1.SecurityContext{
+ RunAsUser: pointer.Int64(0),
+ Privileged: pointer.Bool(true),
+ },
+ VolumeMounts:
GenerateHostPathVolumeMounts(rssPodSpec.HostPathMounts),
+ })
+ if len(rssPodSpec.LogHostPath) > 0 {
+ initContainers = append(initContainers,
corev1.Container{
+ Name: "init-log-dir",
+ Image: image,
+ ImagePullPolicy: corev1.PullIfNotPresent,
+ Command: []string{
+ "sh", "-c",
+ fmt.Sprintf("mkdir -p %v && chmod -R
777 %v", rssPodSpec.LogHostPath,
+ rssPodSpec.LogHostPath),
+ },
+ SecurityContext: &corev1.SecurityContext{
+ RunAsUser: pointer.Int64(0),
+ Privileged: pointer.Bool(true),
+ },
+ VolumeMounts: []corev1.VolumeMount{
+ *generateLogVolumeMount(rssPodSpec),
+ },
+ })
+ }
+ }
+ return initContainers
+}
+
+func generateLogVolumeMount(rssPodSpec *v1alpha1.RSSPodSpec)
*corev1.VolumeMount {
+ logHostPath := rssPodSpec.LogHostPath
+ return &corev1.VolumeMount{
+ Name: GenerateHostPathVolumeName(logHostPath),
+ MountPath: rssPodSpec.HostPathMounts[logHostPath],
+ }
+}
+
+func generateMakeDataDirCommand(rssPodSpec *v1alpha1.RSSPodSpec) []string {
+ var commands []string
+ fsGroup := *rssPodSpec.SecurityContext.FSGroup
+ for _, mountPath := range rssPodSpec.HostPathMounts {
+ commands = append(commands, fmt.Sprintf("chown -R %v:%v %v",
fsGroup, fsGroup, mountPath))
+ }
+ return commands
+}
+
+// CreateRecorder creates an event recorder.
+func CreateRecorder(kubeClient kubernetes.Interface, component string)
record.EventRecorder {
+ eventBroadcaster := record.NewBroadcaster()
+ eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{
+ Interface:
kubeClient.CoreV1().Events(utils.GetCurrentNamespace()),
+ })
+ return eventBroadcaster.NewRecorder(scheme.Scheme,
corev1.EventSource{Component: component})
+}
+
+// GetConfigMapOwner returns name of rss object as owner of configMap.
+func GetConfigMapOwner(cm *corev1.ConfigMap) string {
+ return cm.Labels[controllerconstants.OwnerLabel]
+}
diff --git a/deploy/kubernetes/operator/pkg/webhook/manager_test.go
b/deploy/kubernetes/operator/pkg/webhook/manager_test.go
index 038e3566..114ad5d9 100644
--- a/deploy/kubernetes/operator/pkg/webhook/manager_test.go
+++ b/deploy/kubernetes/operator/pkg/webhook/manager_test.go
@@ -108,7 +108,7 @@ var _ = AfterSuite(func() {
By("stopping admission manager")
ctxCancel()
By("tearing down the test environment")
- _ = testEnv.Stop()
+ Expect(testEnv.Stop()).To(Succeed())
})
var _ = Describe("AdmissionManager", func() {