This is an automated email from the ASF dual-hosted git repository.

tianxiaoliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/servicecomb-service-center.git


The following commit(s) were added to refs/heads/master by this push:
     new aa68e8d  Clear no-instance services automatically (#577)
aa68e8d is described below

commit aa68e8db69645e30e1f10a61179aca62766fd22b
Author: humingcheng <[email protected]>
AuthorDate: Sat Aug 24 17:35:41 2019 +0800

    Clear no-instance services automatically (#577)
---
 scripts/prepare_env_ut.sh                        |   8 +-
 scripts/ut.sh                                    |  12 +-
 server/core/config.go                            |  34 +++++-
 server/core/proto/types.go                       |   8 ++
 server/mux/mux.go                                |   5 +-
 server/server.go                                 |  48 +++++++-
 server/service/event/dependency_event_handler.go |   4 +-
 server/service/util/microservice_util.go         |  40 +++++++
 server/task/clear_service.go                     | 107 ++++++++++++++++++
 server/task/clear_service_test.go                | 133 +++++++++++++++++++++++
 10 files changed, 379 insertions(+), 20 deletions(-)

diff --git a/scripts/prepare_env_ut.sh b/scripts/prepare_env_ut.sh
index 2868ee1..c2fd19a 100755
--- a/scripts/prepare_env_ut.sh
+++ b/scripts/prepare_env_ut.sh
@@ -14,5 +14,9 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-cp -r etc/conf server/plugin/infra/tls/buildin
-echo "mode: atomic" > coverage.txt
+CURRENT_PATH=$(cd $(dirname $0);pwd)
+ROOT_PATH=$(dirname $CURRENT_PATH)
+
+mkdir -p $ROOT_PATH/server/plugin/infra/tls/buildin
+cp -r $ROOT_PATH/etc/conf $ROOT_PATH/server/plugin/infra/tls/buildin
+echo "mode: atomic" > $ROOT_PATH/coverage.txt
diff --git a/scripts/ut.sh b/scripts/ut.sh
index 849784b..4b19dbc 100755
--- a/scripts/ut.sh
+++ b/scripts/ut.sh
@@ -16,14 +16,18 @@
 # limitations under the License.
 
 set -e
+CURRENT_PATH=$(cd $(dirname $0);pwd)
+ROOT_PATH=$(dirname $CURRENT_PATH)
+
 export COVERAGE_PATH=$(pwd)
 cd $1
-for d in $(go list ./... | grep -v vendor); do
-    cd $GOPATH/src/$d
+for d in $(go list -f '{{.Dir}}' ./... | grep -v vendor); do
+    cd $d
     if [ $(ls | grep _test.go | wc -l) -gt 0 ]; then
-        go test -cover -covermode atomic -coverprofile coverage.out  
+        go test -cover -covermode atomic -coverprofile coverage.out
         if [ -f coverage.out ]; then
-            sed '1d;$d' coverage.out >> $GOPATH/src/github.com/apache/servicecomb-service-center/coverage.txt
+            sed '1d;$d' coverage.out >> $ROOT_PATH/coverage.txt
         fi
     fi
 done
+
diff --git a/server/core/config.go b/server/core/config.go
index ab2f891..ae6ac84 100644
--- a/server/core/config.go
+++ b/server/core/config.go
@@ -17,18 +17,29 @@
 package core
 
 import (
+       "os"
+       "runtime"
+       "time"
+
        "github.com/apache/servicecomb-service-center/pkg/log"
        "github.com/apache/servicecomb-service-center/pkg/plugin"
        "github.com/apache/servicecomb-service-center/pkg/util"
        pb "github.com/apache/servicecomb-service-center/server/core/proto"
        "github.com/apache/servicecomb-service-center/version"
        "github.com/astaxie/beego"
-       "os"
-       "runtime"
 )
 
 const (
-       INIT_VERSION = "0"
+       InitVersion = "0"
+
+       defaultServiceClearInterval = 12 * time.Hour //0.5 day
+       defaultServiceTTL           = 24 * time.Hour //1 day
+
+       minServiceClearInterval = 30 * time.Second
+       minServiceTTL           = 30 * time.Second
+
+       maxServiceClearInterval = 24 * time.Hour       //1 day
+       maxServiceTTL           = 24 * 365 * time.Hour //1 year
 )
 
 var ServerInfo = pb.NewServerInformation()
@@ -54,8 +65,19 @@ func newInfo() pb.ServerInformation {
        if maxLogBackupCount < 0 || maxLogBackupCount > 100 {
                maxLogBackupCount = 50
        }
+
+       serviceClearInterval, err := 
time.ParseDuration(os.Getenv("SERVICE_CLEAR_INTERVAL"))
+       if err != nil || serviceClearInterval < minServiceClearInterval || 
serviceClearInterval > maxServiceClearInterval {
+               serviceClearInterval = defaultServiceClearInterval
+       }
+
+       serviceTTL, err := time.ParseDuration(os.Getenv("SERVICE_TTL"))
+       if err != nil || serviceTTL < minServiceTTL || serviceTTL > 
maxServiceTTL {
+               serviceTTL = defaultServiceTTL
+       }
+
        return pb.ServerInformation{
-               Version: INIT_VERSION,
+               Version: InitVersion,
                Config: pb.ServerConfig{
                        MaxHeaderBytes: 
int64(beego.AppConfig.DefaultInt("max_header_bytes", 16384)),
                        MaxBodyBytes:   
beego.AppConfig.DefaultInt64("max_body_bytes", 2097152),
@@ -92,6 +114,10 @@ func newInfo() pb.ServerInformation {
                        EnablePProf:  
beego.AppConfig.DefaultInt("enable_pprof", 0) != 0,
                        EnableCache:  
beego.AppConfig.DefaultInt("enable_cache", 1) != 0,
                        SelfRegister: 
beego.AppConfig.DefaultInt("self_register", 1) != 0,
+
+                       ServiceClearEnabled:  
os.Getenv("SERVICE_CLEAR_ENABLED") == "true",
+                       ServiceClearInterval: serviceClearInterval,
+                       ServiceTTL:           serviceTTL,
                },
        }
 }
diff --git a/server/core/proto/types.go b/server/core/proto/types.go
index 36de9a4..a157ebf 100644
--- a/server/core/proto/types.go
+++ b/server/core/proto/types.go
@@ -16,6 +16,8 @@
 package proto
 
 import (
+       "time"
+
        "github.com/apache/servicecomb-service-center/pkg/util"
 )
 
@@ -61,6 +63,12 @@ type ServerConfig struct {
        Plugins    util.JSONObject `json:"plugins"`
 
        SelfRegister bool `json:"selfRegister"`
+
+       //clear no-instance services
+       ServiceClearEnabled  bool          `json:"serviceClearEnabled"`
+       ServiceClearInterval time.Duration `json:"serviceClearInterval"`
+       //if a service's existence time reaches this value, it can be cleared
+       ServiceTTL time.Duration `json:"serviceTTL"`
 }
 
 type ServerInformation struct {
diff --git a/server/mux/mux.go b/server/mux/mux.go
index 8f3a8bf..c2c844e 100644
--- a/server/mux/mux.go
+++ b/server/mux/mux.go
@@ -33,8 +33,9 @@ func (m *MuxType) String() (s string) {
 }
 
 const (
-       GLOBAL_LOCK    MuxType = "/cse-sr/lock/global"
-       DEP_QUEUE_LOCK MuxType = "/cse-sr/lock/dep-queue"
+       GlobalLock       MuxType = "/cse-sr/lock/global"
+       DepQueueLock     MuxType = "/cse-sr/lock/dep-queue"
+       ServiceClearLock MuxType = "/cse-sr/lock/service-clear"
 )
 
 func Lock(t MuxType) (*etcdsync.DLock, error) {
diff --git a/server/server.go b/server/server.go
index 72ea40b..8f04b66 100644
--- a/server/server.go
+++ b/server/server.go
@@ -19,6 +19,9 @@ package server
 import _ "github.com/apache/servicecomb-service-center/server/service/event"
 import (
        "fmt"
+       "os"
+       "time"
+
        "github.com/apache/servicecomb-service-center/pkg/gopool"
        "github.com/apache/servicecomb-service-center/pkg/log"
        nf "github.com/apache/servicecomb-service-center/pkg/notify"
@@ -28,11 +31,10 @@ import (
        "github.com/apache/servicecomb-service-center/server/notify"
        "github.com/apache/servicecomb-service-center/server/plugin"
        serviceUtil 
"github.com/apache/servicecomb-service-center/server/service/util"
+       "github.com/apache/servicecomb-service-center/server/task"
        "github.com/apache/servicecomb-service-center/version"
        "github.com/astaxie/beego"
        "golang.org/x/net/context"
-       "os"
-       "time"
 )
 
 const buildin = "buildin"
@@ -80,7 +82,7 @@ func (s *ServiceCenterServer) needUpgrade() bool {
 }
 
 func (s *ServiceCenterServer) loadOrUpgradeServerVersion() {
-       lock, err := mux.Lock(mux.GLOBAL_LOCK)
+       lock, err := mux.Lock(mux.GlobalLock)
        if err != nil {
                log.Errorf(err, "wait for server ready failed")
                os.Exit(1)
@@ -114,8 +116,8 @@ func (s *ServiceCenterServer) compactBackendService() {
                        case <-ctx.Done():
                                return
                        case <-time.After(interval):
-                               lock, err := mux.Try(mux.GLOBAL_LOCK)
-                               if lock == nil {
+                               lock, err := mux.Try(mux.GlobalLock)
+                               if err != nil {
                                        log.Errorf(err, "can not compact backend by this service center instance now")
                                        continue
                                }
@@ -128,6 +130,38 @@ func (s *ServiceCenterServer) compactBackendService() {
        })
 }
 
+// clearNoInstanceServices periodically clears services that have no instances
+func (s *ServiceCenterServer) clearNoInstanceServices() {
+       if !core.ServerInfo.Config.ServiceClearEnabled {
+               return
+       }
+       log.Infof("service clear enabled, interval: %s, service TTL: %s",
+               core.ServerInfo.Config.ServiceClearInterval,
+               core.ServerInfo.Config.ServiceTTL)
+
+       s.goroutine.Do(func(ctx context.Context) {
+               for {
+                       select {
+                       case <-ctx.Done():
+                               return
+                       case 
<-time.After(core.ServerInfo.Config.ServiceClearInterval):
+                               lock, err := mux.Try(mux.ServiceClearLock)
+                               if err != nil {
+                                       log.Errorf(err, "can not clear no instance services by this service center instance now")
+                                       continue
+                               }
+                               err = 
task.ClearNoInstanceServices(core.ServerInfo.Config.ServiceTTL)
+                               lock.Unlock()
+                               if err != nil {
+                                       log.Errorf(err, "no-instance services cleanup failed")
+                                       continue
+                               }
+                               log.Info("no-instance services cleanup succeed")
+                       }
+               }
+       })
+}
+
 func (s *ServiceCenterServer) initialize() {
        s.cacheService = backend.Store()
        s.apiService = GetAPIServer()
@@ -151,9 +185,11 @@ func (s *ServiceCenterServer) startServices() {
        s.cacheService.Run()
        <-s.cacheService.Ready()
 
-       // compact backend automatically
        if buildin != beego.AppConfig.DefaultString("registry_plugin", buildin) 
{
+               // compact backend automatically
                s.compactBackendService()
+               // clean no-instance services automatically
+               s.clearNoInstanceServices()
        }
 
        // api service
diff --git a/server/service/event/dependency_event_handler.go 
b/server/service/event/dependency_event_handler.go
index 2af407b..5e25e12 100644
--- a/server/service/event/dependency_event_handler.go
+++ b/server/service/event/dependency_event_handler.go
@@ -63,9 +63,9 @@ func (h *DependencyEventHandler) backoff(f func(), retries 
int) int {
 }
 
 func (h *DependencyEventHandler) tryWithBackoff(success func() error, backoff 
func(), retries int) (error, int) {
-       lock, err := mux.Try(mux.DEP_QUEUE_LOCK)
+       lock, err := mux.Try(mux.DepQueueLock)
        if err != nil {
-               log.Errorf(err, "try to lock %s failed", mux.DEP_QUEUE_LOCK)
+               log.Errorf(err, "try to lock %s failed", mux.DepQueueLock)
                return err, h.backoff(backoff, retries)
        }
 
diff --git a/server/service/util/microservice_util.go 
b/server/service/util/microservice_util.go
index 43ed799..1972395 100644
--- a/server/service/util/microservice_util.go
+++ b/server/service/util/microservice_util.go
@@ -18,6 +18,8 @@ package util
 
 import (
        "encoding/json"
+       "strings"
+
        "github.com/apache/servicecomb-service-center/pkg/log"
        "github.com/apache/servicecomb-service-center/pkg/util"
        apt "github.com/apache/servicecomb-service-center/server/core"
@@ -27,6 +29,7 @@ import (
        
"github.com/apache/servicecomb-service-center/server/plugin/pkg/discovery"
        "github.com/apache/servicecomb-service-center/server/plugin/pkg/quota"
        
"github.com/apache/servicecomb-service-center/server/plugin/pkg/registry"
+
        "golang.org/x/net/context"
 )
 
@@ -72,6 +75,43 @@ func getServicesRawData(ctx context.Context, domainProject 
string) ([]*discovery
        return resp.Kvs, err
 }
 
+//GetAllServicesAcrossDomainProject gets the services of all domains and projects;
+//the returned map is keyed by domainProject
+func GetAllServicesAcrossDomainProject(ctx context.Context) 
(map[string][]*pb.MicroService, error) {
+       key := apt.GetServiceRootKey("")
+       opts := append(FromContext(ctx),
+               registry.WithStrKey(key),
+               registry.WithPrefix())
+       serviceResp, err := backend.Store().Service().Search(ctx, opts...)
+       if err != nil {
+               return nil, err
+       }
+
+       services := make(map[string][]*pb.MicroService)
+       if len(serviceResp.Kvs) == 0 {
+               return services, nil
+       }
+
+       for _, value := range serviceResp.Kvs {
+               prefix := util.BytesToStringWithNoCopy(value.Key)
+               parts := strings.Split(prefix, apt.SPLIT)
+               if len(parts) != 7 {
+                       continue
+               }
+               domainProject := parts[4] + apt.SPLIT + parts[5]
+               microService, ok := value.Value.(*pb.MicroService)
+               if !ok {
+                       log.Error("backend data is not type *pb.MicroService", 
nil)
+                       continue
+               }
+               if _, ok := services[domainProject]; !ok {
+                       services[domainProject] = make([]*pb.MicroService, 0)
+               }
+               services[domainProject] = append(services[domainProject], 
microService)
+       }
+       return services, nil
+}
+
 func GetServicesByDomainProject(ctx context.Context, domainProject string) 
([]*pb.MicroService, error) {
        kvs, err := getServicesRawData(ctx, domainProject)
        if err != nil {
diff --git a/server/task/clear_service.go b/server/task/clear_service.go
new file mode 100644
index 0000000..8c2daff
--- /dev/null
+++ b/server/task/clear_service.go
@@ -0,0 +1,107 @@
+package task
+
+import (
+       "context"
+       "errors"
+       "strconv"
+       "strings"
+       "time"
+
+       "github.com/apache/servicecomb-service-center/pkg/log"
+       "github.com/apache/servicecomb-service-center/pkg/util"
+       apt "github.com/apache/servicecomb-service-center/server/core"
+       pb "github.com/apache/servicecomb-service-center/server/core/proto"
+       serviceUtil 
"github.com/apache/servicecomb-service-center/server/service/util"
+)
+
+// ClearNoInstanceServices clears services which have no instance
+func ClearNoInstanceServices(serviceTTL time.Duration) error {
+       services, err := 
serviceUtil.GetAllServicesAcrossDomainProject(context.Background())
+       if err != nil {
+               return err
+       }
+       if len(services) == 0 {
+               log.Info("no service found, no need to clear")
+               return nil
+       }
+       timeLimit := time.Now().Add(0 - serviceTTL)
+       log.Infof("clear no-instance services created before %s", timeLimit)
+       timeLimitStamp := strconv.FormatInt(timeLimit.Unix(), 10)
+
+       for domainProject, svcList := range services {
+               if len(svcList) == 0 {
+                       continue
+               }
+               ctx, err := ctxFromDomainProject(domainProject)
+               if err != nil {
+                       log.Errorf(err, "get domain project context failed")
+                       continue
+               }
+               for _, svc := range svcList {
+                       if svc == nil {
+                               continue
+                       }
+                       ok, err := shouldClear(ctx, timeLimitStamp, svc)
+                       if err != nil {
+                               log.Errorf(err, "check service clear necessity failed")
+                               continue
+                       }
+                       if !ok {
+                               continue
+                       }
+                       //delete this service
+                       svcCtxStr := "domainProject: " + domainProject + ", " +
+                               "env: " + svc.Environment + ", " +
+                               "service: " + 
util.StringJoin([]string{svc.AppId, svc.ServiceName, svc.Version}, apt.SPLIT)
+                       delSvcReq := &pb.DeleteServiceRequest{
+                               ServiceId: svc.ServiceId,
+                               Force:     true, //force delete
+                       }
+                       delSvcResp, err := apt.ServiceAPI.Delete(ctx, delSvcReq)
+                       if err != nil {
+                               log.Errorf(err, "clear service failed, %s", 
svcCtxStr)
+                               continue
+                       }
+                       if delSvcResp.Response.GetCode() != pb.Response_SUCCESS 
{
+                               log.Errorf(nil, "clear service failed, %s, %s", 
delSvcResp.Response.GetMessage(), svcCtxStr)
+                               continue
+                       }
+                       log.Warnf("clear service success, %s", svcCtxStr)
+               }
+       }
+       return nil
+}
+
+func ctxFromDomainProject(domainProject string) (ctx context.Context, err 
error) {
+       splitIndex := strings.Index(domainProject, apt.SPLIT)
+       if splitIndex == -1 {
+               return nil, errors.New("invalid domainProject: " + 
domainProject)
+       }
+       domain := domainProject[:splitIndex]
+       project := domainProject[splitIndex+1:]
+       return util.SetDomainProject(context.Background(), domain, project), nil
+}
+
+//check whether a service should be cleared
+func shouldClear(ctx context.Context, timeLimitStamp string, svc 
*pb.MicroService) (bool, error) {
+       //ignore a service if it is created after timeLimitStamp
+       if svc.Timestamp > timeLimitStamp {
+               return false, nil
+       }
+       getInstsReq := &pb.GetInstancesRequest{
+               ConsumerServiceId: svc.ServiceId,
+               ProviderServiceId: svc.ServiceId,
+       }
+       getInstsResp, err := apt.InstanceAPI.GetInstances(ctx, getInstsReq)
+       if err != nil {
+               return false, err
+       }
+       if getInstsResp.Response.GetCode() != pb.Response_SUCCESS {
+               return false, errors.New("get instance failed: " + 
getInstsResp.Response.GetMessage())
+       }
+       //ignore a service if it has instances
+       if len(getInstsResp.Instances) > 0 {
+               return false, nil
+       }
+       return true, nil
+}
diff --git a/server/task/clear_service_test.go 
b/server/task/clear_service_test.go
new file mode 100644
index 0000000..29ac868
--- /dev/null
+++ b/server/task/clear_service_test.go
@@ -0,0 +1,133 @@
+package task_test
+
+// initialize
+import _ "github.com/apache/servicecomb-service-center/server/init"
+import _ "github.com/apache/servicecomb-service-center/server/bootstrap"
+import _ "github.com/apache/servicecomb-service-center/server"
+
+import (
+       "context"
+       "fmt"
+       "testing"
+       "time"
+
+       "github.com/apache/servicecomb-service-center/pkg/util"
+       apt "github.com/apache/servicecomb-service-center/server/core"
+       pb "github.com/apache/servicecomb-service-center/server/core/proto"
+       serviceUtil 
"github.com/apache/servicecomb-service-center/server/service/util"
+       "github.com/apache/servicecomb-service-center/server/task"
+       "github.com/astaxie/beego"
+
+       . "github.com/onsi/ginkgo"
+       . "github.com/onsi/gomega"
+)
+
+var _ = BeforeSuite(func() {
+       beego.AppConfig.Set("registry_plugin", "etcd")
+       //clear services created in the last test
+       time.Sleep(timeLimit)
+       task.ClearNoInstanceServices(timeLimit)
+})
+
+// map[domainProject][serviceName]*serviceCleanInfo
+var svcCleanInfos = make(map[string]map[string]*serviceCleanInfo)
+var timeLimit = 2 * time.Second
+
+type serviceCleanInfo struct {
+       ServiceName  string
+       ServiceId    string
+       WithInstance bool
+       ShouldClear  bool
+}
+
+func TestTask(t *testing.T) {
+       RegisterFailHandler(Fail)
+       RunSpecs(t, "Task Suite")
+}
+
+func getContext(domain string, project string) context.Context {
+       return util.SetContext(
+               util.SetDomainProject(context.Background(), domain, project),
+               serviceUtil.CTX_NOCACHE, "1")
+}
+
+func createService(domain string, project string, name string, withInstance 
bool, shouldClear bool) {
+       By(fmt.Sprintf("create service: %s, with instance: %t, should clear: 
%t", name, withInstance, shouldClear))
+       svc := &pb.CreateServiceRequest{
+               Service: &pb.MicroService{
+                       AppId:       "clear",
+                       ServiceName: name,
+                       Version:     "1.0",
+               },
+       }
+       if withInstance {
+               svc.Instances = []*pb.MicroServiceInstance{
+                       &pb.MicroServiceInstance{
+                               Endpoints: []string{"http://127.0.0.1:80"},
+                               HostName:  "1",
+                       },
+               }
+       }
+       ctx := getContext(domain, project)
+       svcResp, err := apt.ServiceAPI.Create(ctx, svc)
+       Expect(err).To(BeNil())
+       Expect(svcResp).NotTo(BeNil())
+       Expect(svcResp.Response.GetCode()).To(Equal(pb.Response_SUCCESS))
+       info := &serviceCleanInfo{
+               ServiceName:  name,
+               ServiceId:    svcResp.ServiceId,
+               WithInstance: withInstance,
+               ShouldClear:  shouldClear,
+       }
+       domainProject := domain + apt.SPLIT + project
+       m, ok := svcCleanInfos[domainProject]
+       if !ok {
+               m = make(map[string]*serviceCleanInfo)
+               svcCleanInfos[domainProject] = m
+       }
+       m[name] = info
+}
+
+func checkServiceCleared(domain string, project string) {
+       domainProject := domain + apt.SPLIT + project
+       m := svcCleanInfos[domainProject]
+       for _, v := range m {
+               By(fmt.Sprintf("check cleared, service: %s, should be cleared: 
%t", v.ServiceName, v.ShouldClear))
+               getSvcReq := &pb.GetServiceRequest{
+                       ServiceId: v.ServiceId,
+               }
+               ctx := getContext(domain, project)
+               getSvcResp, err := apt.ServiceAPI.GetOne(ctx, getSvcReq)
+               Expect(err).To(BeNil())
+               Expect(getSvcResp).NotTo(BeNil())
+               Expect(getSvcResp.Response.GetCode() == 
pb.Response_SUCCESS).To(Equal(!v.ShouldClear))
+       }
+}
+
+func serviceClearCheckFunc(domain string, project string) func() {
+       return func() {
+               var err error
+               It("should run clear task success", func() {
+                       withInstance := true
+                       withNoInstance := false
+                       shouldClear := true
+                       shouldNotClear := false
+
+                       createService(domain, project, "svc1", withNoInstance, 
shouldClear)
+                       createService(domain, project, "svc2", withInstance, 
shouldNotClear)
+                       time.Sleep(timeLimit)
+                       createService(domain, project, "svc3", withNoInstance, 
shouldNotClear)
+                       createService(domain, project, "svc4", withInstance, 
shouldNotClear)
+
+                       err = task.ClearNoInstanceServices(timeLimit)
+                       Expect(err).To(BeNil())
+
+                       checkServiceCleared(domain, project)
+               })
+       }
+}
+
+var _ = Describe("clear service", func() {
+       Describe("domain project 1", serviceClearCheckFunc("default1", 
"default"))
+       Describe("domain project 2", serviceClearCheckFunc("default2", 
"default"))
+})

Reply via email to