This is an automated email from the ASF dual-hosted git repository.
albumenj pushed a commit to branch refactor-with-go
in repository https://gitbox.apache.org/repos/asf/dubbo-admin.git
The following commit(s) were added to refs/heads/refactor-with-go by this push:
new c7ed4c69 Migration election strategy (#1044)
c7ed4c69 is described below
commit c7ed4c699dcbe2f0a550aeb470c55180e30157dc
Author: xin gu <[email protected]>
AuthorDate: Mon Mar 27 17:27:18 2023 +0800
Migration election strategy (#1044)
* Migration election strategy
* license
* Migration election strategy
* Update server.go
* Migration election strategy
---
pkg/authority/election/leaderelection.go | 78 ++++++++++++++++++++++++++++++++
pkg/authority/k8s/client.go | 45 ++----------------
pkg/authority/security/server.go | 9 +++-
pkg/authority/security/server_test.go | 17 ++++++-
4 files changed, 104 insertions(+), 45 deletions(-)
diff --git a/pkg/authority/election/leaderelection.go
b/pkg/authority/election/leaderelection.go
new file mode 100644
index 00000000..83157f05
--- /dev/null
+++ b/pkg/authority/election/leaderelection.go
@@ -0,0 +1,78 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package election
+
+import (
+ "context"
+ "time"
+
+ "github.com/apache/dubbo-admin/pkg/authority/cert"
+ "github.com/apache/dubbo-admin/pkg/authority/config"
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+ "k8s.io/client-go/kubernetes"
+ "k8s.io/client-go/tools/leaderelection"
+ "k8s.io/client-go/tools/leaderelection/resourcelock"
+)
+
+type LeaderElection interface {
+ Election(storage cert.Storage, options *config.Options, kubeClient
*kubernetes.Clientset) error
+}
+
+type leaderElectionImpl struct{}
+
+func NewleaderElection() LeaderElection {
+ return &leaderElectionImpl{}
+}
+
+func (c *leaderElectionImpl) Election(storage cert.Storage, options
*config.Options, kubeClient *kubernetes.Clientset) error {
+ identity := options.ResourcelockIdentity
+ rlConfig := resourcelock.ResourceLockConfig{
+ Identity: identity,
+ }
+ namespace := options.Namespace
+ _, err := kubeClient.CoreV1().Namespaces().Get(context.TODO(),
namespace, metav1.GetOptions{})
+ if err != nil {
+ namespace = "default"
+ }
+ lock, err :=
resourcelock.New(resourcelock.ConfigMapsLeasesResourceLock, namespace,
"dubbo-lock-cert", kubeClient.CoreV1(), kubeClient.CoordinationV1(), rlConfig)
+ if err != nil {
+ return err
+ }
+ leaderElectionConfig := leaderelection.LeaderElectionConfig{
+ Lock: lock,
+ LeaseDuration: 15 * time.Second,
+ RenewDeadline: 10 * time.Second,
+ RetryPeriod: 2 * time.Second,
+ Callbacks: leaderelection.LeaderCallbacks{
+ // leader
+ OnStartedLeading: func(ctx context.Context) {
+ // lock if multi server,refresh signed cert
+
storage.SetAuthorityCert(cert.GenerateAuthorityCert(storage.GetRootCert(),
options.CaValidity))
+ },
+ // not leader
+ OnStoppedLeading: func() {
+ // TODO should be listen,when cert
resfresh,should be resfresh
+ },
+ // a new leader has been elected
+ OnNewLeader: func(identity string) {
+ },
+ },
+ }
+
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
+ leaderelection.RunOrDie(ctx, leaderElectionConfig)
+ return nil
+}
diff --git a/pkg/authority/k8s/client.go b/pkg/authority/k8s/client.go
index ba8e9f62..489dbfaf 100644
--- a/pkg/authority/k8s/client.go
+++ b/pkg/authority/k8s/client.go
@@ -40,8 +40,6 @@ import (
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
- "k8s.io/client-go/tools/leaderelection"
- "k8s.io/client-go/tools/leaderelection/resourcelock"
"k8s.io/client-go/util/homedir"
)
@@ -61,7 +59,7 @@ type Client interface {
UpdateWebhookConfig(options *config.Options, storage cert.Storage)
GetNamespaceLabels(namespace string) map[string]string
InitController(paHandler authentication.Handler, apHandler
authorization.Handler)
- Resourcelock(storage cert.Storage, options *config.Options) error
+ GetKubClient() *kubernetes.Clientset
}
type ClientImpl struct {
@@ -395,43 +393,6 @@ func (c *ClientImpl) InitController(
controller.WaitSynced()
}
-func (c *ClientImpl) Resourcelock(storage cert.Storage, options
*config.Options) error {
- identity := options.ResourcelockIdentity
- rlConfig := resourcelock.ResourceLockConfig{
- Identity: identity,
- }
- namespace := options.Namespace
- _, err := c.kubeClient.CoreV1().Namespaces().Get(context.TODO(),
namespace, metav1.GetOptions{})
- if err != nil {
- namespace = "default"
- }
- lock, err :=
resourcelock.New(resourcelock.ConfigMapsLeasesResourceLock, namespace,
"dubbo-lock-cert", c.kubeClient.CoreV1(), c.kubeClient.CoordinationV1(),
rlConfig)
- if err != nil {
- return err
- }
- leaderElectionConfig := leaderelection.LeaderElectionConfig{
- Lock: lock,
- LeaseDuration: 15 * time.Second,
- RenewDeadline: 10 * time.Second,
- RetryPeriod: 2 * time.Second,
- Callbacks: leaderelection.LeaderCallbacks{
- // leader
- OnStartedLeading: func(ctx context.Context) {
- // lock if multi server,refresh signed cert
-
storage.SetAuthorityCert(cert.GenerateAuthorityCert(storage.GetRootCert(),
options.CaValidity))
- },
- // not leader
- OnStoppedLeading: func() {
- // TODO should be listen,when cert
resfresh,should be resfresh
- },
- // a new leader has been elected
- OnNewLeader: func(identity string) {
- },
- },
- }
-
- ctx, cancel := context.WithCancel(context.Background())
- defer cancel()
- leaderelection.RunOrDie(ctx, leaderElectionConfig)
- return nil
+func (c *ClientImpl) GetKubClient() *kubernetes.Clientset {
+ return c.kubeClient
}
diff --git a/pkg/authority/security/server.go b/pkg/authority/security/server.go
index 50e39647..060f9849 100644
--- a/pkg/authority/security/server.go
+++ b/pkg/authority/security/server.go
@@ -25,6 +25,8 @@ import (
"strconv"
"time"
+ "github.com/apache/dubbo-admin/pkg/authority/election"
+
cert2 "github.com/apache/dubbo-admin/pkg/authority/cert"
"github.com/apache/dubbo-admin/pkg/authority/config"
"github.com/apache/dubbo-admin/pkg/authority/k8s"
@@ -59,6 +61,7 @@ type Server struct {
WebhookServer *webhook.Webhook
JavaInjector *patch.JavaSdk
+ Elec election.LeaderElection
}
func NewServer(options *config.Options) *Server {
@@ -83,6 +86,9 @@ func (s *Server) Init() {
if s.CertStorage == nil {
s.CertStorage = cert2.NewStorage(s.Options)
}
+ if s.Elec == nil {
+ s.Elec = election.NewleaderElection()
+ }
go s.CertStorage.RefreshServerCert()
s.LoadRootCert()
@@ -182,7 +188,8 @@ func (s *Server) ScheduleRefreshAuthorityCert() {
logger.Sugar().Infof("Authority cert is invalid,
refresh it.")
// TODO lock if multi server
// TODO refresh signed cert
- s.KubeClient.Resourcelock(s.CertStorage, s.Options)
+
+ s.Elec.Election(s.CertStorage, s.Options,
s.KubeClient.GetKubClient())
if s.Options.IsKubernetesConnected {
s.KubeClient.UpdateAuthorityCert(s.CertStorage.GetAuthorityCert().CertPem,
cert2.EncodePrivateKey(s.CertStorage.GetAuthorityCert().PrivateKey),
s.Options.Namespace)
s.KubeClient.UpdateWebhookConfig(s.Options,
s.CertStorage)
diff --git a/pkg/authority/security/server_test.go
b/pkg/authority/security/server_test.go
index 480e401c..0e36b689 100644
--- a/pkg/authority/security/server_test.go
+++ b/pkg/authority/security/server_test.go
@@ -21,6 +21,10 @@ import (
"testing"
"time"
+ "github.com/apache/dubbo-admin/pkg/authority/election"
+
+ "k8s.io/client-go/kubernetes"
+
cert2 "github.com/apache/dubbo-admin/pkg/authority/cert"
"github.com/apache/dubbo-admin/pkg/authority/config"
"github.com/apache/dubbo-admin/pkg/authority/k8s"
@@ -54,8 +58,7 @@ func (s *mockKubeClient) UpdateAuthorityPublicKey(cert
string) bool {
func (s *mockKubeClient) UpdateWebhookConfig(options *config.Options, storage
cert2.Storage) {
}
-func (s *mockKubeClient) Resourcelock(storage cert2.Storage, options
*config.Options) error {
-
storage.SetAuthorityCert(cert2.GenerateAuthorityCert(storage.GetRootCert(),
options.CaValidity))
+func (s *mockKubeClient) GetKubClient() *kubernetes.Clientset {
return nil
}
@@ -99,6 +102,15 @@ func (s *mockStorage) GetStopChan() chan os.Signal {
return s.origin.GetStopChan()
}
+type mockLeaderElection struct {
+ election.LeaderElection
+}
+
+func (s *mockLeaderElection) Election(storage cert2.Storage, options
*config.Options, kubeClient *kubernetes.Clientset) error {
+
storage.SetAuthorityCert(cert2.GenerateAuthorityCert(storage.GetRootCert(),
options.CaValidity))
+ return nil
+}
+
func TestInit(t *testing.T) {
t.Parallel()
@@ -170,6 +182,7 @@ func TestRefresh(t *testing.T) {
s.KubeClient = &mockKubeClient{}
storage := &mockStorage{}
+ s.Elec = &mockLeaderElection{}
storage.origin = cert2.NewStorage(options)
s.CertStorage = storage