This is an automated email from the ASF dual-hosted git repository.
littlecui pushed a commit to branch master
in repository
https://gitbox.apache.org/repos/asf/incubator-servicecomb-service-center.git
The following commit(s) were added to refs/heads/master by this push:
new 1d56c95 SCB-317 Prepare the release for Service-Center-1.0.0-m1 (#269)
1d56c95 is described below
commit 1d56c950820ceae9863df9993415dd378265d1a5
Author: little-cui <[email protected]>
AuthorDate: Fri Feb 2 21:07:39 2018 +0800
SCB-317 Prepare the release for Service-Center-1.0.0-m1 (#269)
* Optimize code.
(cherry picked from commit b16e646)
* SCB-317 Fix UT failure
---
pkg/cache/cache.go | 331 -----------------------------
pkg/etcdsync/README.md | 4 +-
pkg/etcdsync/etcdsync_suite_test.go | 2 +-
pkg/etcdsync/mutex.go | 14 --
server/service/event/rule_event_handler.go | 8 +-
server/service/event/tag_event_handler.go | 8 +-
server/service/microservices.go | 7 -
server/service/util/dependency.go | 64 ------
server/service/util/dependency_test.go | 8 -
server/service/util/microservice_util.go | 25 +--
server/service/util/util_suite_test.go | 21 --
11 files changed, 8 insertions(+), 484 deletions(-)
diff --git a/pkg/cache/cache.go b/pkg/cache/cache.go
deleted file mode 100644
index c54a4a5..0000000
--- a/pkg/cache/cache.go
+++ /dev/null
@@ -1,331 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package cache
-
-import (
- "container/list"
- "errors"
- "fmt"
- "sync"
- "time"
-)
-
-// Cache is a goroutine-safe K/V cache.
-type Cache struct {
- sync.RWMutex
- items map[string]*Item
- defaultExpiration time.Duration
-}
-
-type Item struct {
- Object interface{}
- Expiration *time.Time
-}
-
-// Returns true if the item has expired.
-func (item *Item) Expired() bool {
- if item.Expiration == nil {
- return false
- }
- return item.Expiration.Before(time.Now())
-}
-
-// New create a new cache with a given default expiration duration and cleanup
-// interval. If the expiration duration is less than 1, the items in the cache
-// never expire (by default), and must be deleted manually. If the cleanup
-// interval is less than one, expired items are not deleted from the cache
-// before calling DeleteExpired.
-func New(defaultExpiration, cleanInterval time.Duration) *Cache {
- c := &Cache{
- items: map[string]*Item{},
- defaultExpiration: defaultExpiration,
- }
- if cleanInterval > 0 {
- go func() {
- for {
- time.Sleep(cleanInterval)
- c.DeleteExpired()
- }
- }()
- }
- return c
-}
-
-// Get return an item or nil, and a bool indicating whether
-// the key was found.
-func (c *Cache) Get(key string) (interface{}, bool) {
- c.RLock()
- item, ok := c.items[key]
- if !ok || item.Expired() {
- c.RUnlock()
- return nil, false
- }
- c.RUnlock()
- return item.Object, true
-}
-
-// Get all cache keys
-func (c *Cache) Keys() []string {
- c.RLock()
- defer c.RUnlock()
- keys := make([]string, 0, len(c.items))
- for k := range c.items {
- keys = append(keys, k)
- }
- return keys
-}
-
-// Set add a new key or replace an exist key. If the dur is 0, we will
-// use the defaultExpiration.
-func (c *Cache) Set(key string, val interface{}, dur time.Duration) {
- var t *time.Time
- c.Lock()
- if dur == 0 {
- dur = c.defaultExpiration
- }
- if dur > 0 {
- tmp := time.Now().Add(dur)
- t = &tmp
- }
- c.items[key] = &Item{
- Object: val,
- Expiration: t,
- }
- c.Unlock()
-}
-
-// Delete a key-value pair if the key is existed.
-func (c *Cache) Delete(key string) {
- c.Lock()
- delete(c.items, key)
- c.Unlock()
-}
-
-// Delete all cache.
-func (c *Cache) Flush() {
- c.Lock()
- c.items = map[string]*Item{}
- c.Unlock()
-}
-
-// Add a number to a key-value pair.
-func (c *Cache) Increment(key string, x int64) error {
- c.Lock()
- val, ok := c.items[key]
- if !ok || val.Expired() {
- c.Unlock()
- return fmt.Errorf("Item %s not found", key)
- }
- switch val.Object.(type) {
- case int:
- val.Object = val.Object.(int) + int(x)
- case int8:
- val.Object = val.Object.(int8) + int8(x)
- case int16:
- val.Object = val.Object.(int16) + int16(x)
- case int32:
- val.Object = val.Object.(int32) + int32(x)
- case int64:
- val.Object = val.Object.(int64) + x
- case uint:
- val.Object = val.Object.(uint) + uint(x)
- case uint8:
- val.Object = val.Object.(uint8) + uint8(x)
- case uint16:
- val.Object = val.Object.(uint16) + uint16(x)
- case uint32:
- val.Object = val.Object.(uint32) + uint32(x)
- case uint64:
- val.Object = val.Object.(uint64) + uint64(x)
- case uintptr:
- val.Object = val.Object.(uintptr) + uintptr(x)
- default:
- c.Unlock()
- return fmt.Errorf("The value type error")
- }
- c.Unlock()
- return nil
-}
-
-// Sub a number to a key-value pair.
-func (c *Cache) Decrement(key string, x int64) error {
- c.Lock()
- val, ok := c.items[key]
- if !ok || val.Expired() {
- c.Unlock()
- return fmt.Errorf("Item %s not found", key)
- }
- switch val.Object.(type) {
- case int:
- val.Object = val.Object.(int) - int(x)
- case int8:
- val.Object = val.Object.(int8) - int8(x)
- case int16:
- val.Object = val.Object.(int16) - int16(x)
- case int32:
- val.Object = val.Object.(int32) - int32(x)
- case int64:
- val.Object = val.Object.(int64) - x
- case uint:
- val.Object = val.Object.(uint) - uint(x)
- case uint8:
- val.Object = val.Object.(uint8) - uint8(x)
- case uint16:
- val.Object = val.Object.(uint16) - uint16(x)
- case uint32:
- val.Object = val.Object.(uint32) - uint32(x)
- case uint64:
- val.Object = val.Object.(uint64) - uint64(x)
- case uintptr:
- val.Object = val.Object.(uintptr) - uintptr(x)
- default:
- c.Unlock()
- return fmt.Errorf("The value type error")
- }
- c.Unlock()
- return nil
-}
-
-// Return the number of item in cache.
-func (c *Cache) ItemCount() int {
- c.RLock()
- counts := len(c.items)
- c.RUnlock()
- return counts
-}
-
-// Delete all expired items.
-func (c *Cache) DeleteExpired() {
- c.Lock()
- for k, v := range c.items {
- if v.Expired() {
- delete(c.items, k)
- }
- }
- c.Unlock()
-}
-
-// The LRUCache is a goroutine-safe cache.
-type LRUCache struct {
- sync.RWMutex
- maxEntries int
- items map[string]*list.Element
- cacheList *list.List
-}
-
-type entry struct {
- key string
- value interface{}
-}
-
-// NewLRU create a LRUCache with max size. The size is 0 means no limit.
-func NewLRU(size int) (*LRUCache, error) {
- if size < 0 {
- return nil, errors.New("The size of LRU Cache must no less than
0")
- }
- lru := &LRUCache{
- maxEntries: size,
- items: make(map[string]*list.Element, size),
- cacheList: list.New(),
- }
- return lru, nil
-}
-
-// Add a new key-value pair to the LRUCache.
-func (c *LRUCache) Add(key string, value interface{}) {
- c.Lock()
- defer c.Unlock()
- if ent, hit := c.items[key]; hit {
- c.cacheList.MoveToFront(ent)
- ent.Value.(*entry).value = value
- return
- }
- ent := &entry{
- key: key,
- value: value,
- }
- entry := c.cacheList.PushFront(ent)
- c.items[key] = entry
-
- if c.maxEntries > 0 && c.cacheList.Len() > c.maxEntries {
- c.removeOldestElement()
- }
-}
-
-// Get a value from the LRUCache. And a bool indicating
-// whether found or not.
-func (c *LRUCache) Get(key string) (interface{}, bool) {
- c.RLock()
- defer c.RUnlock()
-
- if ent, hit := c.items[key]; hit {
- c.cacheList.MoveToFront(ent)
- return ent.Value.(*entry).value, true
- }
- return nil, false
-}
-
-// Remove a key-value pair in LRUCache. If the key is not existed,
-// nothing will happen.
-func (c *LRUCache) Remove(key string) {
- c.Lock()
- defer c.Unlock()
-
- if ent, hit := c.items[key]; hit {
- c.removeElement(ent)
- }
-}
-
-// Return the number of key-value pair in LRUCache.
-func (c *LRUCache) Len() int {
- c.RLock()
- length := c.cacheList.Len()
- c.RUnlock()
- return length
-}
-
-// Delete all entry in the LRUCache. But the max size will hold.
-func (c *LRUCache) Clear() {
- c.Lock()
- c.cacheList = list.New()
- c.items = make(map[string]*list.Element, c.maxEntries)
- c.Unlock()
-}
-
-// Resize the max limit.
-func (c *LRUCache) SetMaxEntries(max int) error {
- if max < 0 {
- return errors.New("The max limit of entryies must no less than
0")
- }
- c.Lock()
- c.maxEntries = max
- c.Unlock()
- return nil
-}
-
-func (c *LRUCache) removeElement(e *list.Element) {
- c.cacheList.Remove(e)
- ent := e.Value.(*entry)
- delete(c.items, ent.key)
-}
-
-func (c *LRUCache) removeOldestElement() {
- ent := c.cacheList.Back()
- if ent != nil {
- c.removeElement(ent)
- }
-}
diff --git a/pkg/etcdsync/README.md b/pkg/etcdsync/README.md
index c31e3fd..06b68b4 100644
--- a/pkg/etcdsync/README.md
+++ b/pkg/etcdsync/README.md
@@ -2,8 +2,8 @@
## example
-```bash
-lock, _ := etcdsync.Lock("/test")
+```go
+lock, _ := etcdsync.Lock("/test", true)
defer lock.Unlock()
//do something
g += 1
diff --git a/pkg/etcdsync/etcdsync_suite_test.go
b/pkg/etcdsync/etcdsync_suite_test.go
index 345182d..5b40871 100644
--- a/pkg/etcdsync/etcdsync_suite_test.go
+++ b/pkg/etcdsync/etcdsync_suite_test.go
@@ -41,7 +41,7 @@ func BenchmarkLock(b *testing.B) {
var g = 0
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
- lock, _ := etcdsync.Lock("/test")
+ lock, _ := etcdsync.Lock("/test", true)
defer lock.Unlock()
//do something
g += 1
diff --git a/pkg/etcdsync/mutex.go b/pkg/etcdsync/mutex.go
index 96adcf8..3cb4bcf 100644
--- a/pkg/etcdsync/mutex.go
+++ b/pkg/etcdsync/mutex.go
@@ -35,7 +35,6 @@ const (
ROOT_PATH = "/cse/etcdsync"
)
-// A Mutex is a mutual exclusion lock which is distributed across a cluster.
type DLockFactory struct {
key string
ctx context.Context
@@ -69,9 +68,6 @@ func init() {
pid = os.Getpid()
}
-// New creates a Mutex with the given key which must be the same
-// across the cluster nodes.
-// machines are the ectd cluster addresses
func NewLockFactory(key string, ttl int64) *DLockFactory {
if len(key) == 0 {
return nil
@@ -88,10 +84,6 @@ func NewLockFactory(key string, ttl int64) *DLockFactory {
}
}
-// Lock locks m.
-// If the lock is already in use, the calling goroutine
-// blocks until the mutex is available. Flag wait is false,
-// this function is non-block when lock exist.
func (m *DLockFactory) NewDLock(wait bool) (l *DLock, err error) {
if !IsDebug {
m.mutex.Lock()
@@ -182,12 +174,6 @@ func (m *DLock) Lock(wait bool) error {
}
}
-// Unlock unlocks m.
-// It is a run-time error if m is not locked on entry to Unlock.
-//
-// A locked Mutex is not associated with a particular goroutine.
-// It is allowed for one goroutine to lock a Mutex and then
-// arrange for another goroutine to unlock it.
func (m *DLock) Unlock() (err error) {
opts := []registry.PluginOpOption{
registry.DEL,
diff --git a/server/service/event/rule_event_handler.go
b/server/service/event/rule_event_handler.go
index a2d2650..f85b62c 100644
--- a/server/service/event/rule_event_handler.go
+++ b/server/service/event/rule_event_handler.go
@@ -57,12 +57,8 @@ func (apt *RulesChangedAsyncTask) publish(ctx
context.Context, domainProject, pr
return err
}
if provider == nil {
- tmpProvider, found := serviceUtil.MsCache().Get(providerId)
- if !found {
- util.Logger().Errorf(nil, "provider %s does not exist",
providerId)
- return fmt.Errorf("provider %s does not exist",
providerId)
- }
- provider = tmpProvider.(*pb.MicroService)
+ util.Logger().Errorf(nil, "provider %s does not exist",
providerId)
+ return fmt.Errorf("provider %s does not exist", providerId)
}
consumerIds, err := serviceUtil.GetConsumersInCache(ctx, domainProject,
provider)
diff --git a/server/service/event/tag_event_handler.go
b/server/service/event/tag_event_handler.go
index 42ea494..cdf96d6 100644
--- a/server/service/event/tag_event_handler.go
+++ b/server/service/event/tag_event_handler.go
@@ -57,12 +57,8 @@ func (apt *TagsChangedAsyncTask) publish(ctx
context.Context, domainProject, con
return err
}
if consumer == nil {
- consumerTmp, found := serviceUtil.MsCache().Get(consumerId)
- if !found {
- util.Logger().Errorf(nil, "service not exist, %s",
consumerId)
- return fmt.Errorf("service not exist, %s", consumerId)
- }
- consumer = consumerTmp.(*pb.MicroService)
+ util.Logger().Errorf(nil, "service not exist, %s", consumerId)
+ return fmt.Errorf("service not exist, %s", consumerId)
}
providerIds, err := serviceUtil.GetProvidersInCache(ctx, domainProject,
consumer)
if err != nil {
diff --git a/server/service/microservices.go b/server/service/microservices.go
index 60463a8..8514333 100644
--- a/server/service/microservices.go
+++ b/server/service/microservices.go
@@ -254,13 +254,6 @@ func (s *MicroServiceService) DeleteServicePri(ctx
context.Context, serviceId st
}
}
- //refresh msCache consumerCache, ensure that watch can notify consumers
when no cache.
- err = serviceUtil.RefreshDependencyCache(ctx, domainProject, service)
- if err != nil {
- util.Logger().Errorf(err, "%s micro-service failed, serviceId
is %s: inner err, refresh service dependency cache failed.", title, serviceId)
- return pb.CreateResponse(scerr.ErrInternal, "Refresh dependency
cache failed."), err
- }
-
serviceKey := &pb.MicroServiceKey{
Tenant: domainProject,
Environment: service.Environment,
diff --git a/server/service/util/dependency.go
b/server/service/util/dependency.go
index 3e9e929..f78cd92 100644
--- a/server/service/util/dependency.go
+++ b/server/service/util/dependency.go
@@ -20,7 +20,6 @@ import (
"encoding/json"
"errors"
"fmt"
- "github.com/apache/incubator-servicecomb-service-center/pkg/cache"
"github.com/apache/incubator-servicecomb-service-center/pkg/util"
apt "github.com/apache/incubator-servicecomb-service-center/server/core"
"github.com/apache/incubator-servicecomb-service-center/server/core/backend"
@@ -30,25 +29,8 @@ import (
"github.com/apache/incubator-servicecomb-service-center/server/infra/registry"
"golang.org/x/net/context"
"strings"
- "time"
)
-var consumerCache *cache.Cache
-var providerCache *cache.Cache
-
-/*
-缓存2分钟过期
-1分钟周期缓存consumers 遍历所有serviceid并查询consumers 做缓存
-当发现新查询到的consumers列表变成0时则不做cache set操作
-这样当consumers关系完全被删除也有1分钟的时间窗让实例变化推送到相应的consumers里 1分鐘后緩存也會自動清理
-实例推送中的依赖发现实时性为T+1分钟
-*/
-func init() {
- d, _ := time.ParseDuration("2m")
- consumerCache = cache.New(d, d)
- providerCache = cache.New(d, d)
-}
-
func GetConsumersInCache(ctx context.Context, domainProject string, provider
*pb.MicroService) ([]string, error) {
// 查询所有consumer
dr := NewProviderDependencyRelation(ctx, domainProject, provider)
@@ -57,17 +39,6 @@ func GetConsumersInCache(ctx context.Context, domainProject
string, provider *pb
util.Logger().Errorf(err, "Get dependency consumerIds
failed.%s", provider.ServiceId)
return nil, err
}
-
- if len(consumerIds) == 0 {
- consumerIds, found := consumerCache.Get(provider.ServiceId)
- if found && len(consumerIds.([]string)) > 0 {
- return consumerIds.([]string), nil
- }
- util.Logger().Warnf(nil, "Can not find any consumer from local
cache and backend. provider is %s",
- provider.ServiceId)
- return nil, nil
- }
-
return consumerIds, nil
}
@@ -79,44 +50,9 @@ func GetProvidersInCache(ctx context.Context, domainProject
string, consumer *pb
util.Logger().Errorf(err, "Get dependency providerIds
failed.%s", consumer.ServiceId)
return nil, err
}
-
- if len(providerIds) == 0 {
- providerIds, found := providerCache.Get(consumer.ServiceId)
- if found && len(providerIds.([]string)) > 0 {
- return providerIds.([]string), nil
- }
- util.Logger().Warnf(nil, "Can not find any provider from local
cache and backend. consumer is %s",
- consumer.ServiceId)
- return nil, nil
- }
-
return providerIds, nil
}
-func RefreshDependencyCache(ctx context.Context, domainProject string, service
*pb.MicroService) error {
- dr := NewDependencyRelation(ctx, domainProject, service, service)
- consumerIds, err := dr.GetDependencyConsumerIds()
- if err != nil {
- util.Logger().Errorf(err, "%s,refresh dependency cache failed,
get consumerIds failed.", service.ServiceId)
- return err
- }
- providerIds, err := dr.GetDependencyProviderIds()
- if err != nil {
- util.Logger().Errorf(err, "%s,refresh dependency cache failed,
get providerIds failed.", service.ServiceId)
- return err
- }
- MsCache().Set(service.ServiceId, service, 5*time.Minute)
- if len(consumerIds) > 0 {
- util.Logger().Infof("refresh %s dependency cache: cached %d
consumerId(s) for 5min.", service.ServiceId, len(consumerIds))
- consumerCache.Set(service.ServiceId, consumerIds, 5*time.Minute)
- }
- if len(providerIds) > 0 {
- util.Logger().Infof("refresh %s dependency cache: cached %d
providerId(s) for 5min.", service.ServiceId, len(providerIds))
- providerCache.Set(service.ServiceId, providerIds, 5*time.Minute)
- }
- return nil
-}
-
func GetConsumerIdsByProvider(ctx context.Context, domainProject string,
provider *pb.MicroService) (allow []string, deny []string, _ error) {
if provider == nil || len(provider.ServiceId) == 0 {
return nil, nil, fmt.Errorf("invalid provider")
diff --git a/server/service/util/dependency_test.go
b/server/service/util/dependency_test.go
index 546459e..52104c7 100644
--- a/server/service/util/dependency_test.go
+++ b/server/service/util/dependency_test.go
@@ -24,14 +24,6 @@ import (
"testing"
)
-func TestRefreshDependencyCache(t *testing.T) {
- err := RefreshDependencyCache(context.Background(), "",
&proto.MicroService{})
- if err == nil {
- fmt.Printf(`RefreshDependencyCache failed`)
- t.FailNow()
- }
-}
-
func TestDeleteDependencyForService(t *testing.T) {
_, err := DeleteDependencyForDeleteService("", "",
&proto.MicroServiceKey{})
if err != nil {
diff --git a/server/service/util/microservice_util.go
b/server/service/util/microservice_util.go
index a759dff..c740668 100644
--- a/server/service/util/microservice_util.go
+++ b/server/service/util/microservice_util.go
@@ -18,7 +18,6 @@ package util
import (
"encoding/json"
- "github.com/apache/incubator-servicecomb-service-center/pkg/cache"
"github.com/apache/incubator-servicecomb-service-center/pkg/util"
apt "github.com/apache/incubator-servicecomb-service-center/server/core"
"github.com/apache/incubator-servicecomb-service-center/server/core/backend/store"
@@ -28,20 +27,8 @@ import (
"github.com/apache/incubator-servicecomb-service-center/server/plugin"
"github.com/coreos/etcd/mvcc/mvccpb"
"golang.org/x/net/context"
- "time"
)
-var msCache *cache.Cache
-
-func MsCache() *cache.Cache {
- return msCache
-}
-
-func init() {
- d, _ := time.ParseDuration("1m")
- msCache = cache.New(d, d)
-}
-
/*
get Service by service id
*/
@@ -65,17 +52,7 @@ func GetServiceWithRev(ctx context.Context, domain string,
id string, rev int64)
}
func GetServiceInCache(ctx context.Context, domain string, id string)
(*pb.MicroService, error) {
- ms, ok := msCache.Get(id)
- if !ok {
- ms, err := GetService(ctx, domain, id)
- if ms == nil {
- return nil, err
- }
- msCache.Set(id, ms, 0)
- return ms, nil
- }
-
- return ms.(*pb.MicroService), nil
+ return GetService(ctx, domain, id)
}
func GetService(ctx context.Context, domainProject string, serviceId string)
(*pb.MicroService, error) {
diff --git a/server/service/util/util_suite_test.go
b/server/service/util/util_suite_test.go
index 05de94d..7cbb461 100644
--- a/server/service/util/util_suite_test.go
+++ b/server/service/util/util_suite_test.go
@@ -116,27 +116,6 @@ func TestGetService(t *testing.T) {
}
}
-func TestMsCache(t *testing.T) {
- defer func() {
- if r := recover(); r != nil {
- t.FailNow()
- }
- }()
- _, err := serviceUtil.GetServiceInCache(context.Background(), "", "")
- if err == nil {
- t.FailNow()
- }
- ms := serviceUtil.MsCache()
- if ms == nil {
- t.FailNow()
- }
- ms.Set("", &proto.MicroService{}, 0)
- _, err = serviceUtil.GetServiceInCache(context.Background(), "", "")
- if err != nil {
- t.FailNow()
- }
-}
-
func TestFromContext(t *testing.T) {
ctx := context.WithValue(context.Background(), "noCache", "1")
opts := serviceUtil.FromContext(ctx)
--
To stop receiving notification emails like this one, please contact
[email protected].