This is an automated email from the ASF dual-hosted git repository.
tianxiaoliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/servicecomb-service-center.git
The following commit(s) were added to refs/heads/master by this push:
new c25f75f Mongo初步缓存实现,事件通知实现,用于同步数据给syncer (#789)
c25f75f is described below
commit c25f75fe584498ffc285523ab8dc2214c0dffdb5
Author: fuziye01 <[email protected]>
AuthorDate: Tue Dec 29 08:54:20 2020 +0800
Mongo初步缓存实现,事件通知实现,用于同步数据给syncer (#789)
1.缓存用两个map分别存储:业务Id -> document,以及 documentId -> 业务Id
2.暂时只实现了service和Instance的缓存,用于同步数据给syncer
3.通过list、watch流程初始化、更新缓存
4.watch到的数据生成事件,并通知event_handler处理,用于同步数据给syncer
5.重构etcd的list/watch流程,与mongodb实现统一的接口
6.UT 修改
---
datasource/etcd/sd/etcd/cacher_kv.go | 135 +++---
datasource/etcd/sd/etcd/cacher_kv_test.go | 132 +++---
datasource/etcd/sd/etcd/common.go | 25 +-
datasource/etcd/sd/etcd/listwatch_inner.go | 45 +-
datasource/etcd/sd/etcd/listwatch_test.go | 21 +-
datasource/etcd/sd/etcd/watcher_test.go | 87 ----
datasource/mongo/mongo.go | 12 +
datasource/mongo/sd/cache.go | 46 ++
datasource/mongo/sd/common.go | 25 ++
datasource/mongo/sd/event_proxy.go | 63 +++
datasource/mongo/sd/event_proxy_test.go | 81 ++++
datasource/mongo/sd/listwatch_inner.go | 170 ++++++++
datasource/mongo/sd/listwatch_test.go | 87 ++++
datasource/mongo/sd/mongo_cache.go | 137 ++++++
datasource/mongo/sd/mongo_cacher.go | 461 +++++++++++++++++++++
datasource/mongo/sd/mongo_cacher_test.go | 315 ++++++++++++++
.../sd/etcd/watcher.go => mongo/sd/options.go} | 39 +-
datasource/mongo/sd/options_test.go | 42 ++
datasource/mongo/sd/types.go | 102 +++++
datasource/mongo/sd/typestore.go | 135 ++++++
datasource/mongo/sd/typestore_test.go | 42 ++
datasource/{etcd/sd/etcd => sdcommon}/common.go | 36 +-
.../etcd/watcher_inner.go => sdcommon/eventbus.go} | 33 +-
datasource/{etcd/sd/etcd => sdcommon}/listwatch.go | 13 +-
datasource/sdcommon/types.go | 69 +++
25 files changed, 2068 insertions(+), 285 deletions(-)
diff --git a/datasource/etcd/sd/etcd/cacher_kv.go
b/datasource/etcd/sd/etcd/cacher_kv.go
index a83ad15..9a7fa51 100644
--- a/datasource/etcd/sd/etcd/cacher_kv.go
+++ b/datasource/etcd/sd/etcd/cacher_kv.go
@@ -21,17 +21,18 @@ import (
"context"
"errors"
"fmt"
+ "reflect"
"sync"
"time"
"github.com/apache/servicecomb-service-center/datasource/etcd/client"
"github.com/apache/servicecomb-service-center/datasource/etcd/sd"
+ "github.com/apache/servicecomb-service-center/datasource/sdcommon"
"github.com/apache/servicecomb-service-center/pkg/backoff"
"github.com/apache/servicecomb-service-center/pkg/gopool"
"github.com/apache/servicecomb-service-center/pkg/log"
"github.com/apache/servicecomb-service-center/pkg/util"
"github.com/apache/servicecomb-service-center/server/config"
- "github.com/coreos/etcd/mvcc/mvccpb"
rmodel "github.com/go-chassis/cari/discovery"
)
@@ -47,7 +48,7 @@ type KvCacher struct {
reListCount int
ready chan struct{}
- lw ListWatch
+ lw sdcommon.ListWatch
mux sync.Mutex
once sync.Once
cache sd.Cache
@@ -59,42 +60,42 @@ func (c *KvCacher) Config() *sd.Config {
}
func (c *KvCacher) needList() bool {
- rev := c.lw.Revision()
+ rev := c.getRevision()
if rev == 0 {
c.reListCount = 0
return true
}
c.reListCount++
- if c.reListCount < DefaultForceListInterval {
+ if c.reListCount < sdcommon.DefaultForceListInterval {
return false
}
c.reListCount = 0
return true
}
-func (c *KvCacher) doList(cfg ListWatchConfig) error {
+func (c *KvCacher) doList(cfg sdcommon.ListWatchConfig) error {
resp, err := c.lw.List(cfg)
if err != nil {
return err
}
- rev := c.lw.Revision()
- kvs := resp.Kvs
+ rev := c.getRevision()
+ resources := resp.Resources
start := time.Now()
defer log.DebugOrWarnf(start, "finish to cache key %s, %d items, rev:
%d",
- c.Cfg.Key, len(kvs), rev)
+ c.Cfg.Key, len(resources), rev)
// just reset the cacher if cache marked dirty
if c.cache.Dirty() {
- c.reset(rev, kvs)
+ c.reset(rev, resources)
log.Warnf("Cache[%s] is reset!", c.cache.Name())
return nil
}
// calc and return the diff between cache and ETCD
- evts := c.filter(rev, kvs)
+ evts := c.filter(rev, resources)
// there is no change between List() and cache, then stop the self
preservation
- if ec, kc := len(evts), len(kvs); c.Cfg.DeferHandler != nil && ec == 0
&& kc != 0 &&
+ if ec, kc := len(evts), len(resources); c.Cfg.DeferHandler != nil && ec
== 0 && kc != 0 &&
c.Cfg.DeferHandler.Reset() {
log.Warnf("most of the protected data(%d/%d) are recovered",
kc, c.cache.GetAll(nil))
@@ -105,7 +106,7 @@ func (c *KvCacher) doList(cfg ListWatchConfig) error {
return nil
}
-func (c *KvCacher) reset(rev int64, kvs []*mvccpb.KeyValue) {
+func (c *KvCacher) reset(rev int64, kvs []*sdcommon.Resource) {
if c.Cfg.DeferHandler != nil {
c.Cfg.DeferHandler.Reset()
}
@@ -117,9 +118,9 @@ func (c *KvCacher) reset(rev int64, kvs []*mvccpb.KeyValue)
{
c.buildCache(c.filter(rev, kvs))
}
-func (c *KvCacher) doWatch(cfg ListWatchConfig) error {
- if watcher := c.lw.Watch(cfg); watcher != nil {
- return c.handleWatcher(watcher)
+func (c *KvCacher) doWatch(cfg sdcommon.ListWatchConfig) error {
+ if eventBus := c.lw.EventBus(cfg); eventBus != nil {
+ return c.handleEventBus(eventBus)
}
return fmt.Errorf("handle a nil watcher")
}
@@ -129,7 +130,7 @@ func (c *KvCacher) ListAndWatch(ctx context.Context) error {
defer c.mux.Unlock()
defer log.Recover() // ensure ListAndWatch never raise panic
- cfg := ListWatchConfig{
+ cfg := sdcommon.ListWatchConfig{
Timeout: c.Cfg.Timeout,
Context: ctx,
}
@@ -139,7 +140,7 @@ func (c *KvCacher) ListAndWatch(ctx context.Context) error {
// 2. Runtime: error occurs in previous watch operation, the lister's
revision is set to 0.
// 3. Runtime: watch operation timed out over
DEFAULT_FORCE_LIST_INTERVAL times.
if c.needList() {
- if err := c.doList(cfg); err != nil && (!c.IsReady() ||
c.lw.Revision() == 0) {
+ if err := c.doList(cfg); err != nil && (!c.IsReady() ||
c.getRevision() == 0) {
return err // do retry to list etcd
}
// keep going to next step:
@@ -153,37 +154,37 @@ func (c *KvCacher) ListAndWatch(ctx context.Context)
error {
return c.doWatch(cfg)
}
-func (c *KvCacher) handleWatcher(watcher Watcher) error {
- defer watcher.Stop()
- for resp := range watcher.EventBus() {
+func (c *KvCacher) handleEventBus(eventBus *sdcommon.EventBus) error {
+ defer eventBus.Stop()
+ for resp := range eventBus.ResourceEventBus() {
if resp == nil {
return errors.New("handle watcher error")
}
start := time.Now()
rev := resp.Revision
- evts := make([]sd.KvEvent, 0, len(resp.Kvs))
- for _, kv := range resp.Kvs {
- evt := sd.NewKvEvent(rmodel.EVT_CREATE, nil,
kv.ModRevision)
+ evts := make([]sd.KvEvent, 0, len(resp.Resources))
+ for _, resource := range resp.Resources {
+ evt := sd.NewKvEvent(rmodel.EVT_CREATE, nil,
resource.ModRevision)
switch {
- case resp.Action == client.ActionPut && kv.Version == 1:
- evt.Type, evt.KV = rmodel.EVT_CREATE,
c.doParse(kv)
- case resp.Action == client.ActionPut:
- evt.Type, evt.KV = rmodel.EVT_UPDATE,
c.doParse(kv)
- case resp.Action == client.ActionDelete:
+ case resp.Action == sdcommon.ActionPUT &&
resource.Version == 1:
+ evt.Type, evt.KV = rmodel.EVT_CREATE,
c.doParse(resource)
+ case resp.Action == sdcommon.ActionPUT:
+ evt.Type, evt.KV = rmodel.EVT_UPDATE,
c.doParse(resource)
+ case resp.Action == sdcommon.ActionDelete:
evt.Type = rmodel.EVT_DELETE
- if kv.Value == nil {
+ if resource.Value == nil {
// it will happen in embed mode, and
then need to get the cache value not unmarshal
- evt.KV =
c.cache.Get(util.BytesToStringWithNoCopy(kv.Key))
+ evt.KV = c.cache.Get(resource.Key)
} else {
- evt.KV = c.doParse(kv)
+ evt.KV = c.doParse(resource)
}
default:
- log.Errorf(nil, "unknown KeyValue %v", kv)
+ log.Errorf(nil, "unknown KeyValue %v", resource)
continue
}
if evt.KV == nil {
- log.Errorf(nil, "failed to parse KeyValue %v",
kv)
+ log.Errorf(nil, "failed to parse KeyValue %v",
resource)
continue
}
evts = append(evts, evt)
@@ -207,10 +208,10 @@ func (c *KvCacher) refresh(ctx context.Context) {
log.Debugf("start to list and watch %s", c.Cfg)
retries := 0
- timer := time.NewTimer(minWaitInterval)
+ timer := time.NewTimer(sdcommon.MinWaitInterval)
defer timer.Stop()
for {
- nextPeriod := minWaitInterval
+ nextPeriod := sdcommon.MinWaitInterval
if err := c.ListAndWatch(ctx); err != nil {
retries++
nextPeriod = backoff.GetBackoff().Delay(retries)
@@ -241,14 +242,14 @@ func (c *KvCacher) sync(evts []sd.KvEvent) {
c.onEvents(evts)
}
-func (c *KvCacher) filter(rev int64, items []*mvccpb.KeyValue) []sd.KvEvent {
+func (c *KvCacher) filter(rev int64, items []*sdcommon.Resource) []sd.KvEvent {
nc := len(items)
- newStore := make(map[string]*mvccpb.KeyValue, nc)
+ newStore := make(map[string]*sdcommon.Resource, nc)
for _, kv := range items {
- newStore[util.BytesToStringWithNoCopy(kv.Key)] = kv
+ newStore[kv.Key] = kv
}
filterStopCh := make(chan struct{})
- eventsCh := make(chan [eventBlockSize]sd.KvEvent, 2)
+ eventsCh := make(chan [sdcommon.EventBlockSize]sd.KvEvent, 2)
go c.filterDelete(newStore, rev, eventsCh, filterStopCh)
@@ -266,9 +267,9 @@ func (c *KvCacher) filter(rev int64, items
[]*mvccpb.KeyValue) []sd.KvEvent {
return evts
}
-func (c *KvCacher) filterDelete(newStore map[string]*mvccpb.KeyValue,
- rev int64, eventsCh chan [eventBlockSize]sd.KvEvent, filterStopCh chan
struct{}) {
- var block [eventBlockSize]sd.KvEvent
+func (c *KvCacher) filterDelete(newStore map[string]*sdcommon.Resource,
+ rev int64, eventsCh chan [sdcommon.EventBlockSize]sd.KvEvent,
filterStopCh chan struct{}) {
+ var block [sdcommon.EventBlockSize]sd.KvEvent
i := 0
c.cache.ForEach(func(k string, v *sd.KeyValue) (next bool) {
@@ -279,9 +280,9 @@ func (c *KvCacher) filterDelete(newStore
map[string]*mvccpb.KeyValue,
return
}
- if i >= eventBlockSize {
+ if i >= sdcommon.EventBlockSize {
eventsCh <- block
- block = [eventBlockSize]sd.KvEvent{}
+ block = [sdcommon.EventBlockSize]sd.KvEvent{}
i = 0
}
@@ -297,17 +298,17 @@ func (c *KvCacher) filterDelete(newStore
map[string]*mvccpb.KeyValue,
close(filterStopCh)
}
-func (c *KvCacher) filterCreateOrUpdate(newStore map[string]*mvccpb.KeyValue,
- rev int64, eventsCh chan [eventBlockSize]sd.KvEvent, filterStopCh chan
struct{}) {
- var block [eventBlockSize]sd.KvEvent
+func (c *KvCacher) filterCreateOrUpdate(newStore map[string]*sdcommon.Resource,
+ rev int64, eventsCh chan [sdcommon.EventBlockSize]sd.KvEvent,
filterStopCh chan struct{}) {
+ var block [sdcommon.EventBlockSize]sd.KvEvent
i := 0
for k, v := range newStore {
ov := c.cache.Get(k)
if ov == nil {
- if i >= eventBlockSize {
+ if i >= sdcommon.EventBlockSize {
eventsCh <- block
- block = [eventBlockSize]sd.KvEvent{}
+ block = [sdcommon.EventBlockSize]sd.KvEvent{}
i = 0
}
@@ -322,9 +323,9 @@ func (c *KvCacher) filterCreateOrUpdate(newStore
map[string]*mvccpb.KeyValue,
continue
}
- if i >= eventBlockSize {
+ if i >= sdcommon.EventBlockSize {
eventsCh <- block
- block = [eventBlockSize]sd.KvEvent{}
+ block = [sdcommon.EventBlockSize]sd.KvEvent{}
i = 0
}
@@ -360,7 +361,7 @@ func (c *KvCacher) deferHandle(ctx context.Context) {
func (c *KvCacher) handleDeferEvents(ctx context.Context) {
defer log.Recover()
var (
- evts = make([]sd.KvEvent, eventBlockSize)
+ evts = make([]sd.KvEvent, sdcommon.EventBlockSize)
i int
)
interval := 300 * time.Millisecond
@@ -375,9 +376,9 @@ func (c *KvCacher) handleDeferEvents(ctx context.Context) {
return
}
- if i >= eventBlockSize {
+ if i >= sdcommon.EventBlockSize {
c.onEvents(evts[:i])
- evts = make([]sd.KvEvent, eventBlockSize)
+ evts = make([]sd.KvEvent,
sdcommon.EventBlockSize)
i = 0
}
@@ -393,7 +394,7 @@ func (c *KvCacher) handleDeferEvents(ctx context.Context) {
}
c.onEvents(evts[:i])
- evts = make([]sd.KvEvent, eventBlockSize)
+ evts = make([]sd.KvEvent, sdcommon.EventBlockSize)
i = 0
}
}
@@ -453,10 +454,10 @@ func (c *KvCacher) notify(evts []sd.KvEvent) {
}
}
-func (c *KvCacher) doParse(src *mvccpb.KeyValue) (kv *sd.KeyValue) {
+func (c *KvCacher) doParse(src *sdcommon.Resource) (kv *sd.KeyValue) {
kv = sd.NewKeyValue()
- if err := FromEtcdKeyValue(kv, src, c.Cfg.Parser); err != nil {
- log.Errorf(err, "parse %s value failed",
util.BytesToStringWithNoCopy(src.Key))
+ if err := ParseResourceToEtcdKeyValue(kv, src, c.Cfg.Parser); err !=
nil {
+ log.Errorf(err, "parse %s value failed", src.Key)
return nil
}
return
@@ -497,7 +498,7 @@ func (c *KvCacher) reportMetrics(ctx context.Context) {
if !config.GetServer().EnablePProf {
return
}
- timer := time.NewTimer(DefaultMetricsInterval)
+ timer := time.NewTimer(sdcommon.DefaultMetricsInterval)
defer timer.Stop()
for {
select {
@@ -505,7 +506,7 @@ func (c *KvCacher) reportMetrics(ctx context.Context) {
return
case <-timer.C:
ReportCacheSize(c.cache.Name(), "raw", c.cache.Size())
- timer.Reset(DefaultMetricsInterval)
+ timer.Reset(sdcommon.DefaultMetricsInterval)
}
}
}
@@ -522,3 +523,17 @@ func NewKvCacher(cfg *sd.Config, cache sd.Cache) *KvCacher
{
goroutine: gopool.New(context.Background()),
}
}
+
+func (c *KvCacher) getRevision() int64 {
+ lw := reflect.ValueOf(c.lw)
+ revMethod := lw.MethodByName(revMethodName)
+
+ if !revMethod.IsValid() {
+ log.Warn("get revision failed")
+ return 0
+ }
+
+ revs := revMethod.Call(make([]reflect.Value, 0))
+ rev := revs[0].Int()
+ return rev
+}
diff --git a/datasource/etcd/sd/etcd/cacher_kv_test.go
b/datasource/etcd/sd/etcd/cacher_kv_test.go
index 2604435..c715aed 100644
--- a/datasource/etcd/sd/etcd/cacher_kv_test.go
+++ b/datasource/etcd/sd/etcd/cacher_kv_test.go
@@ -25,12 +25,11 @@ import (
"strconv"
"testing"
- "github.com/apache/servicecomb-service-center/datasource/etcd/client"
"github.com/apache/servicecomb-service-center/datasource/etcd/sd"
"github.com/apache/servicecomb-service-center/datasource/etcd/value"
+ "github.com/apache/servicecomb-service-center/datasource/sdcommon"
"github.com/apache/servicecomb-service-center/pkg/gopool"
"github.com/apache/servicecomb-service-center/pkg/util"
- "github.com/coreos/etcd/mvcc/mvccpb"
pb "github.com/go-chassis/cari/discovery"
)
@@ -74,12 +73,42 @@ func (n *mockCache) MarkDirty() {}
func (n *mockCache) Dirty() bool { return false }
func (n *mockCache) Clear() {}
-func TestNewKvCacher(t *testing.T) {
- w := &mockWatcher{}
- lw := &mockListWatch{
- Bus: make(chan *client.PluginResponse, 100),
+type mockListWatch struct {
+ ListResponse *sdcommon.ListWatchResp
+ WatchResponse *sdcommon.ListWatchResp
+ Rev int64
+}
+
+func (lw *mockListWatch) List(sdcommon.ListWatchConfig)
(*sdcommon.ListWatchResp, error) {
+ if lw.ListResponse == nil {
+ return nil, fmt.Errorf("list error")
+ }
+ lw.Rev = lw.ListResponse.Revision
+ return lw.ListResponse, nil
+}
+
+func (lw *mockListWatch) DoWatch(ctx context.Context, f
func(*sdcommon.ListWatchResp)) error {
+ if lw.WatchResponse == nil {
+ return fmt.Errorf("error")
+ }
+ if len(lw.WatchResponse.Resources) > 0 {
+ lw.Rev = lw.WatchResponse.Resources[0].ModRevision
}
- w.lw = lw
+ f(lw.WatchResponse)
+ <-ctx.Done()
+ return nil
+}
+
+func (lw *mockListWatch) EventBus(op sdcommon.ListWatchConfig)
*sdcommon.EventBus {
+ return sdcommon.NewEventBus(lw, op)
+}
+
+func (lw *mockListWatch) Revision() int64 {
+ return lw.Rev
+}
+
+func TestNewKvCacher(t *testing.T) {
+ lw := &mockListWatch{}
cr := &KvCacher{
Cfg: sd.Configure(),
@@ -111,17 +140,17 @@ func TestNewKvCacher(t *testing.T) {
cache: &mockCache{},
}
- lw.Watcher = w
- data := &mvccpb.KeyValue{Key: []byte("ka"), Value: []byte("va"),
Version: 1, ModRevision: 2}
- test := &client.PluginResponse{
- Action: client.ActionPut,
- Revision: 3,
- Kvs: []*mvccpb.KeyValue{data}}
+ data := &sdcommon.Resource{Key: "ka", Value: []byte("va"), Version: 1,
ModRevision: 2}
+ test := &sdcommon.ListWatchResp{
+ Action: sdcommon.ActionPUT,
+ Revision: 3,
+ Resources: []*sdcommon.Resource{data},
+ }
// case: list 1 resp and watch 0 event
cr.cache.Remove("ka")
lw.ListResponse = test
- lw.Bus <- nil
+ lw.WatchResponse = nil
cr.refresh(ctx)
// check ready
@@ -142,7 +171,7 @@ func TestNewKvCacher(t *testing.T) {
data.ModRevision = 3
// case: re-list and should be no event
- lw.Bus <- nil
+ lw.WatchResponse = nil
evt.KV = nil
cr.refresh(ctx)
if evt.KV != nil {
@@ -155,11 +184,11 @@ func TestNewKvCacher(t *testing.T) {
}
// case re-list and over no event times
- for i := 0; i < DefaultForceListInterval; i++ {
- lw.Bus <- nil
+ for i := 0; i < sdcommon.DefaultForceListInterval; i++ {
+ lw.WatchResponse = nil
}
evt.KV = nil
- for i := 0; i < DefaultForceListInterval; i++ {
+ for i := 0; i < sdcommon.DefaultForceListInterval; i++ {
cr.refresh(ctx)
}
// check event
@@ -172,12 +201,12 @@ func TestNewKvCacher(t *testing.T) {
t.Fatalf("TestNewKvCacher failed")
}
- lw.ListResponse = &client.PluginResponse{Revision: 5}
- for i := 0; i < DefaultForceListInterval; i++ {
- lw.Bus <- nil
+ lw.ListResponse = &sdcommon.ListWatchResp{Revision: 5}
+ for i := 0; i < sdcommon.DefaultForceListInterval; i++ {
+ lw.WatchResponse = nil
}
evt.KV = nil
- for i := 0; i < DefaultForceListInterval; i++ {
+ for i := 0; i < sdcommon.DefaultForceListInterval; i++ {
cr.refresh(ctx)
}
// check event
@@ -193,8 +222,8 @@ func TestNewKvCacher(t *testing.T) {
// case: no list and watch 1 event
test.Revision = 6
data.ModRevision = 5
- lw.Bus <- test
- lw.Bus <- nil
+ lw.WatchResponse = test
+ lw.ListResponse = nil
cr.refresh(ctx)
// check event
@@ -210,8 +239,8 @@ func TestNewKvCacher(t *testing.T) {
test.Revision = 7
data.Version = 2
data.ModRevision = 6
- lw.Bus <- test
- lw.Bus <- nil
+ lw.WatchResponse = test
+ lw.ListResponse = nil
cr.refresh(ctx)
// check event
@@ -225,11 +254,11 @@ func TestNewKvCacher(t *testing.T) {
}
test.Revision = 8
- test.Action = client.ActionDelete
+ test.Action = sdcommon.ActionDelete
data.Version = 0
data.ModRevision = 6
- lw.Bus <- test
- lw.Bus <- nil
+ lw.WatchResponse = test
+ lw.ListResponse = nil
cr.refresh(ctx)
// check event
@@ -248,7 +277,7 @@ func TestNewKvCacher(t *testing.T) {
data.Version = 1
data.ModRevision = 1
lw.ListResponse = test
- lw.Bus <- nil
+ lw.WatchResponse = nil
evt.KV = nil
cr.refresh(ctx)
// check event
@@ -263,12 +292,12 @@ func TestNewKvCacher(t *testing.T) {
// case: caught delete event but value is nil
test.Revision = 10
- test.Action = client.ActionDelete
+ test.Action = sdcommon.ActionDelete
data.Version = 0
data.ModRevision = 1
data.Value = nil
- lw.Bus <- test
- lw.Bus <- nil
+ lw.WatchResponse = test
+ lw.ListResponse = nil
cr.refresh(ctx)
data.Value = []byte("va")
@@ -288,7 +317,7 @@ func TestNewKvCacher(t *testing.T) {
data.Version = 1
data.ModRevision = 1
lw.ListResponse = test
- lw.Bus <- nil
+ lw.WatchResponse = nil
evt.KV = nil
old := *cr.Cfg
cr.Cfg.WithParser(value.MapParser)
@@ -304,8 +333,8 @@ func TestNewKvCacher(t *testing.T) {
}
lw.ListResponse = test
- lw.Bus <- test
- lw.Bus <- nil
+ lw.WatchResponse = test
+
cr.refresh(ctx)
*cr.Cfg = old
// check event
@@ -322,19 +351,19 @@ func TestNewKvCacher(t *testing.T) {
var evts = make(map[string]sd.KvEvent)
lw.Rev = 0
test.Revision = 3
- test.Kvs = nil
- for i := 0; i < eventBlockSize+1; i++ {
+ test.Resources = nil
+ for i := 0; i < sdcommon.EventBlockSize+1; i++ {
kv := *data
- kv.Key = []byte(strconv.Itoa(i))
+ kv.Key = strconv.Itoa(i)
kv.Value = []byte(strconv.Itoa(i))
kv.Version = int64(i)
kv.ModRevision = int64(i)
- test.Kvs = append(test.Kvs, &kv)
+ test.Resources = append(test.Resources, &kv)
}
data.ModRevision = 2
- test.Kvs = append(test.Kvs, data)
+ test.Resources = append(test.Resources, data)
lw.ListResponse = test
- lw.Bus <- nil
+ lw.WatchResponse = nil
evt.KV = nil
old = *cr.Cfg
cr.Cfg.WithEventFunc(func(evt sd.KvEvent) {
@@ -343,18 +372,18 @@ func TestNewKvCacher(t *testing.T) {
cr.refresh(ctx)
*cr.Cfg = old
// check all events
- for i := 0; i < eventBlockSize+1; i++ {
+ for i := 0; i < sdcommon.EventBlockSize+1; i++ {
s := strconv.Itoa(i)
if evt, ok := evts[s]; !ok || evt.Type != pb.EVT_CREATE ||
evt.KV.ModRevision != int64(i) || string(evt.KV.Value.([]byte)) != s {
t.Fatalf("TestNewKvCacher failed, %v", evt)
}
delete(evts, s)
}
- evt = evts[string(data.Key)]
+ evt = evts[data.Key]
if len(evts) != 1 || evt.Type != pb.EVT_CREATE || evt.Revision != 3 ||
evt.KV.ModRevision != 2 {
t.Fatalf("TestNewKvCacher failed, %v %v", evts, evt)
}
- delete(evts, string(data.Key))
+ delete(evts, data.Key)
// case: cacher is ready and the next list failed, prevent to watch
with rev = 0
if !cr.IsReady() {
@@ -362,14 +391,13 @@ func TestNewKvCacher(t *testing.T) {
}
lw.Rev = 0 // watch failed
lw.ListResponse = nil // the next list
- lw.Bus <- test
+ lw.WatchResponse = test
old = *cr.Cfg
cr.Cfg.WithEventFunc(func(evt sd.KvEvent) {
t.Fatalf("TestNewKvCacher failed, %v", evt)
})
cr.refresh(ctx)
*cr.Cfg = old
- <-lw.Bus
}
func BenchmarkFilter(b *testing.B) {
@@ -385,13 +413,13 @@ func BenchmarkFilter(b *testing.B) {
n := 300 * 1000 // 30w
cache := sd.NewKvCache("test", cfg)
- items := make([]*mvccpb.KeyValue, 0, n)
+ items := make([]*sdcommon.Resource, 0, n)
for ; n > 0; n-- {
k := fmt.Sprintf("/%d", n)
if n <= 10*1000 {
// create
- items = append(items, &mvccpb.KeyValue{
- Key: util.StringToBytesWithNoCopy(k),
+ items = append(items, &sdcommon.Resource{
+ Key: k,
Value: v,
ModRevision: int64(rand.Int()),
})
@@ -402,8 +430,8 @@ func BenchmarkFilter(b *testing.B) {
Value: inst,
ModRevision: 1,
})
- items = append(items, &mvccpb.KeyValue{
- Key: util.StringToBytesWithNoCopy(k),
+ items = append(items, &sdcommon.Resource{
+ Key: k,
Value: v,
ModRevision: int64(rand.Int()),
})
diff --git a/datasource/etcd/sd/etcd/common.go
b/datasource/etcd/sd/etcd/common.go
index 1d9d64e..c603275 100644
--- a/datasource/etcd/sd/etcd/common.go
+++ b/datasource/etcd/sd/etcd/common.go
@@ -16,22 +16,16 @@
package etcd
import (
- "time"
-
"github.com/apache/servicecomb-service-center/datasource/etcd"
"github.com/apache/servicecomb-service-center/datasource/etcd/sd"
"github.com/apache/servicecomb-service-center/datasource/etcd/value"
+ "github.com/apache/servicecomb-service-center/datasource/sdcommon"
+ "github.com/apache/servicecomb-service-center/pkg/util"
"github.com/coreos/etcd/mvcc/mvccpb"
)
const (
- // force re-list
- DefaultForceListInterval = 4
- DefaultMetricsInterval = 30 * time.Second
-
- minWaitInterval = 1 * time.Second
- eventBlockSize = 1000
- eventBusSize = 1000
+ revMethodName = "Revision"
)
var closedCh = make(chan struct{})
@@ -52,3 +46,16 @@ func FromEtcdKeyValue(dist *sd.KeyValue, src
*mvccpb.KeyValue, parser value.Pars
dist.Value, err = parser.Unmarshal(src.Value)
return
}
+
+func ParseResourceToEtcdKeyValue(dist *sd.KeyValue, src *sdcommon.Resource,
parser value.Parser) (err error) {
+ dist.ClusterName = etcd.Configuration().ClusterName
+ dist.Key = util.StringToBytesWithNoCopy(src.Key)
+ dist.Version = src.Version
+ dist.CreateRevision = src.CreateRevision
+ dist.ModRevision = src.ModRevision
+ if parser == nil {
+ return
+ }
+ dist.Value, err = parser.Unmarshal(src.Value.([]byte))
+ return
+}
diff --git a/datasource/etcd/sd/etcd/listwatch_inner.go
b/datasource/etcd/sd/etcd/listwatch_inner.go
index cb477bf..e2ecdc5 100644
--- a/datasource/etcd/sd/etcd/listwatch_inner.go
+++ b/datasource/etcd/sd/etcd/listwatch_inner.go
@@ -22,7 +22,9 @@ import (
"fmt"
"github.com/apache/servicecomb-service-center/datasource/etcd/client"
+ "github.com/apache/servicecomb-service-center/datasource/sdcommon"
"github.com/apache/servicecomb-service-center/pkg/log"
+ "github.com/apache/servicecomb-service-center/pkg/util"
)
type innerListWatch struct {
@@ -32,7 +34,7 @@ type innerListWatch struct {
rev int64
}
-func (lw *innerListWatch) List(op ListWatchConfig) (*client.PluginResponse,
error) {
+func (lw *innerListWatch) List(op sdcommon.ListWatchConfig)
(*sdcommon.ListWatchResp, error) {
otCtx, cancel := context.WithTimeout(op.Context, op.Timeout)
defer cancel()
resp, err := lw.Client.Do(otCtx,
client.WatchPrefixOpOptions(lw.Prefix)...)
@@ -41,7 +43,10 @@ func (lw *innerListWatch) List(op ListWatchConfig)
(*client.PluginResponse, erro
return nil, err
}
lw.setRevision(resp.Revision)
- return resp, nil
+
+ lwRsp := lw.doParsePluginRspToLwRsp(resp)
+
+ return lwRsp, nil
}
func (lw *innerListWatch) Revision() int64 {
@@ -52,11 +57,11 @@ func (lw *innerListWatch) setRevision(rev int64) {
lw.rev = rev
}
-func (lw *innerListWatch) Watch(op ListWatchConfig) Watcher {
- return newInnerWatcher(lw, op)
+func (lw *innerListWatch) EventBus(op sdcommon.ListWatchConfig)
*sdcommon.EventBus {
+ return sdcommon.NewEventBus(lw, op)
}
-func (lw *innerListWatch) DoWatch(ctx context.Context, f
func(*client.PluginResponse)) error {
+func (lw *innerListWatch) DoWatch(ctx context.Context, f
func(*sdcommon.ListWatchResp)) error {
rev := lw.Revision()
opts := append(
client.WatchPrefixOpOptions(lw.Prefix),
@@ -69,7 +74,17 @@ func (lw *innerListWatch) DoWatch(ctx context.Context, f
func(*client.PluginResp
lw.setRevision(resp.Revision)
- f(resp)
+ lwRsp := lw.doParsePluginRspToLwRsp(resp)
+ switch resp.Action {
+ case client.ActionPut:
+ lwRsp.Action = sdcommon.ActionPUT
+ case client.ActionDelete:
+ lwRsp.Action = sdcommon.ActionDelete
+ default:
+ log.Warn(fmt.Sprintf("unrecognized
action::%s", lwRsp.Action))
+ }
+
+ f(lwRsp)
return nil
}))
@@ -82,3 +97,21 @@ func (lw *innerListWatch) DoWatch(ctx context.Context, f
func(*client.PluginResp
}
return err
}
+
+func (lw *innerListWatch) doParsePluginRspToLwRsp(pluginRsp
*client.PluginResponse) *sdcommon.ListWatchResp {
+ lwRsp := &sdcommon.ListWatchResp{}
+
+ lwRsp.Revision = pluginRsp.Revision
+
+ for _, kv := range pluginRsp.Kvs {
+ resource := sdcommon.Resource{}
+ resource.Key = util.BytesToStringWithNoCopy(kv.Key)
+ resource.ModRevision = kv.ModRevision
+ resource.CreateRevision = kv.CreateRevision
+ resource.Version = kv.Version
+ resource.Value = kv.Value
+
+ lwRsp.Resources = append(lwRsp.Resources, &resource)
+ }
+ return lwRsp
+}
diff --git a/datasource/etcd/sd/etcd/listwatch_test.go
b/datasource/etcd/sd/etcd/listwatch_test.go
index 4bf3b5d..964cb84 100644
--- a/datasource/etcd/sd/etcd/listwatch_test.go
+++ b/datasource/etcd/sd/etcd/listwatch_test.go
@@ -25,6 +25,7 @@ import (
"github.com/apache/servicecomb-service-center/datasource/etcd/client"
"github.com/apache/servicecomb-service-center/datasource/etcd/client/buildin"
+ "github.com/apache/servicecomb-service-center/datasource/sdcommon"
"github.com/coreos/etcd/mvcc/mvccpb"
)
@@ -63,12 +64,12 @@ func TestPrefixListWatch(t *testing.T) {
Prefix: "a",
rev: 1,
}
- resp, err := lw.List(ListWatchConfig{Timeout: time.Second, Context:
context.Background()})
+ resp, err := lw.List(sdcommon.ListWatchConfig{Timeout: time.Second,
Context: context.Background()})
if resp != nil || err == nil || lw.Revision() != 1 {
t.Fatalf("TestPrefixListWatch failed")
}
- w := lw.Watch(ListWatchConfig{Timeout: time.Second, Context:
context.Background()})
- resp = <-w.EventBus()
+ w := lw.EventBus(sdcommon.ListWatchConfig{Timeout: time.Second,
Context: context.Background()})
+ resp = <-w.ResourceEventBus()
if resp != nil || lw.Revision() != 0 {
t.Fatalf("TestPrefixListWatch failed")
}
@@ -82,12 +83,12 @@ func TestPrefixListWatch(t *testing.T) {
Prefix: "a",
rev: 1,
}
- resp, err = lw.List(ListWatchConfig{Timeout: time.Second, Context:
context.Background()})
+ resp, err = lw.List(sdcommon.ListWatchConfig{Timeout: time.Second,
Context: context.Background()})
if resp == nil || err != nil || lw.Revision() != 2 {
t.Fatalf("TestPrefixListWatch failed")
}
- w = lw.Watch(ListWatchConfig{Timeout: time.Second, Context:
context.Background()})
- resp = <-w.EventBus()
+ w = lw.EventBus(sdcommon.ListWatchConfig{Timeout: time.Second, Context:
context.Background()})
+ resp = <-w.ResourceEventBus()
if resp != nil || lw.Revision() != 0 {
t.Fatalf("TestPrefixListWatch failed")
}
@@ -102,12 +103,12 @@ func TestPrefixListWatch(t *testing.T) {
Prefix: "a",
rev: 1,
}
- resp, err = lw.List(ListWatchConfig{Timeout: time.Second, Context:
context.Background()})
+ resp, err = lw.List(sdcommon.ListWatchConfig{Timeout: time.Second,
Context: context.Background()})
if resp == nil || err != nil || lw.Revision() != 4 {
t.Fatalf("TestPrefixListWatch failed")
}
- w = lw.Watch(ListWatchConfig{Timeout: time.Second, Context:
context.Background()})
- resp = <-w.EventBus()
+ w = lw.EventBus(sdcommon.ListWatchConfig{Timeout: time.Second, Context:
context.Background()})
+ resp = <-w.ResourceEventBus()
if resp == nil || lw.Revision() != 3 {
t.Fatalf("TestPrefixListWatch failed")
}
@@ -115,7 +116,7 @@ func TestPrefixListWatch(t *testing.T) {
}
func TestListWatchConfig_String(t *testing.T) {
- lw := ListWatchConfig{Timeout: time.Second, Context:
context.Background()}
+ lw := sdcommon.ListWatchConfig{Timeout: time.Second, Context:
context.Background()}
if lw.String() != "{timeout: 1s}" {
t.Fatalf("TestListWatchConfig_String failed")
}
diff --git a/datasource/etcd/sd/etcd/watcher_test.go
b/datasource/etcd/sd/etcd/watcher_test.go
deleted file mode 100644
index 534b5f1..0000000
--- a/datasource/etcd/sd/etcd/watcher_test.go
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package etcd
-
-import (
- "context"
- "fmt"
- "testing"
- "time"
-
- "github.com/apache/servicecomb-service-center/datasource/etcd/client"
-)
-
-type mockWatcher struct {
- lw *mockListWatch
-}
-
-func (w *mockWatcher) EventBus() <-chan *client.PluginResponse {
- return w.lw.Bus
-}
-func (w *mockWatcher) Stop() {
-}
-
-type mockListWatch struct {
- ListResponse *client.PluginResponse
- Bus chan *client.PluginResponse
- Watcher Watcher
- Rev int64
-}
-
-func (lw *mockListWatch) List(op ListWatchConfig) (*client.PluginResponse,
error) {
- if lw.ListResponse == nil {
- return nil, fmt.Errorf("error")
- }
- lw.Rev = lw.ListResponse.Revision
- return lw.ListResponse, nil
-}
-func (lw *mockListWatch) DoWatch(ctx context.Context, f
func(*client.PluginResponse)) error {
- if lw.ListResponse == nil {
- return fmt.Errorf("error")
- }
- if len(lw.ListResponse.Kvs) > 0 {
- lw.Rev = lw.ListResponse.Kvs[0].ModRevision
- }
- f(lw.ListResponse)
- <-ctx.Done()
- return nil
-}
-func (lw *mockListWatch) Watch(op ListWatchConfig) Watcher {
- return lw.Watcher
-}
-func (lw *mockListWatch) Revision() int64 {
- return lw.Rev
-}
-
-func TestInnerWatcher_EventBus(t *testing.T) {
- w := newInnerWatcher(&mockListWatch{}, ListWatchConfig{Timeout:
time.Second, Context: context.Background()})
- resp := <-w.EventBus()
- if resp != nil {
- t.Fatalf("TestInnerWatcher_EventBus failed")
- }
- w.Stop()
-
- test := &client.PluginResponse{
- Action: client.ActionPut,
- }
- w = newInnerWatcher(&mockListWatch{ListResponse: test},
ListWatchConfig{Timeout: time.Second, Context: context.Background()})
- resp = <-w.EventBus()
- if resp == nil || resp.Action != client.ActionPut {
- t.Fatalf("TestInnerWatcher_EventBus failed")
- }
- w.Stop()
-}
diff --git a/datasource/mongo/mongo.go b/datasource/mongo/mongo.go
index fb607cc..1e84d84 100644
--- a/datasource/mongo/mongo.go
+++ b/datasource/mongo/mongo.go
@@ -23,6 +23,7 @@ import (
"github.com/apache/servicecomb-service-center/datasource"
"github.com/apache/servicecomb-service-center/datasource/mongo/client"
"github.com/apache/servicecomb-service-center/datasource/mongo/heartbeat"
+ "github.com/apache/servicecomb-service-center/datasource/mongo/sd"
"github.com/apache/servicecomb-service-center/pkg/log"
"github.com/apache/servicecomb-service-center/server/config"
"github.com/go-chassis/go-chassis/v2/storage"
@@ -72,6 +73,8 @@ func (ds *DataSource) initialize() error {
if err != nil {
return err
}
+ // init mongo cache
+ ds.initStore()
return nil
}
@@ -118,3 +121,12 @@ func (ds *DataSource) createIndexes() (err error) {
}
return
}
+
+// initStore starts the mongo cache store and blocks until the store reports
+// ready. When caching is disabled in the registry configuration it only logs
+// that fact and returns.
+func (ds *DataSource) initStore() {
+	if config.GetRegistry().EnableCache {
+		sd.Store().Run()
+		<-sd.Store().Ready()
+		return
+	}
+	log.Debug("cache is disabled")
+}
diff --git a/datasource/mongo/sd/cache.go b/datasource/mongo/sd/cache.go
new file mode 100644
index 0000000..9819041
--- /dev/null
+++ b/datasource/mongo/sd/cache.go
@@ -0,0 +1,46 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+ */
+
+package sd
+
+// Cache stores db data.
+// It extends the read-only CacheReader with mutation and dirty-flag
+// operations.
+type Cache interface {
+ CacheReader
+
+ // Put stores v under id.
+ Put(id string, v interface{})
+
+ // Remove deletes the entry stored under id.
+ Remove(id string)
+
+ // MarkDirty flags the cache as dirty.
+ MarkDirty()
+
+ // Dirty reports whether the cache is flagged as dirty.
+ Dirty() bool
+
+ // Clear empties the cache and resets the dirty flag.
+ Clear()
+}
+
+// CacheReader reads k-v data.
+// It is the read-only view of a Cache.
+type CacheReader interface {
+ Name() string // The name of implementation
+
+ Size() int // the bytes size of the cache
+
+ // Get gets a value by id
+ Get(id string) interface{}
+
+ // ForEach executes the given function for each of the k-v
+ // until iter returns false.
+ ForEach(iter func(k string, v interface{}) (next bool))
+}
diff --git a/datasource/mongo/sd/common.go b/datasource/mongo/sd/common.go
new file mode 100644
index 0000000..e2c5bf1
--- /dev/null
+++ b/datasource/mongo/sd/common.go
@@ -0,0 +1,25 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+ */
+
+package sd
+
+// Operation type names compared against MongoWatchResponse.OperationType
+// when handling change-stream events.
+const (
+ insertOp = "insert"
+ updateOp = "update"
+ replaceOp = "replace"
+ deleteOp = "delete"
+)
diff --git a/datasource/mongo/sd/event_proxy.go
b/datasource/mongo/sd/event_proxy.go
new file mode 100644
index 0000000..227d1bd
--- /dev/null
+++ b/datasource/mongo/sd/event_proxy.go
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package sd
+
+import (
+ "fmt"
+ "sync"
+
+ "github.com/apache/servicecomb-service-center/pkg/log"
+ "github.com/apache/servicecomb-service-center/pkg/util"
+)
+
+var (
+ eventProxies = &sync.Map{}
+)
+
+// MongoEventProxy fans a single event out to every handler function
+// registered on it. The handler list is guarded by lock.
+type MongoEventProxy struct {
+	evtHandleFuncs []MongoEventFunc
+	lock           sync.RWMutex
+}
+
+// AddHandleFunc appends f to the handlers that OnEvent invokes.
+func (h *MongoEventProxy) AddHandleFunc(f MongoEventFunc) {
+	h.lock.Lock()
+	defer h.lock.Unlock()
+	h.evtHandleFuncs = append(h.evtHandleFuncs, f)
+}
+
+// OnEvent calls every registered handler with evt, in registration order.
+//
+// The read lock is released through defer so that a panicking handler cannot
+// leave it held: MongoCacher.notify recovers such panics, and a lock that
+// stayed read-locked would block every later AddHandleFunc call.
+// Handlers run while the read lock is held, so a handler must not call
+// AddHandleFunc on the same proxy.
+func (h *MongoEventProxy) OnEvent(evt MongoEvent) {
+	h.lock.RLock()
+	defer h.lock.RUnlock()
+	for _, f := range h.evtHandleFuncs {
+		f(evt)
+	}
+}
+
+// EventProxy returns the event proxy registered for resource type t,
+// creating it on first use.
+//
+// Creation goes through LoadOrStore so that concurrent callers asking for the
+// same type always end up sharing one proxy; with a separate Load and Store
+// two callers could each install their own proxy and the handlers added to
+// the overwritten one would be lost.
+func EventProxy(t string) *MongoEventProxy {
+	// Fast path: avoid allocating a proxy when one is already registered.
+	if proxy, ok := eventProxies.Load(t); ok {
+		return proxy.(*MongoEventProxy)
+	}
+	proxy, _ := eventProxies.LoadOrStore(t, &MongoEventProxy{})
+	return proxy.(*MongoEventProxy)
+}
+
+// AddEventHandler registers h.OnEvent on the proxy of h.Type() and logs the
+// registration together with the handler's reflected type name.
+func AddEventHandler(h MongoEventHandler) {
+	proxy := EventProxy(h.Type())
+	proxy.AddHandleFunc(h.OnEvent)
+
+	msg := fmt.Sprintf("register event handler[%s] %s", h.Type(), util.Reflect(h).Name())
+	log.Info(msg)
+}
diff --git a/datasource/mongo/sd/event_proxy_test.go
b/datasource/mongo/sd/event_proxy_test.go
new file mode 100644
index 0000000..3119cc9
--- /dev/null
+++ b/datasource/mongo/sd/event_proxy_test.go
@@ -0,0 +1,81 @@
+package sd
+
+import (
+ "sync"
+ "testing"
+
+ "github.com/go-chassis/cari/discovery"
+ "github.com/stretchr/testify/assert"
+)
+
+// TestAddHandleFuncAndOnEvent checks that AddHandleFunc grows the handler
+// list by one per call and that OnEvent can be invoked after each addition.
+func TestAddHandleFuncAndOnEvent(t *testing.T) {
+	var funcs []MongoEventFunc
+	mongoEventProxy := MongoEventProxy{
+		evtHandleFuncs: funcs,
+	}
+	mongoEvent := MongoEvent{
+		DocumentID: "",
+		ResourceID: "",
+		Type:       discovery.EVT_CREATE,
+		Value:      1,
+	}
+	assert.Equal(t, 0, len(mongoEventProxy.evtHandleFuncs),
+		"size of evtHandleFuncs is zero")
+	t.Run("AddHandleFunc one time", func(t *testing.T) {
+		mongoEventProxy.AddHandleFunc(mongoEventFuncGet())
+		mongoEventProxy.OnEvent(mongoEvent)
+		assert.Equal(t, 1, len(mongoEventProxy.evtHandleFuncs))
+	})
+	// The proxy already holds one handler from the previous subtest, so five
+	// more additions must leave six in total.
+	t.Run("AddHandleFunc five more times", func(t *testing.T) {
+		for i := 0; i < 5; i++ {
+			mongoEventProxy.AddHandleFunc(mongoEventFuncGet())
+			mongoEventProxy.OnEvent(mongoEvent)
+		}
+		assert.Equal(t, 6, len(mongoEventProxy.evtHandleFuncs))
+	})
+}
+
+// mockInstanceEventHandler is a no-op event handler whose Type is the
+// instance resource type; it is used to exercise AddEventHandler.
+type mockInstanceEventHandler struct {
+}
+
+// Type returns the instance resource type.
+func (h *mockInstanceEventHandler) Type() string {
+ return instance
+}
+
+// OnEvent ignores the event.
+func (h *mockInstanceEventHandler) OnEvent(MongoEvent) {
+
+}
+
+// TestAddEventHandler only checks that registering a handler does not panic;
+// it makes no assertions.
+func TestAddEventHandler(t *testing.T) {
+ AddEventHandler(&mockInstanceEventHandler{})
+
+}
+
+// TestEventProxy covers both branches of EventProxy: creating a proxy for an
+// unknown type and returning the one already stored for a known type.
+// Each subtest replaces the package-level eventProxies map.
+func TestEventProxy(t *testing.T) {
+	t.Run("when there is no such a proxy in eventProxies", func(t *testing.T) {
+		eventProxies = &sync.Map{}
+		proxy := EventProxy("new")
+		p, ok := eventProxies.Load("new")
+
+		assert.Equal(t, true, ok)
+		assert.NotNil(t, p, "proxy is not nil")
+		assert.Nil(t, proxy.evtHandleFuncs)
+	})
+
+	t.Run("when there is such a proxy in eventProxies", func(t *testing.T) {
+		eventProxies = &sync.Map{}
+		mongoEventFunc := []MongoEventFunc{mongoEventFuncGet()}
+		mongoEventProxy := MongoEventProxy{
+			evtHandleFuncs: mongoEventFunc,
+		}
+		eventProxies.Store("a", &mongoEventProxy)
+		proxy := EventProxy("a")
+
+		p, ok := eventProxies.Load("a")
+		assert.Equal(t, true, ok)
+		assert.Equal(t, &mongoEventProxy, p)
+		assert.NotNil(t, p, "proxy is not nil")
+		assert.NotNil(t, proxy.evtHandleFuncs)
+	})
+}
diff --git a/datasource/mongo/sd/listwatch_inner.go
b/datasource/mongo/sd/listwatch_inner.go
new file mode 100644
index 0000000..2bee2a0
--- /dev/null
+++ b/datasource/mongo/sd/listwatch_inner.go
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package sd
+
+import (
+ "context"
+ "fmt"
+
+ "github.com/apache/servicecomb-service-center/datasource/mongo/client"
+ "github.com/apache/servicecomb-service-center/datasource/sdcommon"
+ "github.com/apache/servicecomb-service-center/pkg/log"
+ "go.mongodb.org/mongo-driver/bson"
+ md "go.mongodb.org/mongo-driver/mongo"
+ "go.mongodb.org/mongo-driver/mongo/options"
+)
+
+// mongoListWatch lists and watches the mongo table named by Key.
+// resumeToken holds the resume token of the last change-stream event read by
+// DoWatch; when set, the next DoWatch resumes after it.
+type mongoListWatch struct {
+ Key string
+ resumeToken bson.Raw
+}
+
+// List reads every document of the table named by lw.Key and converts each
+// one to a sdcommon.Resource.
+//
+// The query and the cursor iteration both run under op.Context limited by
+// op.Timeout. An error is returned when the query fails or when iteration
+// stops early; a truncated result is never returned as if it were complete,
+// because the caller diffs the list against its cache and would treat every
+// missing document as deleted.
+func (lw *mongoListWatch) List(op sdcommon.ListWatchConfig) (*sdcommon.ListWatchResp, error) {
+	otCtx, cancel := context.WithTimeout(op.Context, op.Timeout)
+	defer cancel()
+
+	resp, err := client.GetMongoClient().Find(otCtx, lw.Key, bson.M{})
+	if err != nil {
+		log.Error(fmt.Sprintf("list key %s failed", lw.Key), err)
+		return nil, err
+	}
+	// Best-effort release of the server-side cursor; a close failure does not
+	// affect the data already read.
+	defer func() { _ = resp.Close(otCtx) }()
+
+	// convert mongoListResponse to ListWatchResp
+	lwRsp := &sdcommon.ListWatchResp{}
+	lwRsp.Resources = make([]*sdcommon.Resource, 0)
+
+	for resp.Next(otCtx) {
+		info := lw.doParseDocumentToResource(resp.Current)
+		lwRsp.Resources = append(lwRsp.Resources, &info)
+	}
+	// Next returns false both at the end of the data and on failure, so the
+	// cursor error has to be checked explicitly.
+	if err := resp.Err(); err != nil {
+		log.Error(fmt.Sprintf("list key %s failed", lw.Key), err)
+		return nil, err
+	}
+
+	return lwRsp, nil
+}
+
+// EventBus builds a new sdcommon.EventBus from this list-watcher and op.
+func (lw *mongoListWatch) EventBus(op sdcommon.ListWatchConfig)
*sdcommon.EventBus {
+ return sdcommon.NewEventBus(lw, op)
+}
+
+// DoWatch opens a change stream on the table named by lw.Key and passes every
+// recognized change to f as a single-resource ListWatchResp until the stream
+// ends or ctx is done.
+//
+// When a resume token from a previous run is present the stream resumes after
+// it. If the stream cannot be opened, f is called with nil and the error is
+// returned. Insert maps to ActionCreate, update and replace map to
+// ActionUpdate, delete maps to ActionDelete; any other operation type is
+// logged and skipped instead of being forwarded with an empty action.
+// A stream failure that is not caused by ctx ending is returned to the caller.
+func (lw *mongoListWatch) DoWatch(ctx context.Context, f func(*sdcommon.ListWatchResp)) error {
+	csOptions := &options.ChangeStreamOptions{}
+	csOptions.SetFullDocument(options.UpdateLookup)
+
+	if resumeToken := lw.resumeToken; resumeToken != nil {
+		csOptions.SetResumeAfter(resumeToken)
+	}
+
+	resp, err := client.GetMongoClient().Watch(ctx, lw.Key, md.Pipeline{}, csOptions)
+	if err != nil {
+		log.Error(fmt.Sprintf("watch table %s failed", lw.Key), err)
+		f(nil)
+		return err
+	}
+	// ctx is usually already done when the loop below exits, so the stream is
+	// closed with a fresh context; the close error carries no useful signal.
+	defer func() { _ = resp.Close(context.Background()) }()
+
+	for resp.Next(ctx) {
+		lw.resumeToken = resp.ResumeToken()
+
+		wRsp := &MongoWatchResponse{}
+		if err := bson.Unmarshal(resp.Current, wRsp); err != nil {
+			log.Error("error to parse bson raw to mongo watch response", err)
+			return err
+		}
+
+		lwRsp := &sdcommon.ListWatchResp{}
+		switch wRsp.OperationType {
+		case insertOp:
+			lwRsp.Action = sdcommon.ActionCreate
+		case updateOp, replaceOp:
+			// a replace carries the full new document, exactly like an update
+			lwRsp.Action = sdcommon.ActionUpdate
+		case deleteOp:
+			lwRsp.Action = sdcommon.ActionDelete
+		default:
+			log.Warn(fmt.Sprintf("unrecognized action:%s", wRsp.OperationType))
+			continue
+		}
+
+		// convert mongoWatchResponse to ListWatchResp
+		resource := lw.doParseWatchRspToResource(wRsp)
+		lwRsp.Resources = append(lwRsp.Resources, &resource)
+
+		f(lwRsp)
+	}
+
+	// Next also returns false on failure. Report real stream errors, but keep
+	// returning nil when the watch simply ended because ctx is done.
+	if err := resp.Err(); err != nil && ctx.Err() == nil {
+		log.Error(fmt.Sprintf("watch table %s failed", lw.Key), err)
+		return err
+	}
+	return nil
+}
+
+// doParseDocumentToResource converts a raw document into a sdcommon.Resource.
+//
+// DocumentID is the hex form of the document's _id. For the instance and
+// service tables, Key is the instance or service id and Value is the decoded
+// Instance or Service. For any other table only DocumentID is set.
+// Decoding failures and documents without the expected info section are
+// logged and yield a resource holding whatever was filled in before the
+// failure.
+func (lw *mongoListWatch) doParseDocumentToResource(fullDocument bson.Raw) (resource sdcommon.Resource) {
+	documentID := MongoDocument{}
+	if err := bson.Unmarshal(fullDocument, &documentID); err != nil {
+		log.Error("error to parse bson raw to documentInfo", err)
+		return
+	}
+
+	resource.DocumentID = documentID.ID.Hex()
+
+	switch lw.Key {
+	case instance:
+		inst := Instance{}
+		if err := bson.Unmarshal(fullDocument, &inst); err != nil {
+			log.Error("error to parse bson raw to documentInfo", err)
+			return
+		}
+		if inst.InstanceInfo == nil {
+			// guard the dereference below against documents without instance info
+			log.Warn(fmt.Sprintf("document %s has no instance info", resource.DocumentID))
+			return
+		}
+		resource.Key = inst.InstanceInfo.InstanceId
+		resource.Value = inst
+	case service:
+		svc := Service{}
+		if err := bson.Unmarshal(fullDocument, &svc); err != nil {
+			log.Error("error to parse bson raw to documentInfo", err)
+			return
+		}
+		if svc.ServiceInfo == nil {
+			// guard the dereference below against documents without service info
+			log.Warn(fmt.Sprintf("document %s has no service info", resource.DocumentID))
+			return
+		}
+		resource.Key = svc.ServiceInfo.ServiceId
+		resource.Value = svc
+	}
+
+	return
+}
+
+// ResumeToken returns the currently stored change-stream resume token.
+func (lw *mongoListWatch) ResumeToken() bson.Raw {
+ return lw.resumeToken
+}
+
+// setResumeToken replaces the stored change-stream resume token.
+func (lw *mongoListWatch) setResumeToken(resumeToken bson.Raw) {
+ lw.resumeToken = resumeToken
+}
+
+// doParseWatchRspToResource converts one change-stream response into a
+// sdcommon.Resource.
+//
+// A delete yields a resource carrying only the document id, because a delete
+// event has no full document. Insert, update and replace are parsed from the
+// full document. Any other operation is logged and yields an empty resource.
+func (lw *mongoListWatch) doParseWatchRspToResource(wRsp *MongoWatchResponse) (resource sdcommon.Resource) {
+	op := wRsp.OperationType
+
+	if op == deleteOp {
+		resource.DocumentID = wRsp.DocumentKey.ID.Hex()
+		return resource
+	}
+
+	if op == insertOp || op == updateOp || op == replaceOp {
+		return lw.doParseDocumentToResource(wRsp.FullDocument)
+	}
+
+	log.Warn(fmt.Sprintf("unrecognized operation:%s", op))
+	return resource
+}
diff --git a/datasource/mongo/sd/listwatch_test.go
b/datasource/mongo/sd/listwatch_test.go
new file mode 100644
index 0000000..ad0a03a
--- /dev/null
+++ b/datasource/mongo/sd/listwatch_test.go
@@ -0,0 +1,87 @@
+package sd
+
+import (
+ "github.com/apache/servicecomb-service-center/datasource/sdcommon"
+ "testing"
+
+ "github.com/stretchr/testify/assert"
+ "go.mongodb.org/mongo-driver/bson"
+ "go.mongodb.org/mongo-driver/bson/primitive"
+)
+
+// TestListWatchConfig_String checks the string form of ListWatchConfig for a
+// non-zero timeout and for the zero value.
+func TestListWatchConfig_String(t *testing.T) {
+ t.Run("TestListWatchConfig_String", func(t *testing.T) {
+ config := sdcommon.ListWatchConfig{
+ Timeout: 666,
+ }
+ ret := config.String()
+ assert.Equal(t, "{timeout: 666ns}", ret)
+ })
+ // the zero-value config has a zero timeout
+ t.Run("when time is nil", func(t *testing.T) {
+ config := sdcommon.ListWatchConfig{}
+ ret := config.String()
+ assert.Equal(t, "{timeout: 0s}", ret)
+ })
+}
+
+// TestDoParseWatchRspToMongoInfo drives doParseWatchRspToResource through the
+// instance insert, update and delete cases and the service insert case, using
+// hand-built bson documents.
+func TestDoParseWatchRspToMongoInfo(t *testing.T) {
+ documentID := primitive.NewObjectID()
+
+ mockDocument, _ := bson.Marshal(bson.M{"_id": documentID, "domain":
"default", "project": "default", "instanceinfo": bson.M{"instanceid":
"8064a600438511eb8584fa163e8a81c9", "serviceid":
"91afbe0faa9dc1594689139f099eb293b0cd048d",
+ "hostname": "ecs-hcsadlab-dev-0002", "status": "UP",
"timestamp": "1608552622", "modtimestamp": "1608552622", "version": "0.0.1"}})
+
+ mockServiceDocument, _ := bson.Marshal(bson.M{"_id": documentID,
"domain": "default", "project": "default", "serviceinfo": bson.M{"serviceid":
"91afbe0faa9dc1594689139f099eb293b0cd048d", "timestamp": "1608552622",
"modtimestamp": "1608552622", "version": "0.0.1"}})
+
+ // case instance insertOp
+
+ mockWatchRsp := &MongoWatchResponse{OperationType: insertOp,
+ FullDocument: mockDocument,
+ DocumentKey: MongoDocument{ID: documentID},
+ }
+ ilw := mongoListWatch{
+ Key: instance,
+ }
+ info := ilw.doParseWatchRspToResource(mockWatchRsp)
+ assert.Equal(t, documentID.Hex(), info.DocumentID)
+ assert.Equal(t, "8064a600438511eb8584fa163e8a81c9", info.Key)
+
+ // case updateOp
+ mockWatchRsp.OperationType = updateOp
+ info = ilw.doParseWatchRspToResource(mockWatchRsp)
+ assert.Equal(t, documentID.Hex(), info.DocumentID)
+ assert.Equal(t, "8064a600438511eb8584fa163e8a81c9", info.Key)
+ assert.Equal(t, "1608552622",
info.Value.(Instance).InstanceInfo.ModTimestamp)
+
+ // case delete: only the document id is filled in, the key stays empty
+ mockWatchRsp.OperationType = deleteOp
+ info = ilw.doParseWatchRspToResource(mockWatchRsp)
+ assert.Equal(t, documentID.Hex(), info.DocumentID)
+ assert.Equal(t, "", info.Key)
+
+ // case service insertOp: the document id comes from the full document's
+ // _id, not from DocumentKey, which is deliberately a different id here
+ mockWatchRsp = &MongoWatchResponse{OperationType: insertOp,
+ FullDocument: mockServiceDocument,
+ DocumentKey: MongoDocument{ID: primitive.NewObjectID()},
+ }
+ ilw.Key = service
+ info = ilw.doParseWatchRspToResource(mockWatchRsp)
+ assert.Equal(t, documentID.Hex(), info.DocumentID)
+ assert.Equal(t, "91afbe0faa9dc1594689139f099eb293b0cd048d", info.Key)
+}
+
+// TestInnerListWatch_ResumeToken checks the resume token getter and setter of
+// mongoListWatch.
+func TestInnerListWatch_ResumeToken(t *testing.T) {
+ ilw := mongoListWatch{
+ Key: instance,
+ resumeToken: bson.Raw("resumToken"),
+ }
+ t.Run("get resume token test", func(t *testing.T) {
+ res := ilw.ResumeToken()
+ assert.NotNil(t, res)
+ assert.Equal(t, bson.Raw("resumToken"), res)
+ })
+
+ t.Run("set resume token test", func(t *testing.T) {
+ ilw.setResumeToken(bson.Raw("resumToken2"))
+ assert.Equal(t, ilw.resumeToken, bson.Raw("resumToken2"))
+ })
+}
diff --git a/datasource/mongo/sd/mongo_cache.go
b/datasource/mongo/sd/mongo_cache.go
new file mode 100644
index 0000000..942a3b8
--- /dev/null
+++ b/datasource/mongo/sd/mongo_cache.go
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package sd
+
+import (
+ "sync"
+
+ "github.com/apache/servicecomb-service-center/pkg/util"
+)
+
+// MongoCache implements Cache.
+// MongoCache is dedicated to storing service discovery data,
+// e.g. service, instance, lease.
+//
+// It keeps two maps: store maps a resource id to its value, and documentStore
+// maps a mongo document id to the resource id. Both maps and the dirty flag
+// are guarded by rwMux.
+type MongoCache struct {
+	Options       *Options
+	name          string
+	store         map[string]interface{}
+	documentStore map[string]string
+	rwMux         sync.RWMutex
+	dirty         bool
+}
+
+// Name returns the name given to NewMongoCache.
+func (c *MongoCache) Name() string {
+	return c.name
+}
+
+// Size returns the size of the resource store as reported by util.Sizeof.
+func (c *MongoCache) Size() (l int) {
+	c.rwMux.RLock()
+	defer c.rwMux.RUnlock()
+	return int(util.Sizeof(c.store))
+}
+
+// Get returns the value stored under id, or nil when there is none.
+func (c *MongoCache) Get(id string) (v interface{}) {
+	c.rwMux.RLock()
+	defer c.rwMux.RUnlock()
+	return c.store[id]
+}
+
+// GetKeyByDocumentID returns the resource id recorded for documentKey, or ""
+// when the document id is unknown.
+func (c *MongoCache) GetKeyByDocumentID(documentKey string) (id string) {
+	c.rwMux.RLock()
+	defer c.rwMux.RUnlock()
+	return c.documentStore[documentKey]
+}
+
+// GetDocumentIDByID returns a document id recorded for resource id, or ""
+// when there is none. It scans documentStore linearly.
+func (c *MongoCache) GetDocumentIDByID(id string) (documentID string) {
+	c.rwMux.RLock()
+	defer c.rwMux.RUnlock()
+	for k, v := range c.documentStore {
+		if v == id {
+			return k
+		}
+	}
+	return ""
+}
+
+// Put stores v under id.
+func (c *MongoCache) Put(id string, v interface{}) {
+	c.rwMux.Lock()
+	defer c.rwMux.Unlock()
+	c.store[id] = v
+}
+
+// PutDocumentID records that documentID belongs to resource id.
+func (c *MongoCache) PutDocumentID(id string, documentID string) {
+	c.rwMux.Lock()
+	defer c.rwMux.Unlock()
+	c.documentStore[documentID] = id
+}
+
+// Remove deletes the value stored under id.
+func (c *MongoCache) Remove(id string) {
+	c.rwMux.Lock()
+	defer c.rwMux.Unlock()
+	delete(c.store, id)
+}
+
+// RemoveDocumentID deletes the record of documentID.
+func (c *MongoCache) RemoveDocumentID(documentID string) {
+	c.rwMux.Lock()
+	defer c.rwMux.Unlock()
+	delete(c.documentStore, documentID)
+}
+
+// MarkDirty flags the cache as dirty. The flag is written under the lock
+// because Clear resets it under the same lock.
+func (c *MongoCache) MarkDirty() {
+	c.rwMux.Lock()
+	defer c.rwMux.Unlock()
+	c.dirty = true
+}
+
+// Dirty reports whether the cache is flagged as dirty.
+func (c *MongoCache) Dirty() bool {
+	c.rwMux.RLock()
+	defer c.rwMux.RUnlock()
+	return c.dirty
+}
+
+// Clear resets the dirty flag and drops all cached data. The document id
+// records are dropped together with the values so that no document id keeps
+// pointing at a resource that is no longer cached.
+func (c *MongoCache) Clear() {
+	c.rwMux.Lock()
+	defer c.rwMux.Unlock()
+	c.dirty = false
+	c.store = make(map[string]interface{})
+	c.documentStore = make(map[string]string)
+}
+
+// ForEach calls iter for every entry with a non-nil value until iter returns
+// false. iter runs while the read lock is held, so it must not call any
+// method of this cache that takes the write lock.
+func (c *MongoCache) ForEach(iter func(k string, v interface{}) (next bool)) {
+	c.rwMux.RLock()
+	defer c.rwMux.RUnlock()
+	for k, v := range c.store {
+		if v == nil {
+			continue
+		}
+		if !iter(k, v) {
+			return
+		}
+	}
+}
+
+// NewMongoCache returns an empty MongoCache with the given name and options.
+func NewMongoCache(name string, options *Options) *MongoCache {
+	return &MongoCache{
+		Options:       options,
+		name:          name,
+		store:         make(map[string]interface{}),
+		documentStore: make(map[string]string),
+	}
+}
diff --git a/datasource/mongo/sd/mongo_cacher.go
b/datasource/mongo/sd/mongo_cacher.go
new file mode 100644
index 0000000..57d28a6
--- /dev/null
+++ b/datasource/mongo/sd/mongo_cacher.go
@@ -0,0 +1,461 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package sd
+
+import (
+ "context"
+ "errors"
+ "fmt"
+ "github.com/apache/servicecomb-service-center/datasource/sdcommon"
+ "sync"
+ "time"
+
+ "github.com/apache/servicecomb-service-center/pkg/backoff"
+ "github.com/apache/servicecomb-service-center/pkg/gopool"
+ "github.com/apache/servicecomb-service-center/pkg/log"
+ "github.com/apache/servicecomb-service-center/pkg/util"
+ rmodel "github.com/go-chassis/cari/discovery"
+)
+
+// MongoCacher manages the mongo cache.
+// To update the cache, MongoCacher watches mongo events and periodically
+// lists the data from mongo.
+// When the cached data changes, MongoCacher creates events and notifies its
+// subscribers.
+// Use Options to set its behaviors.
+type MongoCacher struct {
+ Options *Options
+ reListCount int
+ isFirstTime bool
+ cache *MongoCache
+ ready chan struct{}
+ lw sdcommon.ListWatch
+ mux sync.Mutex
+ once sync.Once
+ goroutine *gopool.Pool
+}
+
+// Cache returns the MongoCache managed by this cacher.
+func (c *MongoCacher) Cache() *MongoCache {
+ return c.cache
+}
+
+// Run starts the refresh loop on the cacher's goroutine pool.
+// Only the first call has an effect.
+func (c *MongoCacher) Run() {
+ c.once.Do(func() {
+ c.goroutine.Do(c.refresh)
+ })
+}
+
+// Stop closes the goroutine pool and then closes the ready channel.
+// Because the ready channel is closed, IsReady returns true afterwards.
+func (c *MongoCacher) Stop() {
+ c.goroutine.Close(true)
+
+ util.SafeCloseChan(c.ready)
+}
+
+// Ready returns the channel that is closed once ListAndWatch has completed
+// successfully for the first time, or when Stop is called.
+func (c *MongoCacher) Ready() <-chan struct{} {
+ return c.ready
+}
+
+// IsReady reports, without blocking, whether the ready channel has been
+// closed.
+func (c *MongoCacher) IsReady() bool {
+ select {
+ case <-c.ready:
+ return true
+ default:
+ return false
+ }
+}
+
+// needList decides whether the current cycle has to list mongo.
+// The first call always answers true and clears the first-time flag. After
+// that it counts calls and answers true, resetting the counter, each time the
+// count reaches sdcommon.DefaultForceListInterval.
+func (c *MongoCacher) needList() bool {
+	if c.isFirstTime {
+		c.isFirstTime = false
+		return true
+	}
+
+	c.reListCount++
+	if c.reListCount >= sdcommon.DefaultForceListInterval {
+		c.reListCount = 0
+		return true
+	}
+	return false
+}
+
+// doList lists mongo and brings the cache in line with the result.
+// A dirty cache is rebuilt from scratch without notifying subscribers;
+// otherwise the difference between cache and list is applied and published
+// as events. The list error, if any, is returned unchanged.
+func (c *MongoCacher) doList(cfg sdcommon.ListWatchConfig) error {
+	resp, err := c.lw.List(cfg)
+	if err != nil {
+		return err
+	}
+
+	resources := resp.Resources
+
+	// the message is formatted here, the log call itself runs on return
+	defer log.Debug(fmt.Sprintf("finish to cache key %s, %d items",
+		c.Options.Key, len(resources)))
+
+	if !c.cache.Dirty() {
+		// calc the diff between cache and mongodb, then notify the subscribers
+		c.sync(c.filter(resources))
+		return nil
+	}
+
+	// just reset the cacher if cache marked dirty
+	c.reset(resources)
+	log.Warn(fmt.Sprintf("Cache[%s] is reset!", c.cache.Name()))
+	return nil
+}
+
+// reset clears the cache and rebuilds it from infos without notifying
+// subscribers.
+func (c *MongoCacher) reset(infos []*sdcommon.Resource) {
+ // Clearing the cache before refilling it is safe because the watch
+ // operation is stopped at this point, but until the rebuild finishes all
+ // API requests go to mongo directly.
+ c.cache.Clear()
+ // Do not notify while the cacher is in dirty status,
+ // otherwise too many events would be sent downstream.
+ c.buildCache(c.filter(infos))
+}
+
+// doWatch obtains an event bus for cfg from the list-watcher and handles its
+// events until the bus is drained. It fails when no event bus is available.
+func (c *MongoCacher) doWatch(cfg sdcommon.ListWatchConfig) error {
+	eventbus := c.lw.EventBus(cfg)
+	if eventbus == nil {
+		return fmt.Errorf("handle a nil watcher")
+	}
+	return c.handleEventBus(eventbus)
+}
+
+// ListAndWatch runs one watch-then-list cycle and marks the cacher ready.
+//
+// It first watches mongo with the configured timeout, shortened to
+// FirstTimeout on the very first cycle so that the initial cache build is not
+// delayed. A watch error is only logged. It then lists mongo when needList
+// says so. A list error is returned, so that the caller retries, only while
+// the cacher is not ready yet; once ready, list errors are tolerated.
+// Calls are serialized by c.mux and panics are recovered.
+func (c *MongoCacher) ListAndWatch(ctx context.Context) error {
+	c.mux.Lock()
+	defer c.mux.Unlock()
+	defer log.Recover() // ensure ListAndWatch never raise panic
+
+	cfg := sdcommon.ListWatchConfig{
+		Timeout: c.Options.Timeout,
+		Context: ctx,
+	}
+
+	// needList clears c.isFirstTime, so remember it here; otherwise the
+	// timeout could never be restored for the first list below.
+	firstTime := c.isFirstTime
+
+	// first time should initial cache, set watch timeout less
+	if firstTime {
+		cfg.Timeout = FirstTimeout
+	}
+
+	if err := c.doWatch(cfg); err != nil {
+		log.Error("doWatch err", err)
+	}
+
+	// the scenario need to list mongo:
+	// 1. Initial: cache is building, the lister's is first time to run.
+	// 2. Runtime: watch operation timed out over
+	//    DefaultForceListInterval times.
+	if c.needList() {
+		// recover timeout for list
+		if firstTime {
+			cfg.Timeout = c.Options.Timeout
+		}
+
+		if err := c.doList(cfg); err != nil && !c.IsReady() {
+			log.Error("doList error", err)
+			return err // do retry to list mongo
+		}
+		// keep going to next step:
+		// 1. doList return OK.
+		// 2. some traps in mongo client
+	}
+
+	util.SafeCloseChan(c.ready)
+
+	return nil
+}
+
+// handleEventBus consumes responses from eventbus until its channel is
+// closed, turning each resource into a MongoEvent and syncing the batch into
+// the cache. The bus is stopped on return.
+//
+// A nil response signals a watch failure and ends the loop with an error.
+// For delete responses the resource key and value are looked up in the cache
+// by document id. A resource whose action is none of create, update or delete
+// contributes a zero-value event.
+func (c *MongoCacher) handleEventBus(eventbus *sdcommon.EventBus) error {
+	defer eventbus.Stop()
+
+	if eventbus.Bus == nil {
+		return nil
+	}
+
+	for resp := range eventbus.ResourceEventBus() {
+		if resp == nil {
+			return errors.New("handle watcher error")
+		}
+
+		events := make([]MongoEvent, 0, len(resp.Resources))
+		for _, resource := range resp.Resources {
+			var event MongoEvent
+			switch resp.Action {
+			case sdcommon.ActionCreate:
+				event = NewMongoEventByResource(resource, rmodel.EVT_CREATE)
+			case sdcommon.ActionUpdate:
+				event = NewMongoEventByResource(resource, rmodel.EVT_UPDATE)
+			case sdcommon.ActionDelete:
+				resource.Key = c.cache.GetKeyByDocumentID(resource.DocumentID)
+				resource.Value = c.cache.Get(resource.Key)
+				event = NewMongoEventByResource(resource, rmodel.EVT_DELETE)
+			}
+			events = append(events, event)
+		}
+
+		c.sync(events)
+		log.Debug(fmt.Sprintf("finish to handle %d events, table: %s", len(events), c.Options.Key))
+	}
+
+	return nil
+}
+
+// refresh runs ListAndWatch in a loop until ctx is done.
+// After a failed cycle the wait period comes from the backoff policy and
+// grows with the number of consecutive failures; after a successful cycle it
+// is sdcommon.MinWaitInterval and the failure count is reset.
+func (c *MongoCacher) refresh(ctx context.Context) {
+ log.Debug(fmt.Sprintf("start to list and watch %s", c.Options))
+ retries := 0
+
+ timer := time.NewTimer(sdcommon.MinWaitInterval)
+ defer timer.Stop()
+ for {
+ nextPeriod := sdcommon.MinWaitInterval
+ if err := c.ListAndWatch(ctx); err != nil {
+ retries++
+ nextPeriod = backoff.GetBackoff().Delay(retries)
+ } else {
+ retries = 0
+ }
+
+ select {
+ case <-ctx.Done():
+ log.Debug(fmt.Sprintf("stop to list and watch %s",
c.Options))
+ return
+ // NOTE(review): the timer is re-armed only after it has fired, so
+ // nextPeriod computed in this iteration is waited out at the end of the
+ // next iteration, not before it starts - confirm this is intended.
+ case <-timer.C:
+ timer.Reset(nextPeriod)
+ }
+ }
+}
+
+// sync applies evts to the cache and notifies subscribers.
+// Callers must pass valid events; an empty batch is ignored.
+func (c *MongoCacher) sync(evts []MongoEvent) {
+	if len(evts) > 0 {
+		c.onEvents(evts)
+	}
+}
+
+// filter computes the events that turn the current cache content into infos.
+// Delete events and create/update events are produced by two goroutines that
+// send fixed-size blocks over eventsCh; filter collects the blocks until the
+// channel is closed and returns the flattened events.
+func (c *MongoCacher) filter(infos []*sdcommon.Resource) []MongoEvent {
+ nc := len(infos)
+ newStore := make(map[string]interface{}, nc)
+ documentIDRecord := make(map[string]string, nc)
+
+ // index the listed resources by resource id
+ for _, info := range infos {
+ event := NewMongoEventByResource(info, rmodel.EVT_CREATE)
+ newStore[event.ResourceID] = info.Value
+ documentIDRecord[event.ResourceID] = info.DocumentID
+ }
+
+ filterStopCh := make(chan struct{})
+ eventsCh := make(chan [sdcommon.EventBlockSize]MongoEvent, 2)
+
+ // filterDelete closes filterStopCh when done; filterCreateOrUpdate waits
+ // for that signal and then closes eventsCh, which ends the loop below
+ go c.filterDelete(newStore, eventsCh, filterStopCh)
+
+ go c.filterCreateOrUpdate(newStore, documentIDRecord, eventsCh,
filterStopCh)
+
+ events := make([]MongoEvent, 0, nc)
+ for block := range eventsCh {
+ for _, e := range block {
+ // unused slots of a block are zero-valued; the first event with
+ // a nil Value ends the scan of that block
+ if e.Value == nil {
+ break
+ }
+ events = append(events, e)
+ }
+ }
+ return events
+}
+
+// filterDelete sends a delete event for every cached key that is missing
+// from newStore. Events are batched into fixed-size blocks on eventsCh, and
+// filterStopCh is closed once every block has been sent.
+func (c *MongoCacher) filterDelete(newStore map[string]interface{},
+ eventsCh chan [sdcommon.EventBlockSize]MongoEvent, filterStopCh chan
struct{}) {
+ var block [sdcommon.EventBlockSize]MongoEvent
+ i := 0
+
+ c.cache.ForEach(func(k string, v interface{}) (next bool) {
+ next = true
+
+ _, ok := newStore[k]
+ if ok {
+ // k in store, also in new store, is not deleted, return
+ return
+ }
+
+ // k in store but not in new store, it means k is deleted
+ // flush the block first when it is full
+ if i >= sdcommon.EventBlockSize {
+ eventsCh <- block
+ block = [sdcommon.EventBlockSize]MongoEvent{}
+ i = 0
+ }
+
+ documentID := c.cache.GetDocumentIDByID(k)
+ block[i] = NewMongoEvent(k, documentID, rmodel.EVT_DELETE, v)
+ i++
+ return
+ })
+
+ // send the last, partially filled block
+ if i > 0 {
+ eventsCh <- block
+ }
+
+ close(filterStopCh)
+}
+
+// filterCreateOrUpdate sends a create event for every key of newStore that
+// is not cached and an update event for every cached key whose value is
+// considered changed by isValueNotUpdated. Events are batched into fixed-size
+// blocks on eventsCh. After its own blocks are sent it waits for filterStopCh
+// and then closes eventsCh.
+func (c *MongoCacher) filterCreateOrUpdate(newStore map[string]interface{},
newDocumentStore map[string]string,
+ eventsCh chan [sdcommon.EventBlockSize]MongoEvent, filterStopCh chan
struct{}) {
+ var block [sdcommon.EventBlockSize]MongoEvent
+ i := 0
+
+ for k, v := range newStore {
+ ov := c.cache.Get(k)
+ // not cached yet: create
+ if ov == nil {
+ if i >= sdcommon.EventBlockSize {
+ eventsCh <- block
+ block = [sdcommon.EventBlockSize]MongoEvent{}
+ i = 0
+ }
+
+ block[i] = NewMongoEvent(k, newDocumentStore[k],
rmodel.EVT_CREATE, v)
+ i++
+
+ continue
+ }
+
+ // cached and unchanged: nothing to do
+ if c.isValueNotUpdated(v, ov) {
+ continue
+ }
+
+ log.Debug(fmt.Sprintf("value is updateOp of key:%s, old value
is:%s, new value is:%s", k, ov, v))
+
+ if i >= sdcommon.EventBlockSize {
+ eventsCh <- block
+ block = [sdcommon.EventBlockSize]MongoEvent{}
+ i = 0
+ }
+
+ block[i] = NewMongoEvent(k, newDocumentStore[k],
rmodel.EVT_UPDATE, v)
+ i++
+ }
+
+ // send the last, partially filled block
+ if i > 0 {
+ eventsCh <- block
+ }
+
+ // eventsCh may only be closed after the delete filter has sent all of
+ // its blocks
+ <-filterStopCh
+
+ close(eventsCh)
+}
+
+// isValueNotUpdated reports whether two values of the cacher's resource type
+// should be treated as the same revision, judged by their ModTimestamp.
+//
+// It returns true when the timestamps are equal, when the timestamp of
+// newValue is empty, when either value lacks its info section, or when a
+// value is nil or not of the type expected for c.Options.Key. The type is
+// checked with the comma-ok form because this runs in a goroutine started by
+// filter that has no recover, where a failed assertion would crash the
+// process.
+//
+// NOTE(review): filterCreateOrUpdate passes the freshly listed value as value
+// and the cached one as newValue, so the empty-timestamp check applies to the
+// cached value - confirm that this is the intended order.
+func (c *MongoCacher) isValueNotUpdated(value interface{}, newValue interface{}) bool {
+	var modTime string
+	var newModTime string
+
+	switch c.Options.Key {
+	case instance:
+		inst, ok := value.(Instance)
+		newInst, newOK := newValue.(Instance)
+		if !ok || !newOK || inst.InstanceInfo == nil || newInst.InstanceInfo == nil {
+			return true
+		}
+		modTime = inst.InstanceInfo.ModTimestamp
+		newModTime = newInst.InstanceInfo.ModTimestamp
+	case service:
+		svc, ok := value.(Service)
+		newSvc, newOK := newValue.(Service)
+		if !ok || !newOK || svc.ServiceInfo == nil || newSvc.ServiceInfo == nil {
+			return true
+		}
+		modTime = svc.ServiceInfo.ModTimestamp
+		newModTime = newSvc.ServiceInfo.ModTimestamp
+	}
+
+	return newModTime == "" || modTime == newModTime
+}
+
+// onEvents applies events to the cache and then notifies the subscribers.
+// buildCache may rewrite event types in place, so subscribers see the
+// adjusted events.
+func (c *MongoCacher) onEvents(events []MongoEvent) {
+ c.buildCache(events)
+
+ c.notify(events)
+}
+
+// buildCache applies events to the cache and corrects event types in place.
+//
+// Create and update events store the value and the document id. Their type
+// becomes EVT_INIT while the cacher is not ready; otherwise it is corrected
+// to create or update according to whether the key is already cached.
+// Delete events remove the key and the document id, and carry the cached
+// value; a delete for an uncached key is only logged.
+func (c *MongoCacher) buildCache(events []MongoEvent) {
+ for i, evt := range events {
+ key := evt.ResourceID
+ value := c.cache.Get(key)
+ // ok tells whether the key is currently cached
+ ok := value != nil
+
+ switch evt.Type {
+ case rmodel.EVT_CREATE, rmodel.EVT_UPDATE:
+ switch {
+ case !c.IsReady():
+ evt.Type = rmodel.EVT_INIT
+ case !ok && evt.Type != rmodel.EVT_CREATE:
+ log.Warn(fmt.Sprintf("unexpected %s event! it
should be %s key %s",
+ evt.Type, rmodel.EVT_CREATE, key))
+ evt.Type = rmodel.EVT_CREATE
+ case ok && evt.Type != rmodel.EVT_UPDATE:
+ log.Warn(fmt.Sprintf("unexpected %s event! it
should be %s key %s",
+ evt.Type, rmodel.EVT_UPDATE, key))
+ evt.Type = rmodel.EVT_UPDATE
+ }
+
+ c.cache.Put(key, evt.Value)
+ c.cache.PutDocumentID(key, evt.DocumentID)
+
+ // evt is a copy; write the adjusted event back
+ events[i] = evt
+ case rmodel.EVT_DELETE:
+ if !ok {
+ log.Warn(fmt.Sprintf("unexpected %s event! key
%s does not cache",
+ evt.Type, key))
+ } else {
+ evt.Value = value
+
+ c.cache.Remove(key)
+ c.cache.RemoveDocumentID(evt.DocumentID)
+ }
+ events[i] = evt
+ }
+ }
+}
+
+// notify delivers evts one by one to the event proxy registered for the
+// cacher's key. A panic raised by a handler is recovered, which also drops
+// the remaining events of the batch.
+func (c *MongoCacher) notify(evts []MongoEvent) {
+ eventProxy := EventProxy(c.Options.Key)
+
+ if eventProxy == nil {
+ return
+ }
+
+ defer log.Recover()
+
+ for _, evt := range evts {
+ eventProxy.OnEvent(evt)
+ }
+}
+
+// NewMongoCacher returns a cacher for cache that lists and watches the mongo
+// table named by options.Key. The cacher starts in first-time state with an
+// open ready channel and its own goroutine pool; call Run to start it.
+func NewMongoCacher(options *Options, cache *MongoCache) *MongoCacher {
+ return &MongoCacher{
+ Options: options,
+ isFirstTime: true,
+ cache: cache,
+ ready: make(chan struct{}),
+ lw: &mongoListWatch{
+ Key: options.Key,
+ },
+ goroutine: gopool.New(context.Background()),
+ }
+}
diff --git a/datasource/mongo/sd/mongo_cacher_test.go
b/datasource/mongo/sd/mongo_cacher_test.go
new file mode 100644
index 0000000..86f571f
--- /dev/null
+++ b/datasource/mongo/sd/mongo_cacher_test.go
@@ -0,0 +1,315 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package sd
+
+import (
+ "context"
+ "fmt"
+ "github.com/apache/servicecomb-service-center/datasource/sdcommon"
+ "testing"
+
+ "github.com/apache/servicecomb-service-center/pkg/gopool"
+ pb "github.com/go-chassis/cari/discovery"
+ "github.com/stretchr/testify/assert"
+ "go.mongodb.org/mongo-driver/bson"
+)
+
+// MockListWatch is a test double for the list/watch source of a MongoCacher.
+// Its List and DoWatch methods replay the canned responses stored below.
+type MockListWatch struct {
+	// ListResponse is returned by List; when nil, List reports an error.
+	ListResponse *sdcommon.ListWatchResp
+	// WatchResponse is handed to the DoWatch callback; when nil, DoWatch reports an error.
+	WatchResponse *sdcommon.ListWatchResp
+	// resumeToken is assigned by the tests; none of the mock's methods read it.
+	resumeToken bson.Raw
+}
+
+// List returns the canned ListResponse, or an error when none has been set.
+// The list configuration argument is ignored.
+func (lw *MockListWatch) List(sdcommon.ListWatchConfig) (*sdcommon.ListWatchResp, error) {
+	if resp := lw.ListResponse; resp != nil {
+		return resp, nil
+	}
+	return nil, fmt.Errorf("list error")
+}
+
+// DoWatch passes the canned WatchResponse to f exactly once and then blocks
+// until ctx is done. It returns an error immediately when no WatchResponse is set.
+func (lw *MockListWatch) DoWatch(ctx context.Context, f func(*sdcommon.ListWatchResp)) error {
+	resp := lw.WatchResponse
+	if resp == nil {
+		return fmt.Errorf("error")
+	}
+
+	f(resp)
+	// emulate a long-running watch: only cancellation ends it
+	<-ctx.Done()
+	return nil
+}
+
+// EventBus creates a new sdcommon.EventBus that watches through this mock
+// using the given configuration.
+func (lw *MockListWatch) EventBus(op sdcommon.ListWatchConfig) *sdcommon.EventBus {
+	bus := sdcommon.NewEventBus(lw, op)
+	return bus
+}
+
+// TestNewMongoCacher drives MongoCacher.refresh against a MockListWatch and
+// checks the cacher's ready state, the event published through the instance
+// event proxy and the cache content.
+//
+// The subtests share cr, lw, evt and evtExpect and mutate them, so they depend
+// on running in the order in which they are declared.
+func TestNewMongoCacher(t *testing.T) {
+	mockMongoCache := NewMongoCache("test", DefaultOptions())
+	lw := &MockListWatch{}
+
+	cr := &MongoCacher{
+		Options:     DefaultOptions(),
+		ready:       make(chan struct{}),
+		isFirstTime: true,
+		lw:          lw,
+		goroutine:   gopool.New(context.Background()),
+		cache:       mockMongoCache,
+	}
+
+	// The context is cancelled up front: MockListWatch.DoWatch blocks on
+	// ctx.Done(), so an already-cancelled context lets it return at once.
+	ctx, cancel := context.WithCancel(context.Background())
+	cancel()
+
+	// case: cause list internal error before initialized
+	t.Run("case list: internal error before initialized", func(t *testing.T) {
+		cr.refresh(ctx)
+		if cr.IsReady() {
+			t.Fatalf("TestNewMongoCacher failed")
+		}
+	})
+
+	// prepare mock data
+	var evt MongoEvent
+	cr = &MongoCacher{
+		Options:   DefaultOptions().SetTable(instance),
+		ready:     make(chan struct{}),
+		lw:        lw,
+		goroutine: gopool.New(context.Background()),
+		cache:     mockMongoCache,
+	}
+
+	// capture the last event published for the instance table
+	EventProxy(instance).AddHandleFunc(func(e MongoEvent) {
+		evt = e
+	})
+
+	mockDocumentID := "5fcf2f1a4ea1e6d2f4c61d47"
+	mockResourceID := "95cd8dbf3e8411eb92d8fa163e8a81c9"
+	mockResumeToken, err := bson.Marshal(bson.M{"_data": "825FDB4272000000012B022C0100296E5A10043E2D15AC82D9484C8090E68AF36FED2A46645F696400645FD76265066A6D2DF2AAC8D80004"})
+	assert.NoError(t, err)
+
+	var resources []*sdcommon.Resource
+	resource := &sdcommon.Resource{
+		Key:        mockResourceID,
+		DocumentID: mockDocumentID,
+		Value: Instance{
+			Domain:       "default",
+			Project:      "default",
+			InstanceInfo: &pb.MicroServiceInstance{InstanceId: mockResourceID, ModTimestamp: "100000"},
+		},
+	}
+	resources = append(resources, resource)
+	test := &sdcommon.ListWatchResp{
+		Action:    sdcommon.ActionCreate,
+		Resources: resources,
+	}
+
+	evtExpect := MongoEvent{
+		DocumentID: mockDocumentID,
+		ResourceID: mockResourceID,
+		Value:      resource.Value,
+		Type:       pb.EVT_INIT,
+	}
+
+	// case: list 1 resp and watch 0 event
+	t.Run("case list: first time list init cache", func(t *testing.T) {
+		// case: resume token is nil, first time list event is init
+		cr.isFirstTime = true
+		cr.cache.Remove(mockResourceID)
+		cr.cache.RemoveDocumentID(mockDocumentID)
+
+		lw.ListResponse = test
+		lw.resumeToken = nil
+		lw.WatchResponse = nil
+
+		cr.refresh(ctx)
+
+		// check ready
+		assert.Equal(t, true, cr.IsReady())
+
+		// check config
+		assert.Equal(t, instance, cr.Options.Key)
+
+		// check event
+		assert.Equal(t, evtExpect, evt)
+
+		// check cache
+		cache := cr.cache.Get(mockResourceID)
+		assert.Equal(t, resource.Value, cache)
+	})
+
+	t.Run("case list: re-list and updateOp cache", func(t *testing.T) {
+		// case: re-list and should be no event
+		lw.WatchResponse = nil
+		evt.Value = nil
+		cr.refresh(ctx)
+
+		// check events
+		assert.Nil(t, evt.Value)
+
+		// check cache
+		cache := cr.cache.Get(mockResourceID)
+		assert.Equal(t, resource.Value, cache)
+
+		// prepare updateOp data
+		dataUpdate := &sdcommon.Resource{
+			Key:        mockResourceID,
+			DocumentID: mockDocumentID,
+			Value: Instance{
+				Domain:       "default",
+				Project:      "default",
+				InstanceInfo: &pb.MicroServiceInstance{InstanceId: mockResourceID, HostName: "test", ModTimestamp: "100001"},
+			},
+		}
+
+		var mongoUpdateResources []*sdcommon.Resource
+		mongoUpdateResources = append(mongoUpdateResources, dataUpdate)
+		testUpdate := &sdcommon.ListWatchResp{
+			Action:    sdcommon.ActionUpdate,
+			Resources: mongoUpdateResources,
+		}
+
+		lw.ListResponse = testUpdate
+		lw.resumeToken = mockResumeToken
+
+		// case: re-list and over no event times, and then event should be updateOp
+		lw.WatchResponse = nil
+
+		evt.Value = nil
+		for i := 0; i < sdcommon.DefaultForceListInterval; i++ {
+			cr.refresh(ctx)
+		}
+		// check event
+		evtExpect.Type = pb.EVT_UPDATE
+		evtExpect.Value = dataUpdate.Value
+		assert.Equal(t, evtExpect, evt)
+
+		// check cache
+		cache = cr.cache.Get(mockResourceID)
+		assert.Equal(t, dataUpdate.Value, cache)
+	})
+
+	t.Run("case list: no infos list and delete cache", func(t *testing.T) {
+		// case: no infos list, event should be delete
+		lw.ListResponse = &sdcommon.ListWatchResp{}
+		lw.WatchResponse = nil
+		evt.Value = nil
+		for i := 0; i < sdcommon.DefaultForceListInterval; i++ {
+			cr.refresh(ctx)
+		}
+
+		// check event
+		evtExpect.Type = pb.EVT_DELETE
+		assert.Equal(t, evtExpect, evt)
+
+		// check cache: the resource must be gone after the delete event
+		cache := cr.cache.Get(mockResourceID)
+		assert.Nil(t, cache)
+	})
+
+	t.Run("case list: mark cache dirty and reset cache", func(t *testing.T) {
+		lw.ListResponse = test
+		cr.isFirstTime = true
+		evt.Value = nil
+
+		cr.cache.MarkDirty()
+		cr.refresh(ctx)
+
+		// check event
+		if evt.Value != nil {
+			t.Fatalf("TestNewMongoCacher failed, %v", evt)
+		}
+
+		// check cache
+		cache := cr.cache.Get(mockResourceID)
+		assert.Equal(t, resource.Value, cache)
+	})
+
+	t.Run("case watch: caught create event", func(t *testing.T) {
+		cr.cache.Remove(mockResourceID)
+		cr.cache.RemoveDocumentID(mockDocumentID)
+		lw.WatchResponse = test
+		lw.ListResponse = nil
+		lw.resumeToken = mockResumeToken
+
+		cr.refresh(ctx)
+
+		// check event
+		evtExpect.Type = pb.EVT_CREATE
+		evtExpect.Value = resource.Value
+		assert.Equal(t, evtExpect, evt)
+
+		// check cache
+		cache := cr.cache.Get(mockResourceID)
+		assert.Equal(t, resource.Value, cache)
+	})
+
+	t.Run("case watch: caught updateOp event", func(t *testing.T) {
+		// prepare updateOp data
+		dataUpdate := &sdcommon.Resource{
+			Key:        mockResourceID,
+			DocumentID: mockDocumentID,
+			Value: Instance{
+				Domain:       "default",
+				Project:      "default",
+				InstanceInfo: &pb.MicroServiceInstance{InstanceId: mockResourceID, HostName: "test", ModTimestamp: "100001"},
+			},
+		}
+
+		var mongoUpdateResources []*sdcommon.Resource
+		mongoUpdateResources = append(mongoUpdateResources, dataUpdate)
+		testUpdate := &sdcommon.ListWatchResp{
+			Action:    sdcommon.ActionUpdate,
+			Resources: mongoUpdateResources,
+		}
+		lw.WatchResponse = testUpdate
+		lw.ListResponse = nil
+
+		cr.refresh(ctx)
+
+		// check event
+		evtExpect.Type = pb.EVT_UPDATE
+		evtExpect.Value = dataUpdate.Value
+		assert.Equal(t, evtExpect, evt)
+
+		// check cache
+		cache := cr.cache.Get(mockResourceID)
+		assert.Equal(t, dataUpdate.Value, cache)
+	})
+
+	t.Run("case watch: caught delete event but value is nil", func(t *testing.T) {
+		test.Action = sdcommon.ActionDelete
+		lw.WatchResponse = test
+		lw.ListResponse = nil
+
+		cr.refresh(ctx)
+
+		// check event
+		evtExpect.Type = pb.EVT_DELETE
+		assert.Equal(t, evtExpect, evt)
+
+		// check cache
+		cache := cr.cache.Get(mockResourceID)
+		assert.Nil(t, cache)
+	})
+}
+
+// TestMongoCacher_Run starts a cacher backed by an empty MockListWatch and
+// stops it again, checking that the cache is still attached after Run.
+func TestMongoCacher_Run(t *testing.T) {
+	lw := &MockListWatch{}
+	cr := &MongoCacher{
+		Options:   DefaultOptions().SetTable(instance),
+		ready:     make(chan struct{}),
+		lw:        lw,
+		goroutine: gopool.New(context.Background()),
+		cache:     NewMongoCache("test", DefaultOptions()),
+	}
+
+	cr.Run()
+
+	// check cache
+	assert.NotNil(t, cr.cache)
+
+	cr.Stop()
+}
diff --git a/datasource/etcd/sd/etcd/watcher.go b/datasource/mongo/sd/options.go
similarity index 53%
rename from datasource/etcd/sd/etcd/watcher.go
rename to datasource/mongo/sd/options.go
index 37af706..f061003 100644
--- a/datasource/etcd/sd/etcd/watcher.go
+++ b/datasource/mongo/sd/options.go
@@ -15,13 +15,42 @@
* limitations under the License.
*/
-package etcd
+package sd
import (
- "github.com/apache/servicecomb-service-center/datasource/etcd/client"
+ "fmt"
+ "time"
)
-type Watcher interface {
- EventBus() <-chan *client.PluginResponse
- Stop()
+// Default values for Options.
+const (
+	// FirstTimeout is not referenced in this file.
+	// NOTE(review): presumably the timeout of the first list/watch round — confirm against the cacher.
+	FirstTimeout = 2 * time.Second
+	// DefaultTimeout is the Timeout that DefaultOptions sets.
+	DefaultTimeout = 30 * time.Second
+	// DefaultCacheInitSize is the InitSize that DefaultOptions sets.
+	DefaultCacheInitSize = 100
+)
+
+// Options configures a cache/cacher pair for one resource table.
+type Options struct {
+	// Key is the table name that uniquely identifies the resource type.
+	Key string
+	// InitSize defaults to DefaultCacheInitSize.
+	// NOTE(review): presumably the initial capacity of the cache — confirm against NewMongoCache.
+	InitSize int
+	// Timeout defaults to DefaultTimeout.
+	Timeout time.Duration
+	// Period defaults to one second.
+	// NOTE(review): the consumer of Period is not visible here.
+	Period time.Duration
+}
+
+// String renders the key, timeout and period of the options for logging.
+// InitSize is not part of the output.
+func (options *Options) String() string {
+	key, timeout, period := options.Key, options.Timeout, options.Period
+	return fmt.Sprintf("{key: %s, timeout: %s, period: %s}", key, timeout, period)
+}
+
+// SetTable sets the table name (Key) on the receiver and returns the same
+// receiver so that calls can be chained.
+func (options *Options) SetTable(key string) *Options {
+	// the receiver is modified in place; no copy is made
+	options.Key = key
+	return options
+}
+
+// DefaultOptions returns a fresh Options value with an empty Key,
+// DefaultTimeout, a one second Period and DefaultCacheInitSize.
+func DefaultOptions() *Options {
+	options := new(Options) // Key stays at its zero value ""
+	options.Timeout = DefaultTimeout
+	options.Period = time.Second
+	options.InitSize = DefaultCacheInitSize
+	return options
 }
diff --git a/datasource/mongo/sd/options_test.go
b/datasource/mongo/sd/options_test.go
new file mode 100644
index 0000000..30c78c2
--- /dev/null
+++ b/datasource/mongo/sd/options_test.go
@@ -0,0 +1,42 @@
+package sd
+
+import (
+ "testing"
+
+ "github.com/apache/servicecomb-service-center/pkg/log"
+ "github.com/go-chassis/cari/discovery"
+ "github.com/stretchr/testify/assert"
+)
+
+// TestOptions checks the zero value of Options, the effect of SetTable and the
+// exact text produced by String.
+func TestOptions(t *testing.T) {
+	options := Options{
+		Key: "",
+	}
+	assert.Empty(t, options, "config is empty")
+
+	options1 := options.SetTable("configKey")
+	assert.Equal(t, "configKey", options1.Key,
+		"contain key after method SetTable")
+
+	assert.Equal(t, 0, options1.InitSize,
+		"init size is zero")
+
+	mongoEventFunc = mongoEventFuncGet()
+
+	// Timeout and Period were never set, so both render as the zero duration "0s".
+	// A NotNil check on a string value could never fail, hence the exact comparison.
+	out := options1.String()
+	assert.Equal(t, "{key: configKey, timeout: 0s, period: 0s}", out,
+		"method String renders key, timeout and period")
+}
+
+// mongoEventFunc holds the handler built by mongoEventFuncGet; TestOptions assigns it.
+var mongoEventFunc MongoEventFunc
+
+// mongoEventFuncGet returns a sample MongoEventFunc. The handler overwrites the
+// fields of its argument and logs a message; since MongoEvent is passed by
+// value, the caller's event is not affected.
+func mongoEventFuncGet() MongoEventFunc {
+	return func(evt MongoEvent) {
+		evt.DocumentID = "DocumentID has changed"
+		evt.ResourceID = "BusinessID has changed"
+		evt.Value = 2
+		evt.Type = discovery.EVT_UPDATE
+		log.Info("in event func")
+	}
+}
diff --git a/datasource/mongo/sd/types.go b/datasource/mongo/sd/types.go
new file mode 100644
index 0000000..151bf17
--- /dev/null
+++ b/datasource/mongo/sd/types.go
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package sd
+
+import (
+ "github.com/apache/servicecomb-service-center/datasource/sdcommon"
+ "time"
+
+ "github.com/go-chassis/cari/discovery"
+ pb "github.com/go-chassis/cari/discovery"
+ "go.mongodb.org/mongo-driver/bson"
+ "go.mongodb.org/mongo-driver/bson/primitive"
+)
+
+// Names of the resource tables handled by this package.
+const (
+	service  = "service"
+	instance = "instance"
+)
+
+var (
+	// Types lists the resource type names added through RegisterType.
+	Types []string
+)
+
+// RegisterType appends name to the package-level Types list.
+// It neither de-duplicates names nor takes any lock.
+func RegisterType(name string) {
+	registered := append(Types, name)
+	Types = registered
+}
+
+// MongoEvent describes one change of a cached resource.
+type MongoEvent struct {
+	// DocumentID is the ID of the mongo document the resource is stored in.
+	DocumentID string
+	// ResourceID is the business ID of the resource; it is used as the cache key.
+	ResourceID string
+	// Value is the resource itself.
+	Value interface{}
+	// Type is the kind of change.
+	Type discovery.EventType
+}
+
+// MongoEventFunc is a callback that receives a MongoEvent by value.
+type MongoEventFunc func(evt MongoEvent)
+
+// MongoEventHandler handles the events of one resource type.
+type MongoEventHandler interface {
+	// Type returns a string identifying what the handler is registered for.
+	// NOTE(review): presumably the resource type / table name — confirm against the implementations.
+	Type() string
+	// OnEvent is called with the event to handle.
+	OnEvent(evt MongoEvent)
+}
+
+// NewMongoEventByResource builds a MongoEvent of type action from resource:
+// Key becomes the ResourceID, DocumentID and Value are copied as they are.
+// resource must not be nil.
+func NewMongoEventByResource(resource *sdcommon.Resource, action discovery.EventType) MongoEvent {
+	return NewMongoEvent(resource.Key, resource.DocumentID, action, resource.Value)
+}
+
+// NewMongoEvent builds a MongoEvent of type action for the resource id stored
+// in the document documentID, carrying v as its value.
+func NewMongoEvent(id string, documentID string, action discovery.EventType, v interface{}) MongoEvent {
+	return MongoEvent{
+		DocumentID: documentID,
+		ResourceID: id,
+		Value:      v,
+		Type:       action,
+	}
+}
+
+// MongoWatchResponse holds the parts of a watch notification used by this package.
+// NOTE(review): presumably decoded from a mongo change stream event — confirm against the list/watch code.
+type MongoWatchResponse struct {
+	OperationType string
+	// FullDocument is the raw BSON of the document.
+	FullDocument bson.Raw
+	// DocumentKey identifies the document.
+	DocumentKey MongoDocument
+}
+
+// MongoDocument carries the "_id" of a mongo document.
+type MongoDocument struct {
+	ID primitive.ObjectID `bson:"_id"`
+}
+
+// ResumeToken carries the "_data" field of a resume token.
+type ResumeToken struct {
+	Data []byte `bson:"_data"`
+}
+
+// MongoInfo couples a resource value with its document ID and resource ID.
+type MongoInfo struct {
+	DocumentID string
+	ResourceID string
+	Value interface{}
+}
+
+// Service is the cached representation of a micro service together with the
+// domain and project it belongs to.
+type Service struct {
+	Domain string
+	Project string
+	Tags map[string]string
+	// ServiceInfo is the micro service definition.
+	ServiceInfo *pb.MicroService
+}
+
+// Instance is the cached representation of a micro service instance together
+// with the domain and project it belongs to.
+type Instance struct {
+	Domain string
+	Project string
+	RefreshTime time.Time
+	// InstanceInfo is the instance definition.
+	InstanceInfo *pb.MicroServiceInstance
+}
diff --git a/datasource/mongo/sd/typestore.go b/datasource/mongo/sd/typestore.go
new file mode 100644
index 0000000..806e6b2
--- /dev/null
+++ b/datasource/mongo/sd/typestore.go
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// Package sd provides a TypeStore that manages the cachers of the resource
+// types registered in the sd package, see types.go
+package sd
+
+import (
+ "context"
+ "fmt"
+ "time"
+
+ "github.com/apache/servicecomb-service-center/pkg/gopool"
+ "github.com/apache/servicecomb-service-center/pkg/log"
+ "github.com/apache/servicecomb-service-center/pkg/util"
+ "github.com/apache/servicecomb-service-center/server/config"
+)
+
+// store is the package-wide TypeStore returned by Store.
+var store = &TypeStore{}
+
+// init prepares the package-wide store and registers the built-in resource
+// types. It does not start the store; Run has to be called for that.
+func init() {
+	store.Initialize()
+	registerInnerTypes()
+}
+
+// TypeStore owns one MongoCacher per registered resource type.
+type TypeStore struct {
+	// caches maps a type name to its *MongoCacher.
+	caches util.ConcurrentMap
+	// ready is closed once all cachers reported ready, or when the store is stopped.
+	ready chan struct{}
+	// goroutine runs the background tasks started by Run.
+	goroutine *gopool.Pool
+	// isClose is set by Stop; it is read and written without synchronization.
+	isClose bool
+}
+
+// Initialize creates the ready channel and the goroutine pool of the store.
+// It has to be called before Run, Ready or Stop are used.
+func (s *TypeStore) Initialize() {
+	s.goroutine = gopool.New(context.Background())
+	s.ready = make(chan struct{})
+}
+
+// registerInnerTypes registers the built-in resource types, service first and
+// instance second.
+func registerInnerTypes() {
+	for _, name := range [...]string{service, instance} {
+		RegisterType(name)
+	}
+}
+
+// Run starts the background tasks of the store on its goroutine pool: the task
+// that creates the cachers and waits for them, and the periodic task that marks
+// the caches dirty.
+func (s *TypeStore) Run() {
+	s.goroutine.Do(s.store)          // create all cachers and signal readiness
+	s.goroutine.Do(s.autoClearCache) // no-op when the cache TTL is zero
+}
+
+// store creates (or fetches) the cacher of every registered type and waits,
+// one type after the other, until each of them is ready. Once all are ready the
+// store's ready channel is closed. It returns early, without closing the
+// channel, when ctx is done.
+func (s *TypeStore) store(ctx context.Context) {
+	// new all types
+	for _, name := range Types {
+		cacherReady := s.getOrCreateCache(name).Ready()
+		select {
+		case <-ctx.Done():
+			return
+		case <-cacherReady:
+		}
+	}
+	util.SafeCloseChan(s.ready)
+	log.Debug("all caches are ready")
+}
+
+// autoClearCache marks the cache of every registered type dirty each time the
+// configured registry cache TTL elapses, until ctx is done. It returns
+// immediately when the TTL is zero.
+func (s *TypeStore) autoClearCache(ctx context.Context) {
+	ttl := config.GetRegistry().CacheTTL
+	if ttl == 0 {
+		return
+	}
+
+	log.Info(fmt.Sprintf("start auto clear cache in %v", ttl))
+	for {
+		// wait for one TTL, or stop when the store is shut down
+		select {
+		case <-ctx.Done():
+			return
+		case <-time.After(ttl):
+		}
+
+		for _, name := range Types {
+			s.getOrCreateCache(name).Cache().MarkDirty()
+		}
+		log.Warn("caches are marked dirty!")
+	}
+}
+
+// getOrCreateCache returns the cacher stored for type t. When there is none
+// yet, it creates a cache and a cacher with default options for table t, starts
+// the cacher and stores it.
+//
+// NOTE(review): the lookup and the Put below are two separate steps, so two
+// concurrent callers for the same new type could each create and run a cacher —
+// confirm whether callers can race here.
+func (s *TypeStore) getOrCreateCache(t string) *MongoCacher {
+	if existing, found := s.caches.Get(t); found {
+		return existing.(*MongoCacher)
+	}
+
+	options := DefaultOptions().SetTable(t)
+	cacher := NewMongoCacher(options, NewMongoCache(t, options))
+	cacher.Run()
+
+	s.caches.Put(t, cacher)
+	return cacher
+}
+
+// Stop shuts the store down once: it closes the goroutine pool and closes the
+// ready channel, which also releases anyone still waiting on Ready. Further
+// calls return immediately.
+func (s *TypeStore) Stop() {
+	if s.isClose {
+		return
+	}
+	s.isClose = true
+
+	s.goroutine.Close(true)
+	// SafeCloseChan is used because store may have closed the channel already
+	util.SafeCloseChan(s.ready)
+	log.Debug("store daemon stopped")
+}
+
+func (s *TypeStore) Ready() <-chan struct{} {
+ return s.ready
+}
+
+// TypeCacher returns the cacher of the type named id, creating and starting it
+// when it does not exist yet.
+func (s *TypeStore) TypeCacher(id string) *MongoCacher {
+	return s.getOrCreateCache(id)
+}
+
+// Service returns the cacher of the service table.
+func (s *TypeStore) Service() *MongoCacher {
+	return s.TypeCacher(service)
+}
+
+// Instance returns the cacher of the instance table.
+func (s *TypeStore) Instance() *MongoCacher {
+	return s.TypeCacher(instance)
+}
+
+// Store returns the package-wide TypeStore that is set up in init.
+func Store() *TypeStore {
+	s := store
+	return s
+}
diff --git a/datasource/mongo/sd/typestore_test.go
b/datasource/mongo/sd/typestore_test.go
new file mode 100644
index 0000000..11ed78f
--- /dev/null
+++ b/datasource/mongo/sd/typestore_test.go
@@ -0,0 +1,42 @@
+package sd
+
+import (
+ "testing"
+
+ "github.com/stretchr/testify/assert"
+)
+
+// TestTypeStore_Initialize checks that Initialize sets up the ready channel and
+// the goroutine pool and leaves the store open.
+func TestTypeStore_Initialize(t *testing.T) {
+	s := TypeStore{}
+	s.Initialize()
+
+	assert.NotNil(t, s.ready)
+	assert.NotNil(t, s.goroutine)
+	assert.Equal(t, false, s.isClose)
+}
+
+// TestTypeStore_Ready checks that an initialized store exposes a ready channel.
+func TestTypeStore_Ready(t *testing.T) {
+	s := TypeStore{}
+	s.Initialize()
+
+	assert.NotNil(t, s.Ready())
+}
+
+// TestTypeStore_Stop checks that the store is marked closed after Stop, both
+// when it was already closed and when it was still open.
+func TestTypeStore_Stop(t *testing.T) {
+	cases := []struct {
+		name   string
+		closed bool
+	}{
+		{name: "when closed", closed: true},
+		{name: "when not closed", closed: false},
+	}
+
+	for _, tc := range cases {
+		tc := tc
+		t.Run(tc.name, func(t *testing.T) {
+			s := TypeStore{
+				isClose: tc.closed,
+			}
+			s.Initialize()
+			s.Stop()
+			assert.Equal(t, true, s.isClose)
+		})
+	}
+}
diff --git a/datasource/etcd/sd/etcd/common.go b/datasource/sdcommon/common.go
similarity index 53%
copy from datasource/etcd/sd/etcd/common.go
copy to datasource/sdcommon/common.go
index 1d9d64e..0d58674 100644
--- a/datasource/etcd/sd/etcd/common.go
+++ b/datasource/sdcommon/common.go
@@ -13,42 +13,16 @@
// See the License for the specific language governing permissions and
// limitations under the License.
-package etcd
+package sdcommon
-import (
- "time"
-
- "github.com/apache/servicecomb-service-center/datasource/etcd"
- "github.com/apache/servicecomb-service-center/datasource/etcd/sd"
- "github.com/apache/servicecomb-service-center/datasource/etcd/value"
- "github.com/coreos/etcd/mvcc/mvccpb"
-)
+import "time"
const (
// force re-list
DefaultForceListInterval = 4
DefaultMetricsInterval = 30 * time.Second
- minWaitInterval = 1 * time.Second
- eventBlockSize = 1000
- eventBusSize = 1000
+ MinWaitInterval = 1 * time.Second
+ EventBlockSize = 1000
+ EventBusSize = 1000
)
-
-var closedCh = make(chan struct{})
-
-func init() {
- close(closedCh)
-}
-
-func FromEtcdKeyValue(dist *sd.KeyValue, src *mvccpb.KeyValue, parser
value.Parser) (err error) {
- dist.ClusterName = etcd.Configuration().ClusterName
- dist.Key = src.Key
- dist.Version = src.Version
- dist.CreateRevision = src.CreateRevision
- dist.ModRevision = src.ModRevision
- if parser == nil {
- return
- }
- dist.Value, err = parser.Unmarshal(src.Value)
- return
-}
diff --git a/datasource/etcd/sd/etcd/watcher_inner.go
b/datasource/sdcommon/eventbus.go
similarity index 70%
rename from datasource/etcd/sd/etcd/watcher_inner.go
rename to datasource/sdcommon/eventbus.go
index 2c964aa..b0845e7 100644
--- a/datasource/etcd/sd/etcd/watcher_inner.go
+++ b/datasource/sdcommon/eventbus.go
@@ -15,36 +15,35 @@
* limitations under the License.
*/
-package etcd
+package sdcommon
import (
"context"
"sync"
- "github.com/apache/servicecomb-service-center/datasource/etcd/client"
"github.com/apache/servicecomb-service-center/pkg/gopool"
"github.com/apache/servicecomb-service-center/pkg/log"
)
-type innerWatcher struct {
+type EventBus struct {
Cfg ListWatchConfig
- lw ListWatch
- bus chan *client.PluginResponse
+ Lw ListWatch
+ Bus chan *ListWatchResp
stopCh chan struct{}
stop bool
mux sync.Mutex
}
-func (w *innerWatcher) EventBus() <-chan *client.PluginResponse {
- return w.bus
+func (w *EventBus) ResourceEventBus() <-chan *ListWatchResp {
+ return w.Bus
}
-func (w *innerWatcher) process(_ context.Context) {
+func (w *EventBus) process(_ context.Context) {
stopCh := make(chan struct{})
ctx, cancel := context.WithTimeout(w.Cfg.Context, w.Cfg.Timeout)
gopool.Go(func(_ context.Context) {
defer close(stopCh)
- _ = w.lw.DoWatch(ctx, w.sendEvent)
+ _ = w.Lw.DoWatch(ctx, w.sendEvent)
})
select {
@@ -58,12 +57,12 @@ func (w *innerWatcher) process(_ context.Context) {
}
-func (w *innerWatcher) sendEvent(resp *client.PluginResponse) {
+func (w *EventBus) sendEvent(resp *ListWatchResp) {
defer log.Recover()
- w.bus <- resp
+ w.Bus <- resp
}
-func (w *innerWatcher) Stop() {
+func (w *EventBus) Stop() {
w.mux.Lock()
if w.stop {
w.mux.Unlock()
@@ -71,15 +70,15 @@ func (w *innerWatcher) Stop() {
}
w.stop = true
close(w.stopCh)
- close(w.bus)
+ close(w.Bus)
w.mux.Unlock()
}
-func newInnerWatcher(lw ListWatch, cfg ListWatchConfig) *innerWatcher {
- w := &innerWatcher{
+func NewEventBus(lw ListWatch, cfg ListWatchConfig) *EventBus {
+ w := &EventBus{
Cfg: cfg,
- lw: lw,
- bus: make(chan *client.PluginResponse, eventBusSize),
+ Lw: lw,
+ Bus: make(chan *ListWatchResp, EventBusSize),
stopCh: make(chan struct{}),
}
gopool.Go(w.process)
diff --git a/datasource/etcd/sd/etcd/listwatch.go
b/datasource/sdcommon/listwatch.go
similarity index 80%
rename from datasource/etcd/sd/etcd/listwatch.go
rename to datasource/sdcommon/listwatch.go
index 8af8e3b..5d39cfd 100644
--- a/datasource/etcd/sd/etcd/listwatch.go
+++ b/datasource/sdcommon/listwatch.go
@@ -15,14 +15,12 @@
* limitations under the License.
*/
-package etcd
+package sdcommon
import (
"context"
"fmt"
"time"
-
- "github.com/apache/servicecomb-service-center/datasource/etcd/client"
)
type ListWatchConfig struct {
@@ -35,10 +33,9 @@ func (lo *ListWatchConfig) String() string {
}
type ListWatch interface {
- List(op ListWatchConfig) (*client.PluginResponse, error)
+ List(op ListWatchConfig) (*ListWatchResp, error)
// not support new multiple watchers
- Watch(op ListWatchConfig) Watcher
- //
- DoWatch(ctx context.Context, f func(*client.PluginResponse)) error
- Revision() int64
+ EventBus(op ListWatchConfig) *EventBus
+
+ DoWatch(ctx context.Context, f func(*ListWatchResp)) error
}
diff --git a/datasource/sdcommon/types.go b/datasource/sdcommon/types.go
new file mode 100644
index 0000000..93e641b
--- /dev/null
+++ b/datasource/sdcommon/types.go
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package sdcommon
+
+import (
+ "strconv"
+)
+
+// Actions reported in a ListWatchResp.
+const (
+	// ActionCreate means the resources were created.
+	ActionCreate ActionType = iota
+	// ActionUpdate means the resources were updated.
+	ActionUpdate
+	// ActionDelete means the resources were deleted.
+	ActionDelete
+	// ActionPUT may be create or update
+	ActionPUT
+)
+
+// ActionType enumerates the kinds of change a list/watch response can report.
+type ActionType int
+
+// String returns the upper-case name of the action ("CREATE", "UPDATE",
+// "DELETE" or "PUT"). Any other value is rendered as "ACTION" followed by its
+// decimal number.
+func (at ActionType) String() string {
+	names := [...]string{
+		ActionCreate: "CREATE",
+		ActionUpdate: "UPDATE",
+		ActionDelete: "DELETE",
+		ActionPUT:    "PUT",
+	}
+	if at >= 0 && int(at) < len(names) {
+		return names[at]
+	}
+	return "ACTION" + strconv.Itoa(int(at))
+}
+
+// ListWatchResp is the result of a list or watch operation, shared by the etcd
+// and mongo implementations.
+type ListWatchResp struct {
+	// Action is the kind of change the resources went through.
+	Action ActionType
+	// Revision is only for etcd
+	Revision int64
+	// Resources may be list of Instance or Service
+	Resources []*Resource
+}
+
+// Resource is one entry of a ListWatchResp.
+type Resource struct {
+	// Key is the key prefix in etcd and the resource ID in mongo.
+	Key string
+	// DocumentID is only for mongo
+	DocumentID string
+
+	// The revision and version fields are only for etcd.
+	CreateRevision int64
+	ModRevision int64
+	Version int64
+
+	// Value is the resource itself.
+	Value interface{}
+}