This is an automated email from the ASF dual-hosted git repository.

albumenj pushed a commit to branch refactor-with-go
in repository https://gitbox.apache.org/repos/asf/dubbo-admin.git


The following commit(s) were added to refs/heads/refactor-with-go by this push:
     new c7ed4c69 Migration election strategy (#1044)
c7ed4c69 is described below

commit c7ed4c699dcbe2f0a550aeb470c55180e30157dc
Author: xin gu <[email protected]>
AuthorDate: Mon Mar 27 17:27:18 2023 +0800

    Migration election strategy (#1044)
    
    * Migration election strategy
    
    * license
    
    * Migration election strategy
    
    * Update server.go
    
    * Migration election strategy
---
 pkg/authority/election/leaderelection.go | 78 ++++++++++++++++++++++++++++++++
 pkg/authority/k8s/client.go              | 45 ++----------------
 pkg/authority/security/server.go         |  9 +++-
 pkg/authority/security/server_test.go    | 17 ++++++-
 4 files changed, 104 insertions(+), 45 deletions(-)

diff --git a/pkg/authority/election/leaderelection.go 
b/pkg/authority/election/leaderelection.go
new file mode 100644
index 00000000..83157f05
--- /dev/null
+++ b/pkg/authority/election/leaderelection.go
@@ -0,0 +1,78 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package election
+
+import (
+       "context"
+       "time"
+
+       "github.com/apache/dubbo-admin/pkg/authority/cert"
+       "github.com/apache/dubbo-admin/pkg/authority/config"
+       metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+       "k8s.io/client-go/kubernetes"
+       "k8s.io/client-go/tools/leaderelection"
+       "k8s.io/client-go/tools/leaderelection/resourcelock"
+)
+
+type LeaderElection interface {
+       Election(storage cert.Storage, options *config.Options, kubeClient 
*kubernetes.Clientset) error
+}
+
+type leaderElectionImpl struct{}
+
+func NewleaderElection() LeaderElection {
+       return &leaderElectionImpl{}
+}
+
+func (c *leaderElectionImpl) Election(storage cert.Storage, options 
*config.Options, kubeClient *kubernetes.Clientset) error {
+       identity := options.ResourcelockIdentity
+       rlConfig := resourcelock.ResourceLockConfig{
+               Identity: identity,
+       }
+       namespace := options.Namespace
+       _, err := kubeClient.CoreV1().Namespaces().Get(context.TODO(), 
namespace, metav1.GetOptions{})
+       if err != nil {
+               namespace = "default"
+       }
+       lock, err := 
resourcelock.New(resourcelock.ConfigMapsLeasesResourceLock, namespace, 
"dubbo-lock-cert", kubeClient.CoreV1(), kubeClient.CoordinationV1(), rlConfig)
+       if err != nil {
+               return err
+       }
+       leaderElectionConfig := leaderelection.LeaderElectionConfig{
+               Lock:          lock,
+               LeaseDuration: 15 * time.Second,
+               RenewDeadline: 10 * time.Second,
+               RetryPeriod:   2 * time.Second,
+               Callbacks: leaderelection.LeaderCallbacks{
+                       // leader
+                       OnStartedLeading: func(ctx context.Context) {
+                               // lock if multi server,refresh signed cert
+                               
storage.SetAuthorityCert(cert.GenerateAuthorityCert(storage.GetRootCert(), 
options.CaValidity))
+                       },
+                       // not leader
+                       OnStoppedLeading: func() {
+                               // TODO: should listen for cert refreshes; 
when the cert is refreshed, this instance should refresh too
+                       },
+                       // a new leader has been elected
+                       OnNewLeader: func(identity string) {
+                       },
+               },
+       }
+
+       ctx, cancel := context.WithCancel(context.Background())
+       defer cancel()
+       leaderelection.RunOrDie(ctx, leaderElectionConfig)
+       return nil
+}
diff --git a/pkg/authority/k8s/client.go b/pkg/authority/k8s/client.go
index ba8e9f62..489dbfaf 100644
--- a/pkg/authority/k8s/client.go
+++ b/pkg/authority/k8s/client.go
@@ -40,8 +40,6 @@ import (
        "k8s.io/client-go/kubernetes"
        "k8s.io/client-go/rest"
        "k8s.io/client-go/tools/clientcmd"
-       "k8s.io/client-go/tools/leaderelection"
-       "k8s.io/client-go/tools/leaderelection/resourcelock"
        "k8s.io/client-go/util/homedir"
 )
 
@@ -61,7 +59,7 @@ type Client interface {
        UpdateWebhookConfig(options *config.Options, storage cert.Storage)
        GetNamespaceLabels(namespace string) map[string]string
        InitController(paHandler authentication.Handler, apHandler 
authorization.Handler)
-       Resourcelock(storage cert.Storage, options *config.Options) error
+       GetKubClient() *kubernetes.Clientset
 }
 
 type ClientImpl struct {
@@ -395,43 +393,6 @@ func (c *ClientImpl) InitController(
        controller.WaitSynced()
 }
 
-func (c *ClientImpl) Resourcelock(storage cert.Storage, options 
*config.Options) error {
-       identity := options.ResourcelockIdentity
-       rlConfig := resourcelock.ResourceLockConfig{
-               Identity: identity,
-       }
-       namespace := options.Namespace
-       _, err := c.kubeClient.CoreV1().Namespaces().Get(context.TODO(), 
namespace, metav1.GetOptions{})
-       if err != nil {
-               namespace = "default"
-       }
-       lock, err := 
resourcelock.New(resourcelock.ConfigMapsLeasesResourceLock, namespace, 
"dubbo-lock-cert", c.kubeClient.CoreV1(), c.kubeClient.CoordinationV1(), 
rlConfig)
-       if err != nil {
-               return err
-       }
-       leaderElectionConfig := leaderelection.LeaderElectionConfig{
-               Lock:          lock,
-               LeaseDuration: 15 * time.Second,
-               RenewDeadline: 10 * time.Second,
-               RetryPeriod:   2 * time.Second,
-               Callbacks: leaderelection.LeaderCallbacks{
-                       // leader
-                       OnStartedLeading: func(ctx context.Context) {
-                               // lock if multi server,refresh signed cert
-                               
storage.SetAuthorityCert(cert.GenerateAuthorityCert(storage.GetRootCert(), 
options.CaValidity))
-                       },
-                       // not leader
-                       OnStoppedLeading: func() {
-                               // TODO should be listen,when cert 
resfresh,should be resfresh
-                       },
-                       // a new leader has been elected
-                       OnNewLeader: func(identity string) {
-                       },
-               },
-       }
-
-       ctx, cancel := context.WithCancel(context.Background())
-       defer cancel()
-       leaderelection.RunOrDie(ctx, leaderElectionConfig)
-       return nil
+func (c *ClientImpl) GetKubClient() *kubernetes.Clientset {
+       return c.kubeClient
 }
diff --git a/pkg/authority/security/server.go b/pkg/authority/security/server.go
index 50e39647..060f9849 100644
--- a/pkg/authority/security/server.go
+++ b/pkg/authority/security/server.go
@@ -25,6 +25,8 @@ import (
        "strconv"
        "time"
 
+       "github.com/apache/dubbo-admin/pkg/authority/election"
+
        cert2 "github.com/apache/dubbo-admin/pkg/authority/cert"
        "github.com/apache/dubbo-admin/pkg/authority/config"
        "github.com/apache/dubbo-admin/pkg/authority/k8s"
@@ -59,6 +61,7 @@ type Server struct {
 
        WebhookServer *webhook.Webhook
        JavaInjector  *patch.JavaSdk
+       Elec          election.LeaderElection
 }
 
 func NewServer(options *config.Options) *Server {
@@ -83,6 +86,9 @@ func (s *Server) Init() {
        if s.CertStorage == nil {
                s.CertStorage = cert2.NewStorage(s.Options)
        }
+       if s.Elec == nil {
+               s.Elec = election.NewleaderElection()
+       }
        go s.CertStorage.RefreshServerCert()
 
        s.LoadRootCert()
@@ -182,7 +188,8 @@ func (s *Server) ScheduleRefreshAuthorityCert() {
                        logger.Sugar().Infof("Authority cert is invalid, 
refresh it.")
                        // TODO lock if multi server
                        // TODO refresh signed cert
-                       s.KubeClient.Resourcelock(s.CertStorage, s.Options)
+
+                       s.Elec.Election(s.CertStorage, s.Options, 
s.KubeClient.GetKubClient())
                        if s.Options.IsKubernetesConnected {
                                
s.KubeClient.UpdateAuthorityCert(s.CertStorage.GetAuthorityCert().CertPem, 
cert2.EncodePrivateKey(s.CertStorage.GetAuthorityCert().PrivateKey), 
s.Options.Namespace)
                                s.KubeClient.UpdateWebhookConfig(s.Options, 
s.CertStorage)
diff --git a/pkg/authority/security/server_test.go 
b/pkg/authority/security/server_test.go
index 480e401c..0e36b689 100644
--- a/pkg/authority/security/server_test.go
+++ b/pkg/authority/security/server_test.go
@@ -21,6 +21,10 @@ import (
        "testing"
        "time"
 
+       "github.com/apache/dubbo-admin/pkg/authority/election"
+
+       "k8s.io/client-go/kubernetes"
+
        cert2 "github.com/apache/dubbo-admin/pkg/authority/cert"
        "github.com/apache/dubbo-admin/pkg/authority/config"
        "github.com/apache/dubbo-admin/pkg/authority/k8s"
@@ -54,8 +58,7 @@ func (s *mockKubeClient) UpdateAuthorityPublicKey(cert 
string) bool {
 func (s *mockKubeClient) UpdateWebhookConfig(options *config.Options, storage 
cert2.Storage) {
 }
 
-func (s *mockKubeClient) Resourcelock(storage cert2.Storage, options 
*config.Options) error {
-       
storage.SetAuthorityCert(cert2.GenerateAuthorityCert(storage.GetRootCert(), 
options.CaValidity))
+func (s *mockKubeClient) GetKubClient() *kubernetes.Clientset {
        return nil
 }
 
@@ -99,6 +102,15 @@ func (s *mockStorage) GetStopChan() chan os.Signal {
        return s.origin.GetStopChan()
 }
 
+type mockLeaderElection struct {
+       election.LeaderElection
+}
+
+func (s *mockLeaderElection) Election(storage cert2.Storage, options 
*config.Options, kubeClient *kubernetes.Clientset) error {
+       
storage.SetAuthorityCert(cert2.GenerateAuthorityCert(storage.GetRootCert(), 
options.CaValidity))
+       return nil
+}
+
 func TestInit(t *testing.T) {
        t.Parallel()
 
@@ -170,6 +182,7 @@ func TestRefresh(t *testing.T) {
 
        s.KubeClient = &mockKubeClient{}
        storage := &mockStorage{}
+       s.Elec = &mockLeaderElection{}
        storage.origin = cert2.NewStorage(options)
        s.CertStorage = storage
 

Reply via email to