This is an automated email from the ASF dual-hosted git repository.

tianxiaoliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/servicecomb-service-center.git


The following commit(s) were added to refs/heads/master by this push:
     new c25f75f  Mongo初步缓存实现,事件通知实现,用于同步数据给syncer (#789)
c25f75f is described below

commit c25f75fe584498ffc285523ab8dc2214c0dffdb5
Author: fuziye01 <[email protected]>
AuthorDate: Tue Dec 29 08:54:20 2020 +0800

    Mongo初步缓存实现,事件通知实现,用于同步数据给syncer (#789)
    
    1.缓存用两个map分别存储了, 业务Id -> document   documentId -> 业务Id的缓存
    2.暂时只实现了service和Instance的缓存,用于同步数据给syncer
    3.通过list、watch流程初始化、更新缓存
    4.watch到的数据生成事件,并通知event_handler处理,用于同步数据给syncer
    5.重构etcd的list/watch流程,与mongodb实现统一的接口
    6.UT 修改
---
 datasource/etcd/sd/etcd/cacher_kv.go               | 135 +++---
 datasource/etcd/sd/etcd/cacher_kv_test.go          | 132 +++---
 datasource/etcd/sd/etcd/common.go                  |  25 +-
 datasource/etcd/sd/etcd/listwatch_inner.go         |  45 +-
 datasource/etcd/sd/etcd/listwatch_test.go          |  21 +-
 datasource/etcd/sd/etcd/watcher_test.go            |  87 ----
 datasource/mongo/mongo.go                          |  12 +
 datasource/mongo/sd/cache.go                       |  46 ++
 datasource/mongo/sd/common.go                      |  25 ++
 datasource/mongo/sd/event_proxy.go                 |  63 +++
 datasource/mongo/sd/event_proxy_test.go            |  81 ++++
 datasource/mongo/sd/listwatch_inner.go             | 170 ++++++++
 datasource/mongo/sd/listwatch_test.go              |  87 ++++
 datasource/mongo/sd/mongo_cache.go                 | 137 ++++++
 datasource/mongo/sd/mongo_cacher.go                | 461 +++++++++++++++++++++
 datasource/mongo/sd/mongo_cacher_test.go           | 315 ++++++++++++++
 .../sd/etcd/watcher.go => mongo/sd/options.go}     |  39 +-
 datasource/mongo/sd/options_test.go                |  42 ++
 datasource/mongo/sd/types.go                       | 102 +++++
 datasource/mongo/sd/typestore.go                   | 135 ++++++
 datasource/mongo/sd/typestore_test.go              |  42 ++
 datasource/{etcd/sd/etcd => sdcommon}/common.go    |  36 +-
 .../etcd/watcher_inner.go => sdcommon/eventbus.go} |  33 +-
 datasource/{etcd/sd/etcd => sdcommon}/listwatch.go |  13 +-
 datasource/sdcommon/types.go                       |  69 +++
 25 files changed, 2068 insertions(+), 285 deletions(-)

diff --git a/datasource/etcd/sd/etcd/cacher_kv.go 
b/datasource/etcd/sd/etcd/cacher_kv.go
index a83ad15..9a7fa51 100644
--- a/datasource/etcd/sd/etcd/cacher_kv.go
+++ b/datasource/etcd/sd/etcd/cacher_kv.go
@@ -21,17 +21,18 @@ import (
        "context"
        "errors"
        "fmt"
+       "reflect"
        "sync"
        "time"
 
        "github.com/apache/servicecomb-service-center/datasource/etcd/client"
        "github.com/apache/servicecomb-service-center/datasource/etcd/sd"
+       "github.com/apache/servicecomb-service-center/datasource/sdcommon"
        "github.com/apache/servicecomb-service-center/pkg/backoff"
        "github.com/apache/servicecomb-service-center/pkg/gopool"
        "github.com/apache/servicecomb-service-center/pkg/log"
        "github.com/apache/servicecomb-service-center/pkg/util"
        "github.com/apache/servicecomb-service-center/server/config"
-       "github.com/coreos/etcd/mvcc/mvccpb"
        rmodel "github.com/go-chassis/cari/discovery"
 )
 
@@ -47,7 +48,7 @@ type KvCacher struct {
        reListCount int
 
        ready     chan struct{}
-       lw        ListWatch
+       lw        sdcommon.ListWatch
        mux       sync.Mutex
        once      sync.Once
        cache     sd.Cache
@@ -59,42 +60,42 @@ func (c *KvCacher) Config() *sd.Config {
 }
 
 func (c *KvCacher) needList() bool {
-       rev := c.lw.Revision()
+       rev := c.getRevision()
        if rev == 0 {
                c.reListCount = 0
                return true
        }
        c.reListCount++
-       if c.reListCount < DefaultForceListInterval {
+       if c.reListCount < sdcommon.DefaultForceListInterval {
                return false
        }
        c.reListCount = 0
        return true
 }
 
-func (c *KvCacher) doList(cfg ListWatchConfig) error {
+func (c *KvCacher) doList(cfg sdcommon.ListWatchConfig) error {
        resp, err := c.lw.List(cfg)
        if err != nil {
                return err
        }
 
-       rev := c.lw.Revision()
-       kvs := resp.Kvs
+       rev := c.getRevision()
+       resources := resp.Resources
        start := time.Now()
        defer log.DebugOrWarnf(start, "finish to cache key %s, %d items, rev: 
%d",
-               c.Cfg.Key, len(kvs), rev)
+               c.Cfg.Key, len(resources), rev)
 
        // just reset the cacher if cache marked dirty
        if c.cache.Dirty() {
-               c.reset(rev, kvs)
+               c.reset(rev, resources)
                log.Warnf("Cache[%s] is reset!", c.cache.Name())
                return nil
        }
 
        // calc and return the diff between cache and ETCD
-       evts := c.filter(rev, kvs)
+       evts := c.filter(rev, resources)
        // there is no change between List() and cache, then stop the self 
preservation
-       if ec, kc := len(evts), len(kvs); c.Cfg.DeferHandler != nil && ec == 0 
&& kc != 0 &&
+       if ec, kc := len(evts), len(resources); c.Cfg.DeferHandler != nil && ec 
== 0 && kc != 0 &&
                c.Cfg.DeferHandler.Reset() {
                log.Warnf("most of the protected data(%d/%d) are recovered",
                        kc, c.cache.GetAll(nil))
@@ -105,7 +106,7 @@ func (c *KvCacher) doList(cfg ListWatchConfig) error {
        return nil
 }
 
-func (c *KvCacher) reset(rev int64, kvs []*mvccpb.KeyValue) {
+func (c *KvCacher) reset(rev int64, kvs []*sdcommon.Resource) {
        if c.Cfg.DeferHandler != nil {
                c.Cfg.DeferHandler.Reset()
        }
@@ -117,9 +118,9 @@ func (c *KvCacher) reset(rev int64, kvs []*mvccpb.KeyValue) 
{
        c.buildCache(c.filter(rev, kvs))
 }
 
-func (c *KvCacher) doWatch(cfg ListWatchConfig) error {
-       if watcher := c.lw.Watch(cfg); watcher != nil {
-               return c.handleWatcher(watcher)
+func (c *KvCacher) doWatch(cfg sdcommon.ListWatchConfig) error {
+       if eventBus := c.lw.EventBus(cfg); eventBus != nil {
+               return c.handleEventBus(eventBus)
        }
        return fmt.Errorf("handle a nil watcher")
 }
@@ -129,7 +130,7 @@ func (c *KvCacher) ListAndWatch(ctx context.Context) error {
        defer c.mux.Unlock()
        defer log.Recover() // ensure ListAndWatch never raise panic
 
-       cfg := ListWatchConfig{
+       cfg := sdcommon.ListWatchConfig{
                Timeout: c.Cfg.Timeout,
                Context: ctx,
        }
@@ -139,7 +140,7 @@ func (c *KvCacher) ListAndWatch(ctx context.Context) error {
        // 2. Runtime: error occurs in previous watch operation, the lister's 
revision is set to 0.
        // 3. Runtime: watch operation timed out over 
DEFAULT_FORCE_LIST_INTERVAL times.
        if c.needList() {
-               if err := c.doList(cfg); err != nil && (!c.IsReady() || 
c.lw.Revision() == 0) {
+               if err := c.doList(cfg); err != nil && (!c.IsReady() || 
c.getRevision() == 0) {
                        return err // do retry to list etcd
                }
                // keep going to next step:
@@ -153,37 +154,37 @@ func (c *KvCacher) ListAndWatch(ctx context.Context) 
error {
        return c.doWatch(cfg)
 }
 
-func (c *KvCacher) handleWatcher(watcher Watcher) error {
-       defer watcher.Stop()
-       for resp := range watcher.EventBus() {
+func (c *KvCacher) handleEventBus(eventBus *sdcommon.EventBus) error {
+       defer eventBus.Stop()
+       for resp := range eventBus.ResourceEventBus() {
                if resp == nil {
                        return errors.New("handle watcher error")
                }
 
                start := time.Now()
                rev := resp.Revision
-               evts := make([]sd.KvEvent, 0, len(resp.Kvs))
-               for _, kv := range resp.Kvs {
-                       evt := sd.NewKvEvent(rmodel.EVT_CREATE, nil, 
kv.ModRevision)
+               evts := make([]sd.KvEvent, 0, len(resp.Resources))
+               for _, resource := range resp.Resources {
+                       evt := sd.NewKvEvent(rmodel.EVT_CREATE, nil, 
resource.ModRevision)
                        switch {
-                       case resp.Action == client.ActionPut && kv.Version == 1:
-                               evt.Type, evt.KV = rmodel.EVT_CREATE, 
c.doParse(kv)
-                       case resp.Action == client.ActionPut:
-                               evt.Type, evt.KV = rmodel.EVT_UPDATE, 
c.doParse(kv)
-                       case resp.Action == client.ActionDelete:
+                       case resp.Action == sdcommon.ActionPUT && 
resource.Version == 1:
+                               evt.Type, evt.KV = rmodel.EVT_CREATE, 
c.doParse(resource)
+                       case resp.Action == sdcommon.ActionPUT:
+                               evt.Type, evt.KV = rmodel.EVT_UPDATE, 
c.doParse(resource)
+                       case resp.Action == sdcommon.ActionDelete:
                                evt.Type = rmodel.EVT_DELETE
-                               if kv.Value == nil {
+                               if resource.Value == nil {
                                        // it will happen in embed mode, and 
then need to get the cache value not unmarshal
-                                       evt.KV = 
c.cache.Get(util.BytesToStringWithNoCopy(kv.Key))
+                                       evt.KV = c.cache.Get(resource.Key)
                                } else {
-                                       evt.KV = c.doParse(kv)
+                                       evt.KV = c.doParse(resource)
                                }
                        default:
-                               log.Errorf(nil, "unknown KeyValue %v", kv)
+                               log.Errorf(nil, "unknown KeyValue %v", resource)
                                continue
                        }
                        if evt.KV == nil {
-                               log.Errorf(nil, "failed to parse KeyValue %v", 
kv)
+                               log.Errorf(nil, "failed to parse KeyValue %v", 
resource)
                                continue
                        }
                        evts = append(evts, evt)
@@ -207,10 +208,10 @@ func (c *KvCacher) refresh(ctx context.Context) {
        log.Debugf("start to list and watch %s", c.Cfg)
        retries := 0
 
-       timer := time.NewTimer(minWaitInterval)
+       timer := time.NewTimer(sdcommon.MinWaitInterval)
        defer timer.Stop()
        for {
-               nextPeriod := minWaitInterval
+               nextPeriod := sdcommon.MinWaitInterval
                if err := c.ListAndWatch(ctx); err != nil {
                        retries++
                        nextPeriod = backoff.GetBackoff().Delay(retries)
@@ -241,14 +242,14 @@ func (c *KvCacher) sync(evts []sd.KvEvent) {
        c.onEvents(evts)
 }
 
-func (c *KvCacher) filter(rev int64, items []*mvccpb.KeyValue) []sd.KvEvent {
+func (c *KvCacher) filter(rev int64, items []*sdcommon.Resource) []sd.KvEvent {
        nc := len(items)
-       newStore := make(map[string]*mvccpb.KeyValue, nc)
+       newStore := make(map[string]*sdcommon.Resource, nc)
        for _, kv := range items {
-               newStore[util.BytesToStringWithNoCopy(kv.Key)] = kv
+               newStore[kv.Key] = kv
        }
        filterStopCh := make(chan struct{})
-       eventsCh := make(chan [eventBlockSize]sd.KvEvent, 2)
+       eventsCh := make(chan [sdcommon.EventBlockSize]sd.KvEvent, 2)
 
        go c.filterDelete(newStore, rev, eventsCh, filterStopCh)
 
@@ -266,9 +267,9 @@ func (c *KvCacher) filter(rev int64, items 
[]*mvccpb.KeyValue) []sd.KvEvent {
        return evts
 }
 
-func (c *KvCacher) filterDelete(newStore map[string]*mvccpb.KeyValue,
-       rev int64, eventsCh chan [eventBlockSize]sd.KvEvent, filterStopCh chan 
struct{}) {
-       var block [eventBlockSize]sd.KvEvent
+func (c *KvCacher) filterDelete(newStore map[string]*sdcommon.Resource,
+       rev int64, eventsCh chan [sdcommon.EventBlockSize]sd.KvEvent, 
filterStopCh chan struct{}) {
+       var block [sdcommon.EventBlockSize]sd.KvEvent
        i := 0
 
        c.cache.ForEach(func(k string, v *sd.KeyValue) (next bool) {
@@ -279,9 +280,9 @@ func (c *KvCacher) filterDelete(newStore 
map[string]*mvccpb.KeyValue,
                        return
                }
 
-               if i >= eventBlockSize {
+               if i >= sdcommon.EventBlockSize {
                        eventsCh <- block
-                       block = [eventBlockSize]sd.KvEvent{}
+                       block = [sdcommon.EventBlockSize]sd.KvEvent{}
                        i = 0
                }
 
@@ -297,17 +298,17 @@ func (c *KvCacher) filterDelete(newStore 
map[string]*mvccpb.KeyValue,
        close(filterStopCh)
 }
 
-func (c *KvCacher) filterCreateOrUpdate(newStore map[string]*mvccpb.KeyValue,
-       rev int64, eventsCh chan [eventBlockSize]sd.KvEvent, filterStopCh chan 
struct{}) {
-       var block [eventBlockSize]sd.KvEvent
+func (c *KvCacher) filterCreateOrUpdate(newStore map[string]*sdcommon.Resource,
+       rev int64, eventsCh chan [sdcommon.EventBlockSize]sd.KvEvent, 
filterStopCh chan struct{}) {
+       var block [sdcommon.EventBlockSize]sd.KvEvent
        i := 0
 
        for k, v := range newStore {
                ov := c.cache.Get(k)
                if ov == nil {
-                       if i >= eventBlockSize {
+                       if i >= sdcommon.EventBlockSize {
                                eventsCh <- block
-                               block = [eventBlockSize]sd.KvEvent{}
+                               block = [sdcommon.EventBlockSize]sd.KvEvent{}
                                i = 0
                        }
 
@@ -322,9 +323,9 @@ func (c *KvCacher) filterCreateOrUpdate(newStore 
map[string]*mvccpb.KeyValue,
                        continue
                }
 
-               if i >= eventBlockSize {
+               if i >= sdcommon.EventBlockSize {
                        eventsCh <- block
-                       block = [eventBlockSize]sd.KvEvent{}
+                       block = [sdcommon.EventBlockSize]sd.KvEvent{}
                        i = 0
                }
 
@@ -360,7 +361,7 @@ func (c *KvCacher) deferHandle(ctx context.Context) {
 func (c *KvCacher) handleDeferEvents(ctx context.Context) {
        defer log.Recover()
        var (
-               evts = make([]sd.KvEvent, eventBlockSize)
+               evts = make([]sd.KvEvent, sdcommon.EventBlockSize)
                i    int
        )
        interval := 300 * time.Millisecond
@@ -375,9 +376,9 @@ func (c *KvCacher) handleDeferEvents(ctx context.Context) {
                                return
                        }
 
-                       if i >= eventBlockSize {
+                       if i >= sdcommon.EventBlockSize {
                                c.onEvents(evts[:i])
-                               evts = make([]sd.KvEvent, eventBlockSize)
+                               evts = make([]sd.KvEvent, 
sdcommon.EventBlockSize)
                                i = 0
                        }
 
@@ -393,7 +394,7 @@ func (c *KvCacher) handleDeferEvents(ctx context.Context) {
                        }
 
                        c.onEvents(evts[:i])
-                       evts = make([]sd.KvEvent, eventBlockSize)
+                       evts = make([]sd.KvEvent, sdcommon.EventBlockSize)
                        i = 0
                }
        }
@@ -453,10 +454,10 @@ func (c *KvCacher) notify(evts []sd.KvEvent) {
        }
 }
 
-func (c *KvCacher) doParse(src *mvccpb.KeyValue) (kv *sd.KeyValue) {
+func (c *KvCacher) doParse(src *sdcommon.Resource) (kv *sd.KeyValue) {
        kv = sd.NewKeyValue()
-       if err := FromEtcdKeyValue(kv, src, c.Cfg.Parser); err != nil {
-               log.Errorf(err, "parse %s value failed", 
util.BytesToStringWithNoCopy(src.Key))
+       if err := ParseResourceToEtcdKeyValue(kv, src, c.Cfg.Parser); err != 
nil {
+               log.Errorf(err, "parse %s value failed", src.Key)
                return nil
        }
        return
@@ -497,7 +498,7 @@ func (c *KvCacher) reportMetrics(ctx context.Context) {
        if !config.GetServer().EnablePProf {
                return
        }
-       timer := time.NewTimer(DefaultMetricsInterval)
+       timer := time.NewTimer(sdcommon.DefaultMetricsInterval)
        defer timer.Stop()
        for {
                select {
@@ -505,7 +506,7 @@ func (c *KvCacher) reportMetrics(ctx context.Context) {
                        return
                case <-timer.C:
                        ReportCacheSize(c.cache.Name(), "raw", c.cache.Size())
-                       timer.Reset(DefaultMetricsInterval)
+                       timer.Reset(sdcommon.DefaultMetricsInterval)
                }
        }
 }
@@ -522,3 +523,17 @@ func NewKvCacher(cfg *sd.Config, cache sd.Cache) *KvCacher 
{
                goroutine: gopool.New(context.Background()),
        }
 }
+
+func (c *KvCacher) getRevision() int64 {
+       lw := reflect.ValueOf(c.lw)
+       revMethod := lw.MethodByName(revMethodName)
+
+       if !revMethod.IsValid() {
+               log.Warn("get revision failed")
+               return 0
+       }
+
+       revs := revMethod.Call(make([]reflect.Value, 0))
+       rev := revs[0].Int()
+       return rev
+}
diff --git a/datasource/etcd/sd/etcd/cacher_kv_test.go 
b/datasource/etcd/sd/etcd/cacher_kv_test.go
index 2604435..c715aed 100644
--- a/datasource/etcd/sd/etcd/cacher_kv_test.go
+++ b/datasource/etcd/sd/etcd/cacher_kv_test.go
@@ -25,12 +25,11 @@ import (
        "strconv"
        "testing"
 
-       "github.com/apache/servicecomb-service-center/datasource/etcd/client"
        "github.com/apache/servicecomb-service-center/datasource/etcd/sd"
        "github.com/apache/servicecomb-service-center/datasource/etcd/value"
+       "github.com/apache/servicecomb-service-center/datasource/sdcommon"
        "github.com/apache/servicecomb-service-center/pkg/gopool"
        "github.com/apache/servicecomb-service-center/pkg/util"
-       "github.com/coreos/etcd/mvcc/mvccpb"
        pb "github.com/go-chassis/cari/discovery"
 )
 
@@ -74,12 +73,42 @@ func (n *mockCache) MarkDirty()  {}
 func (n *mockCache) Dirty() bool { return false }
 func (n *mockCache) Clear()      {}
 
-func TestNewKvCacher(t *testing.T) {
-       w := &mockWatcher{}
-       lw := &mockListWatch{
-               Bus: make(chan *client.PluginResponse, 100),
+type mockListWatch struct {
+       ListResponse  *sdcommon.ListWatchResp
+       WatchResponse *sdcommon.ListWatchResp
+       Rev           int64
+}
+
+func (lw *mockListWatch) List(sdcommon.ListWatchConfig) 
(*sdcommon.ListWatchResp, error) {
+       if lw.ListResponse == nil {
+               return nil, fmt.Errorf("list error")
+       }
+       lw.Rev = lw.ListResponse.Revision
+       return lw.ListResponse, nil
+}
+
+func (lw *mockListWatch) DoWatch(ctx context.Context, f 
func(*sdcommon.ListWatchResp)) error {
+       if lw.WatchResponse == nil {
+               return fmt.Errorf("error")
+       }
+       if len(lw.WatchResponse.Resources) > 0 {
+               lw.Rev = lw.WatchResponse.Resources[0].ModRevision
        }
-       w.lw = lw
+       f(lw.WatchResponse)
+       <-ctx.Done()
+       return nil
+}
+
+func (lw *mockListWatch) EventBus(op sdcommon.ListWatchConfig) 
*sdcommon.EventBus {
+       return sdcommon.NewEventBus(lw, op)
+}
+
+func (lw *mockListWatch) Revision() int64 {
+       return lw.Rev
+}
+
+func TestNewKvCacher(t *testing.T) {
+       lw := &mockListWatch{}
 
        cr := &KvCacher{
                Cfg:       sd.Configure(),
@@ -111,17 +140,17 @@ func TestNewKvCacher(t *testing.T) {
                cache:     &mockCache{},
        }
 
-       lw.Watcher = w
-       data := &mvccpb.KeyValue{Key: []byte("ka"), Value: []byte("va"), 
Version: 1, ModRevision: 2}
-       test := &client.PluginResponse{
-               Action:   client.ActionPut,
-               Revision: 3,
-               Kvs:      []*mvccpb.KeyValue{data}}
+       data := &sdcommon.Resource{Key: "ka", Value: []byte("va"), Version: 1, 
ModRevision: 2}
 
+       test := &sdcommon.ListWatchResp{
+               Action:    sdcommon.ActionPUT,
+               Revision:  3,
+               Resources: []*sdcommon.Resource{data},
+       }
        // case: list 1 resp and watch 0 event
        cr.cache.Remove("ka")
        lw.ListResponse = test
-       lw.Bus <- nil
+       lw.WatchResponse = nil
 
        cr.refresh(ctx)
        // check ready
@@ -142,7 +171,7 @@ func TestNewKvCacher(t *testing.T) {
        data.ModRevision = 3
 
        // case: re-list and should be no event
-       lw.Bus <- nil
+       lw.WatchResponse = nil
        evt.KV = nil
        cr.refresh(ctx)
        if evt.KV != nil {
@@ -155,11 +184,11 @@ func TestNewKvCacher(t *testing.T) {
        }
 
        // case re-list and over no event times
-       for i := 0; i < DefaultForceListInterval; i++ {
-               lw.Bus <- nil
+       for i := 0; i < sdcommon.DefaultForceListInterval; i++ {
+               lw.WatchResponse = nil
        }
        evt.KV = nil
-       for i := 0; i < DefaultForceListInterval; i++ {
+       for i := 0; i < sdcommon.DefaultForceListInterval; i++ {
                cr.refresh(ctx)
        }
        // check event
@@ -172,12 +201,12 @@ func TestNewKvCacher(t *testing.T) {
                t.Fatalf("TestNewKvCacher failed")
        }
 
-       lw.ListResponse = &client.PluginResponse{Revision: 5}
-       for i := 0; i < DefaultForceListInterval; i++ {
-               lw.Bus <- nil
+       lw.ListResponse = &sdcommon.ListWatchResp{Revision: 5}
+       for i := 0; i < sdcommon.DefaultForceListInterval; i++ {
+               lw.WatchResponse = nil
        }
        evt.KV = nil
-       for i := 0; i < DefaultForceListInterval; i++ {
+       for i := 0; i < sdcommon.DefaultForceListInterval; i++ {
                cr.refresh(ctx)
        }
        // check event
@@ -193,8 +222,8 @@ func TestNewKvCacher(t *testing.T) {
        // case: no list and watch 1 event
        test.Revision = 6
        data.ModRevision = 5
-       lw.Bus <- test
-       lw.Bus <- nil
+       lw.WatchResponse = test
+       lw.ListResponse = nil
 
        cr.refresh(ctx)
        // check event
@@ -210,8 +239,8 @@ func TestNewKvCacher(t *testing.T) {
        test.Revision = 7
        data.Version = 2
        data.ModRevision = 6
-       lw.Bus <- test
-       lw.Bus <- nil
+       lw.WatchResponse = test
+       lw.ListResponse = nil
 
        cr.refresh(ctx)
        // check event
@@ -225,11 +254,11 @@ func TestNewKvCacher(t *testing.T) {
        }
 
        test.Revision = 8
-       test.Action = client.ActionDelete
+       test.Action = sdcommon.ActionDelete
        data.Version = 0
        data.ModRevision = 6
-       lw.Bus <- test
-       lw.Bus <- nil
+       lw.WatchResponse = test
+       lw.ListResponse = nil
 
        cr.refresh(ctx)
        // check event
@@ -248,7 +277,7 @@ func TestNewKvCacher(t *testing.T) {
        data.Version = 1
        data.ModRevision = 1
        lw.ListResponse = test
-       lw.Bus <- nil
+       lw.WatchResponse = nil
        evt.KV = nil
        cr.refresh(ctx)
        // check event
@@ -263,12 +292,12 @@ func TestNewKvCacher(t *testing.T) {
 
        // case: caught delete event but value is nil
        test.Revision = 10
-       test.Action = client.ActionDelete
+       test.Action = sdcommon.ActionDelete
        data.Version = 0
        data.ModRevision = 1
        data.Value = nil
-       lw.Bus <- test
-       lw.Bus <- nil
+       lw.WatchResponse = test
+       lw.ListResponse = nil
 
        cr.refresh(ctx)
        data.Value = []byte("va")
@@ -288,7 +317,7 @@ func TestNewKvCacher(t *testing.T) {
        data.Version = 1
        data.ModRevision = 1
        lw.ListResponse = test
-       lw.Bus <- nil
+       lw.WatchResponse = nil
        evt.KV = nil
        old := *cr.Cfg
        cr.Cfg.WithParser(value.MapParser)
@@ -304,8 +333,8 @@ func TestNewKvCacher(t *testing.T) {
        }
 
        lw.ListResponse = test
-       lw.Bus <- test
-       lw.Bus <- nil
+       lw.WatchResponse = test
+
        cr.refresh(ctx)
        *cr.Cfg = old
        // check event
@@ -322,19 +351,19 @@ func TestNewKvCacher(t *testing.T) {
        var evts = make(map[string]sd.KvEvent)
        lw.Rev = 0
        test.Revision = 3
-       test.Kvs = nil
-       for i := 0; i < eventBlockSize+1; i++ {
+       test.Resources = nil
+       for i := 0; i < sdcommon.EventBlockSize+1; i++ {
                kv := *data
-               kv.Key = []byte(strconv.Itoa(i))
+               kv.Key = strconv.Itoa(i)
                kv.Value = []byte(strconv.Itoa(i))
                kv.Version = int64(i)
                kv.ModRevision = int64(i)
-               test.Kvs = append(test.Kvs, &kv)
+               test.Resources = append(test.Resources, &kv)
        }
        data.ModRevision = 2
-       test.Kvs = append(test.Kvs, data)
+       test.Resources = append(test.Resources, data)
        lw.ListResponse = test
-       lw.Bus <- nil
+       lw.WatchResponse = nil
        evt.KV = nil
        old = *cr.Cfg
        cr.Cfg.WithEventFunc(func(evt sd.KvEvent) {
@@ -343,18 +372,18 @@ func TestNewKvCacher(t *testing.T) {
        cr.refresh(ctx)
        *cr.Cfg = old
        // check all events
-       for i := 0; i < eventBlockSize+1; i++ {
+       for i := 0; i < sdcommon.EventBlockSize+1; i++ {
                s := strconv.Itoa(i)
                if evt, ok := evts[s]; !ok || evt.Type != pb.EVT_CREATE || 
evt.KV.ModRevision != int64(i) || string(evt.KV.Value.([]byte)) != s {
                        t.Fatalf("TestNewKvCacher failed, %v", evt)
                }
                delete(evts, s)
        }
-       evt = evts[string(data.Key)]
+       evt = evts[data.Key]
        if len(evts) != 1 || evt.Type != pb.EVT_CREATE || evt.Revision != 3 || 
evt.KV.ModRevision != 2 {
                t.Fatalf("TestNewKvCacher failed, %v %v", evts, evt)
        }
-       delete(evts, string(data.Key))
+       delete(evts, data.Key)
 
        // case: cacher is ready and the next list failed, prevent to watch 
with rev = 0
        if !cr.IsReady() {
@@ -362,14 +391,13 @@ func TestNewKvCacher(t *testing.T) {
        }
        lw.Rev = 0            // watch failed
        lw.ListResponse = nil // the next list
-       lw.Bus <- test
+       lw.WatchResponse = test
        old = *cr.Cfg
        cr.Cfg.WithEventFunc(func(evt sd.KvEvent) {
                t.Fatalf("TestNewKvCacher failed, %v", evt)
        })
        cr.refresh(ctx)
        *cr.Cfg = old
-       <-lw.Bus
 }
 
 func BenchmarkFilter(b *testing.B) {
@@ -385,13 +413,13 @@ func BenchmarkFilter(b *testing.B) {
 
        n := 300 * 1000 // 30w
        cache := sd.NewKvCache("test", cfg)
-       items := make([]*mvccpb.KeyValue, 0, n)
+       items := make([]*sdcommon.Resource, 0, n)
        for ; n > 0; n-- {
                k := fmt.Sprintf("/%d", n)
                if n <= 10*1000 {
                        // create
-                       items = append(items, &mvccpb.KeyValue{
-                               Key:         util.StringToBytesWithNoCopy(k),
+                       items = append(items, &sdcommon.Resource{
+                               Key:         k,
                                Value:       v,
                                ModRevision: int64(rand.Int()),
                        })
@@ -402,8 +430,8 @@ func BenchmarkFilter(b *testing.B) {
                                Value:       inst,
                                ModRevision: 1,
                        })
-                       items = append(items, &mvccpb.KeyValue{
-                               Key:         util.StringToBytesWithNoCopy(k),
+                       items = append(items, &sdcommon.Resource{
+                               Key:         k,
                                Value:       v,
                                ModRevision: int64(rand.Int()),
                        })
diff --git a/datasource/etcd/sd/etcd/common.go 
b/datasource/etcd/sd/etcd/common.go
index 1d9d64e..c603275 100644
--- a/datasource/etcd/sd/etcd/common.go
+++ b/datasource/etcd/sd/etcd/common.go
@@ -16,22 +16,16 @@
 package etcd
 
 import (
-       "time"
-
        "github.com/apache/servicecomb-service-center/datasource/etcd"
        "github.com/apache/servicecomb-service-center/datasource/etcd/sd"
        "github.com/apache/servicecomb-service-center/datasource/etcd/value"
+       "github.com/apache/servicecomb-service-center/datasource/sdcommon"
+       "github.com/apache/servicecomb-service-center/pkg/util"
        "github.com/coreos/etcd/mvcc/mvccpb"
 )
 
 const (
-       // force re-list
-       DefaultForceListInterval = 4
-       DefaultMetricsInterval   = 30 * time.Second
-
-       minWaitInterval = 1 * time.Second
-       eventBlockSize  = 1000
-       eventBusSize    = 1000
+       revMethodName = "Revision"
 )
 
 var closedCh = make(chan struct{})
@@ -52,3 +46,16 @@ func FromEtcdKeyValue(dist *sd.KeyValue, src 
*mvccpb.KeyValue, parser value.Pars
        dist.Value, err = parser.Unmarshal(src.Value)
        return
 }
+
+func ParseResourceToEtcdKeyValue(dist *sd.KeyValue, src *sdcommon.Resource, 
parser value.Parser) (err error) {
+       dist.ClusterName = etcd.Configuration().ClusterName
+       dist.Key = util.StringToBytesWithNoCopy(src.Key)
+       dist.Version = src.Version
+       dist.CreateRevision = src.CreateRevision
+       dist.ModRevision = src.ModRevision
+       if parser == nil {
+               return
+       }
+       dist.Value, err = parser.Unmarshal(src.Value.([]byte))
+       return
+}
diff --git a/datasource/etcd/sd/etcd/listwatch_inner.go 
b/datasource/etcd/sd/etcd/listwatch_inner.go
index cb477bf..e2ecdc5 100644
--- a/datasource/etcd/sd/etcd/listwatch_inner.go
+++ b/datasource/etcd/sd/etcd/listwatch_inner.go
@@ -22,7 +22,9 @@ import (
        "fmt"
 
        "github.com/apache/servicecomb-service-center/datasource/etcd/client"
+       "github.com/apache/servicecomb-service-center/datasource/sdcommon"
        "github.com/apache/servicecomb-service-center/pkg/log"
+       "github.com/apache/servicecomb-service-center/pkg/util"
 )
 
 type innerListWatch struct {
@@ -32,7 +34,7 @@ type innerListWatch struct {
        rev int64
 }
 
-func (lw *innerListWatch) List(op ListWatchConfig) (*client.PluginResponse, 
error) {
+func (lw *innerListWatch) List(op sdcommon.ListWatchConfig) 
(*sdcommon.ListWatchResp, error) {
        otCtx, cancel := context.WithTimeout(op.Context, op.Timeout)
        defer cancel()
        resp, err := lw.Client.Do(otCtx, 
client.WatchPrefixOpOptions(lw.Prefix)...)
@@ -41,7 +43,10 @@ func (lw *innerListWatch) List(op ListWatchConfig) 
(*client.PluginResponse, erro
                return nil, err
        }
        lw.setRevision(resp.Revision)
-       return resp, nil
+
+       lwRsp := lw.doParsePluginRspToLwRsp(resp)
+
+       return lwRsp, nil
 }
 
 func (lw *innerListWatch) Revision() int64 {
@@ -52,11 +57,11 @@ func (lw *innerListWatch) setRevision(rev int64) {
        lw.rev = rev
 }
 
-func (lw *innerListWatch) Watch(op ListWatchConfig) Watcher {
-       return newInnerWatcher(lw, op)
+func (lw *innerListWatch) EventBus(op sdcommon.ListWatchConfig) 
*sdcommon.EventBus {
+       return sdcommon.NewEventBus(lw, op)
 }
 
-func (lw *innerListWatch) DoWatch(ctx context.Context, f 
func(*client.PluginResponse)) error {
+func (lw *innerListWatch) DoWatch(ctx context.Context, f 
func(*sdcommon.ListWatchResp)) error {
        rev := lw.Revision()
        opts := append(
                client.WatchPrefixOpOptions(lw.Prefix),
@@ -69,7 +74,17 @@ func (lw *innerListWatch) DoWatch(ctx context.Context, f 
func(*client.PluginResp
 
                                lw.setRevision(resp.Revision)
 
-                               f(resp)
+                               lwRsp := lw.doParsePluginRspToLwRsp(resp)
+                               switch resp.Action {
+                               case client.ActionPut:
+                                       lwRsp.Action = sdcommon.ActionPUT
+                               case client.ActionDelete:
+                                       lwRsp.Action = sdcommon.ActionDelete
+                               default:
+                                       log.Warn(fmt.Sprintf("unrecognized 
action::%s", lwRsp.Action))
+                               }
+
+                               f(lwRsp)
                                return nil
                        }))
 
@@ -82,3 +97,21 @@ func (lw *innerListWatch) DoWatch(ctx context.Context, f 
func(*client.PluginResp
        }
        return err
 }
+
+// doParsePluginRspToLwRsp converts an etcd plugin response into an
+// sdcommon.ListWatchResp: the revision is copied and every key-value pair
+// becomes one sdcommon.Resource. The Action field is left for the caller.
+func (lw *innerListWatch) doParsePluginRspToLwRsp(pluginRsp *client.PluginResponse) *sdcommon.ListWatchResp {
+       converted := &sdcommon.ListWatchResp{}
+       converted.Revision = pluginRsp.Revision
+
+       for _, kv := range pluginRsp.Kvs {
+               converted.Resources = append(converted.Resources, &sdcommon.Resource{
+                       Key:            util.BytesToStringWithNoCopy(kv.Key),
+                       ModRevision:    kv.ModRevision,
+                       CreateRevision: kv.CreateRevision,
+                       Version:        kv.Version,
+                       Value:          kv.Value,
+               })
+       }
+       return converted
+}
diff --git a/datasource/etcd/sd/etcd/listwatch_test.go 
b/datasource/etcd/sd/etcd/listwatch_test.go
index 4bf3b5d..964cb84 100644
--- a/datasource/etcd/sd/etcd/listwatch_test.go
+++ b/datasource/etcd/sd/etcd/listwatch_test.go
@@ -25,6 +25,7 @@ import (
 
        "github.com/apache/servicecomb-service-center/datasource/etcd/client"
        
"github.com/apache/servicecomb-service-center/datasource/etcd/client/buildin"
+       "github.com/apache/servicecomb-service-center/datasource/sdcommon"
        "github.com/coreos/etcd/mvcc/mvccpb"
 )
 
@@ -63,12 +64,12 @@ func TestPrefixListWatch(t *testing.T) {
                Prefix: "a",
                rev:    1,
        }
-       resp, err := lw.List(ListWatchConfig{Timeout: time.Second, Context: 
context.Background()})
+       resp, err := lw.List(sdcommon.ListWatchConfig{Timeout: time.Second, 
Context: context.Background()})
        if resp != nil || err == nil || lw.Revision() != 1 {
                t.Fatalf("TestPrefixListWatch failed")
        }
-       w := lw.Watch(ListWatchConfig{Timeout: time.Second, Context: 
context.Background()})
-       resp = <-w.EventBus()
+       w := lw.EventBus(sdcommon.ListWatchConfig{Timeout: time.Second, 
Context: context.Background()})
+       resp = <-w.ResourceEventBus()
        if resp != nil || lw.Revision() != 0 {
                t.Fatalf("TestPrefixListWatch failed")
        }
@@ -82,12 +83,12 @@ func TestPrefixListWatch(t *testing.T) {
                Prefix: "a",
                rev:    1,
        }
-       resp, err = lw.List(ListWatchConfig{Timeout: time.Second, Context: 
context.Background()})
+       resp, err = lw.List(sdcommon.ListWatchConfig{Timeout: time.Second, 
Context: context.Background()})
        if resp == nil || err != nil || lw.Revision() != 2 {
                t.Fatalf("TestPrefixListWatch failed")
        }
-       w = lw.Watch(ListWatchConfig{Timeout: time.Second, Context: 
context.Background()})
-       resp = <-w.EventBus()
+       w = lw.EventBus(sdcommon.ListWatchConfig{Timeout: time.Second, Context: 
context.Background()})
+       resp = <-w.ResourceEventBus()
        if resp != nil || lw.Revision() != 0 {
                t.Fatalf("TestPrefixListWatch failed")
        }
@@ -102,12 +103,12 @@ func TestPrefixListWatch(t *testing.T) {
                Prefix: "a",
                rev:    1,
        }
-       resp, err = lw.List(ListWatchConfig{Timeout: time.Second, Context: 
context.Background()})
+       resp, err = lw.List(sdcommon.ListWatchConfig{Timeout: time.Second, 
Context: context.Background()})
        if resp == nil || err != nil || lw.Revision() != 4 {
                t.Fatalf("TestPrefixListWatch failed")
        }
-       w = lw.Watch(ListWatchConfig{Timeout: time.Second, Context: 
context.Background()})
-       resp = <-w.EventBus()
+       w = lw.EventBus(sdcommon.ListWatchConfig{Timeout: time.Second, Context: 
context.Background()})
+       resp = <-w.ResourceEventBus()
        if resp == nil || lw.Revision() != 3 {
                t.Fatalf("TestPrefixListWatch failed")
        }
@@ -115,7 +116,7 @@ func TestPrefixListWatch(t *testing.T) {
 }
 
 func TestListWatchConfig_String(t *testing.T) {
-       lw := ListWatchConfig{Timeout: time.Second, Context: 
context.Background()}
+       lw := sdcommon.ListWatchConfig{Timeout: time.Second, Context: 
context.Background()}
        if lw.String() != "{timeout: 1s}" {
                t.Fatalf("TestListWatchConfig_String failed")
        }
diff --git a/datasource/etcd/sd/etcd/watcher_test.go 
b/datasource/etcd/sd/etcd/watcher_test.go
deleted file mode 100644
index 534b5f1..0000000
--- a/datasource/etcd/sd/etcd/watcher_test.go
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package etcd
-
-import (
-       "context"
-       "fmt"
-       "testing"
-       "time"
-
-       "github.com/apache/servicecomb-service-center/datasource/etcd/client"
-)
-
-type mockWatcher struct {
-       lw *mockListWatch
-}
-
-func (w *mockWatcher) EventBus() <-chan *client.PluginResponse {
-       return w.lw.Bus
-}
-func (w *mockWatcher) Stop() {
-}
-
-type mockListWatch struct {
-       ListResponse *client.PluginResponse
-       Bus          chan *client.PluginResponse
-       Watcher      Watcher
-       Rev          int64
-}
-
-func (lw *mockListWatch) List(op ListWatchConfig) (*client.PluginResponse, 
error) {
-       if lw.ListResponse == nil {
-               return nil, fmt.Errorf("error")
-       }
-       lw.Rev = lw.ListResponse.Revision
-       return lw.ListResponse, nil
-}
-func (lw *mockListWatch) DoWatch(ctx context.Context, f 
func(*client.PluginResponse)) error {
-       if lw.ListResponse == nil {
-               return fmt.Errorf("error")
-       }
-       if len(lw.ListResponse.Kvs) > 0 {
-               lw.Rev = lw.ListResponse.Kvs[0].ModRevision
-       }
-       f(lw.ListResponse)
-       <-ctx.Done()
-       return nil
-}
-func (lw *mockListWatch) Watch(op ListWatchConfig) Watcher {
-       return lw.Watcher
-}
-func (lw *mockListWatch) Revision() int64 {
-       return lw.Rev
-}
-
-func TestInnerWatcher_EventBus(t *testing.T) {
-       w := newInnerWatcher(&mockListWatch{}, ListWatchConfig{Timeout: 
time.Second, Context: context.Background()})
-       resp := <-w.EventBus()
-       if resp != nil {
-               t.Fatalf("TestInnerWatcher_EventBus failed")
-       }
-       w.Stop()
-
-       test := &client.PluginResponse{
-               Action: client.ActionPut,
-       }
-       w = newInnerWatcher(&mockListWatch{ListResponse: test}, 
ListWatchConfig{Timeout: time.Second, Context: context.Background()})
-       resp = <-w.EventBus()
-       if resp == nil || resp.Action != client.ActionPut {
-               t.Fatalf("TestInnerWatcher_EventBus failed")
-       }
-       w.Stop()
-}
diff --git a/datasource/mongo/mongo.go b/datasource/mongo/mongo.go
index fb607cc..1e84d84 100644
--- a/datasource/mongo/mongo.go
+++ b/datasource/mongo/mongo.go
@@ -23,6 +23,7 @@ import (
        "github.com/apache/servicecomb-service-center/datasource"
        "github.com/apache/servicecomb-service-center/datasource/mongo/client"
        
"github.com/apache/servicecomb-service-center/datasource/mongo/heartbeat"
+       "github.com/apache/servicecomb-service-center/datasource/mongo/sd"
        "github.com/apache/servicecomb-service-center/pkg/log"
        "github.com/apache/servicecomb-service-center/server/config"
        "github.com/go-chassis/go-chassis/v2/storage"
@@ -72,6 +73,8 @@ func (ds *DataSource) initialize() error {
        if err != nil {
                return err
        }
+       // init mongo cache
+       ds.initStore()
        return nil
 }
 
@@ -118,3 +121,12 @@ func (ds *DataSource) createIndexes() (err error) {
        }
        return
 }
+
+// initStore runs the mongo cache store and waits on its Ready channel.
+// When the registry cache is disabled it only writes a debug log.
+func (ds *DataSource) initStore() {
+       if cacheEnabled := config.GetRegistry().EnableCache; cacheEnabled {
+               sd.Store().Run()
+               <-sd.Store().Ready()
+               return
+       }
+       log.Debug("cache is disabled")
+}
diff --git a/datasource/mongo/sd/cache.go b/datasource/mongo/sd/cache.go
new file mode 100644
index 0000000..9819041
--- /dev/null
+++ b/datasource/mongo/sd/cache.go
@@ -0,0 +1,46 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements.  See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+ */
+
+package sd
+
+// Cache stores db data.
+// It extends CacheReader with mutating operations and a dirty flag.
+type Cache interface {
+       CacheReader
+
+       // Put stores v under id.
+       Put(id string, v interface{})
+
+       // Remove deletes the value stored under id.
+       Remove(id string)
+
+       // MarkDirty flags the cache as dirty.
+       MarkDirty()
+
+       // Dirty reports whether the cache is flagged as dirty.
+       Dirty() bool
+
+       // Clear drops the cached values.
+       Clear()
+}
+
+// CacheReader reads k-v data.
+// It is the read-only part of Cache.
+type CacheReader interface {
+       Name() string // The name of implementation
+
+       Size() int // the bytes size of the cache
+
+       // Get gets a value by id
+       Get(id string) interface{}
+
+       // ForEach executes the given function for each of the k-v;
+       // the callback's result is named next.
+       ForEach(iter func(k string, v interface{}) (next bool))
+}
diff --git a/datasource/mongo/sd/common.go b/datasource/mongo/sd/common.go
new file mode 100644
index 0000000..e2c5bf1
--- /dev/null
+++ b/datasource/mongo/sd/common.go
@@ -0,0 +1,25 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements.  See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+ */
+
+package sd
+
+// Operation type names matched against MongoWatchResponse.OperationType
+// when a watch response is converted into a resource event.
+const (
+       insertOp  = "insert"
+       updateOp  = "update"
+       replaceOp = "replace"
+       deleteOp  = "delete"
+)
diff --git a/datasource/mongo/sd/event_proxy.go 
b/datasource/mongo/sd/event_proxy.go
new file mode 100644
index 0000000..227d1bd
--- /dev/null
+++ b/datasource/mongo/sd/event_proxy.go
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package sd
+
+import (
+       "fmt"
+       "sync"
+
+       "github.com/apache/servicecomb-service-center/pkg/log"
+       "github.com/apache/servicecomb-service-center/pkg/util"
+)
+
+var (
+       // eventProxies maps an event type name to its *MongoEventProxy.
+       eventProxies = &sync.Map{}
+)
+
+// MongoEventProxy fans one MongoEvent out to every registered handle func.
+type MongoEventProxy struct {
+       // evtHandleFuncs holds the handle funcs in registration order.
+       evtHandleFuncs []MongoEventFunc
+       // lock guards evtHandleFuncs.
+       lock           sync.RWMutex
+}
+
+// AddHandleFunc appends f to the proxy's handler list under the write lock.
+func (h *MongoEventProxy) AddHandleFunc(f MongoEventFunc) {
+       h.lock.Lock()
+       defer h.lock.Unlock()
+
+       h.evtHandleFuncs = append(h.evtHandleFuncs, f)
+}
+
+// OnEvent delivers evt to every registered handle func, in registration order.
+//
+// The handler list is snapshotted under the read lock and the handlers run
+// after the lock is released. A panicking handler therefore cannot leave the
+// lock held, and a handler may call AddHandleFunc without deadlocking.
+func (h *MongoEventProxy) OnEvent(evt MongoEvent) {
+       h.lock.RLock()
+       // Copying the slice header is sufficient: registration only appends and
+       // never rewrites the elements visible through this snapshot.
+       handlers := h.evtHandleFuncs
+       h.lock.RUnlock()
+
+       for _, f := range handlers {
+               f(evt)
+       }
+}
+
+// EventProxy returns the proxy registered for event type t, creating it on
+// first use.
+//
+// Creation goes through LoadOrStore so that concurrent first calls for the
+// same type all receive the same proxy; a separate Load followed by Store
+// could let one caller overwrite a proxy that already holds handlers.
+func EventProxy(t string) *MongoEventProxy {
+       // fast path: avoid allocating a proxy when one already exists
+       if proxy, ok := eventProxies.Load(t); ok {
+               return proxy.(*MongoEventProxy)
+       }
+       proxy, _ := eventProxies.LoadOrStore(t, &MongoEventProxy{})
+       return proxy.(*MongoEventProxy)
+}
+
+// AddEventHandler registers h.OnEvent on the proxy of h.Type() and logs the
+// registration.
+func AddEventHandler(h MongoEventHandler) {
+       proxy := EventProxy(h.Type())
+       proxy.AddHandleFunc(h.OnEvent)
+
+       msg := fmt.Sprintf("register event handler[%s] %s", h.Type(), util.Reflect(h).Name())
+       log.Info(msg)
+}
diff --git a/datasource/mongo/sd/event_proxy_test.go 
b/datasource/mongo/sd/event_proxy_test.go
new file mode 100644
index 0000000..3119cc9
--- /dev/null
+++ b/datasource/mongo/sd/event_proxy_test.go
@@ -0,0 +1,81 @@
+package sd
+
+import (
+       "sync"
+       "testing"
+
+       "github.com/go-chassis/cari/discovery"
+       "github.com/stretchr/testify/assert"
+)
+
+func TestAddHandleFuncAndOnEvent(t *testing.T) {
+       var funcs []MongoEventFunc
+       mongoEventProxy := MongoEventProxy{
+               evtHandleFuncs: funcs,
+       }
+       mongoEvent := MongoEvent{
+               DocumentID: "",
+               ResourceID: "",
+               Type:       discovery.EVT_CREATE,
+               Value:      1,
+       }
+       mongoEventProxy.evtHandleFuncs = funcs
+       assert.Equal(t, 0, len(mongoEventProxy.evtHandleFuncs),
+               "size of evtHandleFuncs is zero")
+       t.Run("AddHandleFunc one time", func(t *testing.T) {
+               mongoEventProxy.AddHandleFunc(mongoEventFuncGet())
+               mongoEventProxy.OnEvent(mongoEvent)
+               assert.Equal(t, 1, len(mongoEventProxy.evtHandleFuncs))
+       })
+       t.Run("AddHandleFunc three times", func(t *testing.T) {
+               for i := 0; i < 5; i++ {
+                       mongoEventProxy.AddHandleFunc(mongoEventFuncGet())
+                       mongoEventProxy.OnEvent(mongoEvent)
+               }
+               assert.Equal(t, 6, len(mongoEventProxy.evtHandleFuncs))
+       })
+}
+
+// mockInstanceEventHandler is a no-op event handler used by the tests below.
+type mockInstanceEventHandler struct {
+}
+
+// Type returns the instance type name.
+func (h *mockInstanceEventHandler) Type() string {
+       return instance
+}
+
+// OnEvent ignores the event.
+func (h *mockInstanceEventHandler) OnEvent(MongoEvent) {
+
+}
+
+// TestAddEventHandler verifies that AddEventHandler registers exactly one new
+// handle func on the proxy of the handler's type.
+func TestAddEventHandler(t *testing.T) {
+       before := len(EventProxy(instance).evtHandleFuncs)
+
+       AddEventHandler(&mockInstanceEventHandler{})
+
+       assert.Equal(t, before+1, len(EventProxy(instance).evtHandleFuncs))
+}
+
+// TestEventProxy covers both branches of EventProxy: creating a proxy for an
+// unknown type and returning the proxy already stored for a known type.
+func TestEventProxy(t *testing.T) {
+       t.Run("when there is no such a proxy in eventProxies", func(t *testing.T) {
+               eventProxies = &sync.Map{}
+               proxy := EventProxy("new")
+               p, ok := eventProxies.Load("new")
+
+               assert.Equal(t, true, ok)
+               assert.NotNil(t, p, "proxy is not nil")
+               assert.Nil(t, proxy.evtHandleFuncs)
+       })
+
+       t.Run("when there is such a proxy in eventProxies", func(t *testing.T) {
+               eventProxies = &sync.Map{}
+               mongoEventFunc := []MongoEventFunc{mongoEventFuncGet()}
+               mongoEventProxy := MongoEventProxy{
+                       evtHandleFuncs: mongoEventFunc,
+               }
+               eventProxies.Store("a", &mongoEventProxy)
+               proxy := EventProxy("a")
+
+               p, ok := eventProxies.Load("a")
+               assert.Equal(t, true, ok)
+               assert.Equal(t, &mongoEventProxy, p)
+               assert.NotNil(t, p, "proxy is not nil")
+               // the stored proxy itself must come back, not a fresh one
+               assert.Same(t, &mongoEventProxy, proxy)
+               assert.NotNil(t, proxy.evtHandleFuncs)
+       })
+}
diff --git a/datasource/mongo/sd/listwatch_inner.go 
b/datasource/mongo/sd/listwatch_inner.go
new file mode 100644
index 0000000..2bee2a0
--- /dev/null
+++ b/datasource/mongo/sd/listwatch_inner.go
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package sd
+
+import (
+       "context"
+       "fmt"
+
+       "github.com/apache/servicecomb-service-center/datasource/mongo/client"
+       "github.com/apache/servicecomb-service-center/datasource/sdcommon"
+       "github.com/apache/servicecomb-service-center/pkg/log"
+       "go.mongodb.org/mongo-driver/bson"
+       md "go.mongodb.org/mongo-driver/mongo"
+       "go.mongodb.org/mongo-driver/mongo/options"
+)
+
+// mongoListWatch lists and watches the mongo table named by Key.
+type mongoListWatch struct {
+       // Key is the table name passed to Find and Watch; it also selects how
+       // documents are decoded (instance or service).
+       Key         string
+       // resumeToken is the change-stream token recorded by DoWatch and used
+       // to resume the next watch.
+       resumeToken bson.Raw
+}
+
+// List reads every document of the table lw.Key and converts each one into an
+// sdcommon.Resource.
+//
+// The whole operation, including cursor iteration, is bounded by op.Timeout.
+// It returns a nil response and the error when the query or the iteration
+// fails; on success Resources is non-nil even when the table is empty.
+func (lw *mongoListWatch) List(op sdcommon.ListWatchConfig) (*sdcommon.ListWatchResp, error) {
+       otCtx, cancel := context.WithTimeout(op.Context, op.Timeout)
+       defer cancel()
+
+       resp, err := client.GetMongoClient().Find(otCtx, lw.Key, bson.M{})
+       if err != nil {
+               log.Error(fmt.Sprintf("list key %s failed", lw.Key), err)
+               return nil, err
+       }
+       // release the server-side cursor once the documents are consumed; this
+       // runs before cancel, so otCtx is still usable unless it timed out
+       defer func() {
+               if cerr := resp.Close(otCtx); cerr != nil {
+                       log.Error(fmt.Sprintf("close cursor of key %s failed", lw.Key), cerr)
+               }
+       }()
+
+       // convert mongoListResponse to ListWatchResp
+       lwRsp := &sdcommon.ListWatchResp{}
+       lwRsp.Resources = make([]*sdcommon.Resource, 0)
+
+       // iterate under the timeout context so a stalled cursor cannot block forever
+       for resp.Next(otCtx) {
+               info := lw.doParseDocumentToResource(resp.Current)
+               lwRsp.Resources = append(lwRsp.Resources, &info)
+       }
+       // Next returns false on exhaustion and on failure alike; tell them apart
+       if err = resp.Err(); err != nil {
+               log.Error(fmt.Sprintf("iterate key %s failed", lw.Key), err)
+               return nil, err
+       }
+
+       return lwRsp, nil
+}
+
+// EventBus builds an sdcommon.EventBus that is fed by this list-watcher.
+func (lw *mongoListWatch) EventBus(op sdcommon.ListWatchConfig) *sdcommon.EventBus {
+       bus := sdcommon.NewEventBus(lw, op)
+       return bus
+}
+
+func (lw *mongoListWatch) DoWatch(ctx context.Context, f 
func(*sdcommon.ListWatchResp)) error {
+       csOptions := &options.ChangeStreamOptions{}
+       csOptions.SetFullDocument(options.UpdateLookup)
+
+       resumeToken := lw.resumeToken
+       if resumeToken != nil {
+               csOptions.SetResumeAfter(resumeToken)
+       }
+
+       resp, err := client.GetMongoClient().Watch(ctx, lw.Key, md.Pipeline{}, 
csOptions)
+
+       if err != nil {
+               log.Error(fmt.Sprintf("watch table %s failed", lw.Key), err)
+               f(nil)
+               return err
+       }
+
+       for resp.Next(ctx) {
+               lw.resumeToken = resp.ResumeToken()
+
+               wRsp := &MongoWatchResponse{}
+               err := bson.Unmarshal(resp.Current, &wRsp)
+
+               if err != nil {
+                       log.Error("error to parse bson raw to mongo watch 
response", err)
+                       return err
+               }
+
+               // convert mongoWatchResponse to ListWatchResp
+               resource := lw.doParseWatchRspToResource(wRsp)
+
+               lwRsp := &sdcommon.ListWatchResp{}
+               lwRsp.Resources = append(lwRsp.Resources, &resource)
+               switch wRsp.OperationType {
+               case insertOp:
+                       lwRsp.Action = sdcommon.ActionCreate
+               case updateOp:
+                       lwRsp.Action = sdcommon.ActionUpdate
+               case deleteOp:
+                       lwRsp.Action = sdcommon.ActionDelete
+               default:
+                       log.Warn(fmt.Sprintf("unrecognized action:%s", 
lwRsp.Action))
+               }
+
+               f(lwRsp)
+       }
+
+       return err
+}
+
+func (lw *mongoListWatch) doParseDocumentToResource(fullDocument bson.Raw) 
(resource sdcommon.Resource) {
+       var err error
+
+       documentID := MongoDocument{}
+       err = bson.Unmarshal(fullDocument, &documentID)
+       if err != nil {
+               return
+       }
+
+       resource.DocumentID = documentID.ID.Hex()
+
+       switch lw.Key {
+       case instance:
+               instance := Instance{}
+               err = bson.Unmarshal(fullDocument, &instance)
+               if err != nil {
+                       log.Error("error to parse bson raw to documentInfo", 
err)
+                       return
+               }
+               resource.Key = instance.InstanceInfo.InstanceId
+               resource.Value = instance
+       case service:
+               service := Service{}
+               err := bson.Unmarshal(fullDocument, &service)
+               if err != nil {
+                       log.Error("error to parse bson raw to documentInfo", 
err)
+                       return
+               }
+               resource.Key = service.ServiceInfo.ServiceId
+               resource.Value = service
+       default:
+               return
+       }
+
+       return
+}
+
+// ResumeToken returns the resume token currently stored on the list-watcher.
+func (lw *mongoListWatch) ResumeToken() bson.Raw {
+       return lw.resumeToken
+}
+
+// setResumeToken replaces the stored resume token.
+func (lw *mongoListWatch) setResumeToken(resumeToken bson.Raw) {
+       lw.resumeToken = resumeToken
+}
+
+// doParseWatchRspToResource turns one watch response into a resource.
+// A delete only yields the DocumentID taken from the document key; insert,
+// update and replace are decoded from the full document. Any other operation
+// type is logged and yields an empty resource.
+func (lw *mongoListWatch) doParseWatchRspToResource(wRsp *MongoWatchResponse) (resource sdcommon.Resource) {
+       op := wRsp.OperationType
+
+       if op == deleteOp {
+               //delete operation has no fullDocumentValue
+               resource.DocumentID = wRsp.DocumentKey.ID.Hex()
+               return resource
+       }
+       if op == insertOp || op == updateOp || op == replaceOp {
+               return lw.doParseDocumentToResource(wRsp.FullDocument)
+       }
+
+       log.Warn(fmt.Sprintf("unrecognized operation:%s", wRsp.OperationType))
+       return resource
+}
diff --git a/datasource/mongo/sd/listwatch_test.go 
b/datasource/mongo/sd/listwatch_test.go
new file mode 100644
index 0000000..ad0a03a
--- /dev/null
+++ b/datasource/mongo/sd/listwatch_test.go
@@ -0,0 +1,87 @@
+package sd
+
+import (
+       "github.com/apache/servicecomb-service-center/datasource/sdcommon"
+       "testing"
+
+       "github.com/stretchr/testify/assert"
+       "go.mongodb.org/mongo-driver/bson"
+       "go.mongodb.org/mongo-driver/bson/primitive"
+)
+
+// TestListWatchConfig_String checks the textual form of ListWatchConfig for
+// an explicit timeout and for the zero value.
+func TestListWatchConfig_String(t *testing.T) {
+       cases := []struct {
+               name   string
+               config sdcommon.ListWatchConfig
+               want   string
+       }{
+               {
+                       name:   "TestListWatchConfig_String",
+                       config: sdcommon.ListWatchConfig{Timeout: 666},
+                       want:   "{timeout: 666ns}",
+               },
+               {
+                       name:   "when time is nil",
+                       config: sdcommon.ListWatchConfig{},
+                       want:   "{timeout: 0s}",
+               },
+       }
+       for _, tc := range cases {
+               tc := tc
+               t.Run(tc.name, func(t *testing.T) {
+                       ret := tc.config.String()
+                       assert.Equal(t, tc.want, ret)
+               })
+       }
+}
+
+// TestDoParseWatchRspToMongoInfo covers doParseWatchRspToResource for instance
+// insert, update and delete events and for a service insert event.
+func TestDoParseWatchRspToMongoInfo(t *testing.T) {
+       documentID := primitive.NewObjectID()
+
+       // fail loudly if a fixture cannot be built instead of testing on nil data
+       mockDocument, err := bson.Marshal(bson.M{
+               "_id":     documentID,
+               "domain":  "default",
+               "project": "default",
+               "instanceinfo": bson.M{
+                       "instanceid":   "8064a600438511eb8584fa163e8a81c9",
+                       "serviceid":    "91afbe0faa9dc1594689139f099eb293b0cd048d",
+                       "hostname":     "ecs-hcsadlab-dev-0002",
+                       "status":       "UP",
+                       "timestamp":    "1608552622",
+                       "modtimestamp": "1608552622",
+                       "version":      "0.0.1",
+               },
+       })
+       assert.NoError(t, err)
+
+       mockServiceDocument, err := bson.Marshal(bson.M{
+               "_id":     documentID,
+               "domain":  "default",
+               "project": "default",
+               "serviceinfo": bson.M{
+                       "serviceid":    "91afbe0faa9dc1594689139f099eb293b0cd048d",
+                       "timestamp":    "1608552622",
+                       "modtimestamp": "1608552622",
+                       "version":      "0.0.1",
+               },
+       })
+       assert.NoError(t, err)
+
+       // case instance insertOp
+
+       mockWatchRsp := &MongoWatchResponse{OperationType: insertOp,
+               FullDocument: mockDocument,
+               DocumentKey:  MongoDocument{ID: documentID},
+       }
+       ilw := mongoListWatch{
+               Key: instance,
+       }
+       info := ilw.doParseWatchRspToResource(mockWatchRsp)
+       assert.Equal(t, documentID.Hex(), info.DocumentID)
+       assert.Equal(t, "8064a600438511eb8584fa163e8a81c9", info.Key)
+
+       // case updateOp
+       mockWatchRsp.OperationType = updateOp
+       info = ilw.doParseWatchRspToResource(mockWatchRsp)
+       assert.Equal(t, documentID.Hex(), info.DocumentID)
+       assert.Equal(t, "8064a600438511eb8584fa163e8a81c9", info.Key)
+       assert.Equal(t, "1608552622", info.Value.(Instance).InstanceInfo.ModTimestamp)
+
+       // case delete
+       mockWatchRsp.OperationType = deleteOp
+       info = ilw.doParseWatchRspToResource(mockWatchRsp)
+       assert.Equal(t, documentID.Hex(), info.DocumentID)
+       assert.Equal(t, "", info.Key)
+
+       // case service insertOp: the document key deliberately differs, because
+       // for an insert the id is taken from the full document
+       mockWatchRsp = &MongoWatchResponse{OperationType: insertOp,
+               FullDocument: mockServiceDocument,
+               DocumentKey:  MongoDocument{ID: primitive.NewObjectID()},
+       }
+       ilw.Key = service
+       info = ilw.doParseWatchRspToResource(mockWatchRsp)
+       assert.Equal(t, documentID.Hex(), info.DocumentID)
+       assert.Equal(t, "91afbe0faa9dc1594689139f099eb293b0cd048d", info.Key)
+}
+
+// TestInnerListWatch_ResumeToken checks the resume token getter and setter.
+func TestInnerListWatch_ResumeToken(t *testing.T) {
+       ilw := mongoListWatch{
+               Key:         instance,
+               resumeToken: bson.Raw("resumToken"),
+       }
+       t.Run("get resume token test", func(t *testing.T) {
+               res := ilw.ResumeToken()
+               assert.NotNil(t, res)
+               assert.Equal(t, bson.Raw("resumToken"), res)
+       })
+
+       t.Run("set resume token test", func(t *testing.T) {
+               ilw.setResumeToken(bson.Raw("resumToken2"))
+               // expected value first, so a failure message reads correctly
+               assert.Equal(t, bson.Raw("resumToken2"), ilw.resumeToken)
+       })
+}
diff --git a/datasource/mongo/sd/mongo_cache.go 
b/datasource/mongo/sd/mongo_cache.go
new file mode 100644
index 0000000..942a3b8
--- /dev/null
+++ b/datasource/mongo/sd/mongo_cache.go
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package sd
+
+import (
+       "sync"
+
+       "github.com/apache/servicecomb-service-center/pkg/util"
+)
+
+// MongoCache implements Cache.
+// MongoCache is dedicated to storing service discovery data,
+// e.g. service, instance, lease.
+type MongoCache struct {
+       Options       *Options
+       name          string
+       // store maps a resource id to its cached value.
+       store         map[string]interface{}
+       // documentStore maps a mongo document id to the resource id.
+       documentStore map[string]string
+       // rwMux guards store and documentStore.
+       rwMux         sync.RWMutex
+       dirty         bool
+}
+
+// Name returns the name the cache was created with.
+func (c *MongoCache) Name() string {
+       return c.name
+}
+
+// Size returns util.Sizeof of the value store, measured under the read lock.
+func (c *MongoCache) Size() (l int) {
+       c.rwMux.RLock()
+       defer c.rwMux.RUnlock()
+
+       return int(util.Sizeof(c.store))
+}
+
+// Get returns the value cached under id, or nil when id is not cached.
+func (c *MongoCache) Get(id string) (v interface{}) {
+       c.rwMux.RLock()
+       defer c.rwMux.RUnlock()
+
+       // a missing key yields the nil interface, same as "not found"
+       return c.store[id]
+}
+
+// GetKeyByDocumentID returns the resource id recorded for the given document
+// id, or "" when the document id is unknown.
+func (c *MongoCache) GetKeyByDocumentID(documentKey string) (id string) {
+       c.rwMux.RLock()
+       defer c.rwMux.RUnlock()
+
+       return c.documentStore[documentKey]
+}
+
+// GetDocumentIDByID is the reverse lookup of GetKeyByDocumentID: it scans the
+// document map for an entry whose resource id equals id and returns that
+// entry's document id, or "" when none matches.
+func (c *MongoCache) GetDocumentIDByID(id string) (documentID string) {
+       c.rwMux.RLock()
+       defer c.rwMux.RUnlock()
+
+       for docID, resourceID := range c.documentStore {
+               if resourceID == id {
+                       return docID
+               }
+       }
+       return ""
+}
+
+// Put caches v under id, replacing any previous value.
+func (c *MongoCache) Put(id string, v interface{}) {
+       c.rwMux.Lock()
+       defer c.rwMux.Unlock()
+
+       c.store[id] = v
+}
+
+// PutDocumentID records that the document documentID holds the resource id.
+func (c *MongoCache) PutDocumentID(id string, documentID string) {
+       c.rwMux.Lock()
+       defer c.rwMux.Unlock()
+
+       c.documentStore[documentID] = id
+}
+
+// Remove drops the value cached under id; the document map is not touched.
+func (c *MongoCache) Remove(id string) {
+       c.rwMux.Lock()
+       defer c.rwMux.Unlock()
+
+       delete(c.store, id)
+}
+
+// RemoveDocumentID drops the document-id entry; the value store is not touched.
+func (c *MongoCache) RemoveDocumentID(documentID string) {
+       c.rwMux.Lock()
+       defer c.rwMux.Unlock()
+
+       delete(c.documentStore, documentID)
+}
+
+// MarkDirty sets the dirty flag.
+// NOTE(review): the flag is written without taking rwMux, while Clear resets
+// it under the lock — confirm callers never race on it.
+func (c *MongoCache) MarkDirty() {
+       c.dirty = true
+}
+
+// Dirty reports the dirty flag. The flag is read without taking rwMux.
+func (c *MongoCache) Dirty() bool { return c.dirty }
+
+// Clear resets the cache to its freshly created state: the dirty flag is
+// cleared and both the value store and the document-id index are emptied.
+//
+// The index must be reset together with the values; otherwise
+// GetKeyByDocumentID would keep returning ids of values that no longer exist.
+func (c *MongoCache) Clear() {
+       c.rwMux.Lock()
+       defer c.rwMux.Unlock()
+
+       c.dirty = false
+       c.store = make(map[string]interface{})
+       c.documentStore = make(map[string]string)
+}
+
+// ForEach calls iter for every cached entry whose value is not nil and stops
+// as soon as iter returns false. Iteration order is the map's, i.e. random.
+//
+// The read lock is held for the whole iteration and released with defer, so
+// it is freed even if iter panics.
+func (c *MongoCache) ForEach(iter func(k string, v interface{}) (next bool)) {
+       c.rwMux.RLock()
+       defer c.rwMux.RUnlock()
+
+       for k, v := range c.store {
+               if v == nil {
+                       continue
+               }
+               if !iter(k, v) {
+                       return
+               }
+       }
+}
+
+// NewMongoCache creates an empty cache with the given name and options; both
+// internal maps are allocated so the cache is ready for writes.
+func NewMongoCache(name string, options *Options) *MongoCache {
+       cache := &MongoCache{}
+       cache.Options = options
+       cache.name = name
+       cache.store = make(map[string]interface{})
+       cache.documentStore = make(map[string]string)
+       return cache
+}
diff --git a/datasource/mongo/sd/mongo_cacher.go 
b/datasource/mongo/sd/mongo_cacher.go
new file mode 100644
index 0000000..57d28a6
--- /dev/null
+++ b/datasource/mongo/sd/mongo_cacher.go
@@ -0,0 +1,461 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package sd
+
+import (
+       "context"
+       "errors"
+       "fmt"
+       "github.com/apache/servicecomb-service-center/datasource/sdcommon"
+       "sync"
+       "time"
+
+       "github.com/apache/servicecomb-service-center/pkg/backoff"
+       "github.com/apache/servicecomb-service-center/pkg/gopool"
+       "github.com/apache/servicecomb-service-center/pkg/log"
+       "github.com/apache/servicecomb-service-center/pkg/util"
+       rmodel "github.com/go-chassis/cari/discovery"
+)
+
+// MongoCacher manages the mongo cache.
+// To update the cache, MongoCacher watches mongo events and periodically
+// lists data from mongo.
+// When the cached data changes, MongoCacher creates events and notifies its
+// subscribers.
+// Use Options to configure its behavior.
+type MongoCacher struct {
+       // Options holds the key (table) and timeouts used for list and watch.
+       Options     *Options
+       // reListCount counts rounds since the last list; see needList.
+       reListCount int
+       // isFirstTime is true until needList has been called once.
+       isFirstTime bool
+       // cache is the store kept up to date by this cacher.
+       cache       *MongoCache
+       // ready is closed once ListAndWatch has completed a round, or on Stop.
+       ready       chan struct{}
+       // lw lists resources and provides the watch event bus.
+       lw          sdcommon.ListWatch
+       // mux is held for the whole duration of ListAndWatch.
+       mux         sync.Mutex
+       // once makes Run start the refresh loop at most one time.
+       once        sync.Once
+       // goroutine is the pool that runs the refresh loop.
+       goroutine   *gopool.Pool
+}
+
+// Cache returns the MongoCache maintained by this cacher.
+func (c *MongoCacher) Cache() *MongoCache {
+       return c.cache
+}
+
+// Run starts the refresh loop on the cacher's goroutine pool.
+// Repeated calls have no effect: the start is guarded by sync.Once.
+func (c *MongoCacher) Run() {
+       c.once.Do(func() {
+               c.goroutine.Do(c.refresh)
+       })
+}
+
+// Stop closes the goroutine pool and then closes the ready channel via
+// util.SafeCloseChan.
+func (c *MongoCacher) Stop() {
+       c.goroutine.Close(true)
+
+       util.SafeCloseChan(c.ready)
+}
+
+// Ready returns the channel that is closed when the cacher becomes ready
+// (or is stopped).
+func (c *MongoCacher) Ready() <-chan struct{} {
+       return c.ready
+}
+
+// IsReady reports, without blocking, whether the ready channel has been closed.
+func (c *MongoCacher) IsReady() bool {
+       select {
+       case <-c.ready:
+               return true
+       default:
+               return false
+       }
+}
+
+// needList decides whether this round must list mongo. It returns true on the
+// very first call (clearing isFirstTime), and afterwards once every
+// sdcommon.DefaultForceListInterval calls, resetting the counter each time.
+func (c *MongoCacher) needList() bool {
+       if c.isFirstTime {
+               c.isFirstTime = false
+               return true
+       }
+
+       c.reListCount++
+       forceList := c.reListCount >= sdcommon.DefaultForceListInterval
+       if forceList {
+               c.reListCount = 0
+       }
+       return forceList
+}
+
+// doList lists all resources through the ListWatch and reconciles the cache.
+// A dirty cache is rebuilt from scratch without notifying subscribers;
+// otherwise the difference between cache and list result is applied and
+// published as events. It returns the list error, if any.
+func (c *MongoCacher) doList(cfg sdcommon.ListWatchConfig) error {
+       resp, err := c.lw.List(cfg)
+       if err != nil {
+               return err
+       }
+
+       listed := resp.Resources
+       doneMsg := fmt.Sprintf("finish to cache key %s, %d items",
+               c.Options.Key, len(listed))
+       defer log.Debug(doneMsg)
+
+       if !c.cache.Dirty() {
+               // compute the diff between cache and mongodb, then notify subscribers
+               c.sync(c.filter(listed))
+               return nil
+       }
+
+       // the cache was marked dirty: just rebuild it
+       c.reset(listed)
+       log.Warn(fmt.Sprintf("Cache[%s] is reset!", c.cache.Name()))
+       return nil
+}
+
+// reset empties the cache and rebuilds it from infos without notifying
+// subscribers.
+func (c *MongoCacher) reset(infos []*sdcommon.Resource) {
+       // Clearing the cache before rebuilding it is safe because the watch
+       // operation is stopped at this point, but until the rebuild finishes
+       // all API requests go to mongo directly.
+       c.cache.Clear()
+       // Do not notify while the cacher is in dirty status,
+       // otherwise too many events would be sent downstream.
+       c.buildCache(c.filter(infos))
+}
+
+// doWatch obtains an event bus from the ListWatch and consumes it with
+// handleEventBus. A nil event bus is reported as an error.
+func (c *MongoCacher) doWatch(cfg sdcommon.ListWatchConfig) error {
+       eventbus := c.lw.EventBus(cfg)
+       if eventbus == nil {
+               return fmt.Errorf("handle a nil watcher")
+       }
+       return c.handleEventBus(eventbus)
+}
+
+// ListAndWatch runs one watch round and, when needList says so, a full list
+// of mongo, then marks the cacher ready.
+//
+// A watch error is only logged. A list error is returned only while the
+// cacher has never been ready, so that the caller retries; once ready, list
+// errors are ignored and the existing cache keeps being served.
+func (c *MongoCacher) ListAndWatch(ctx context.Context) error {
+       c.mux.Lock()
+       defer c.mux.Unlock()
+       defer log.Recover() // ensure ListAndWatch never raise panic
+
+       cfg := sdcommon.ListWatchConfig{
+               Timeout: c.Options.Timeout,
+               Context: ctx,
+       }
+
+       // needList clears isFirstTime, so remember the state of this round here;
+       // checking the field after needList would always see false.
+       firstTime := c.isFirstTime
+
+       // first time should initial cache, set watch timeout less
+       if firstTime {
+               cfg.Timeout = FirstTimeout
+       }
+
+       if err := c.doWatch(cfg); err != nil {
+               log.Error("doWatch err", err)
+       }
+
+       // the scenario need to list mongo:
+       // 1. Initial: cache is building, the lister's is first time to run.
+       // 2. Runtime: watch operation timed out over
+       //    DefaultForceListInterval times.
+       if c.needList() {
+               // recover timeout for list
+               if firstTime {
+                       cfg.Timeout = c.Options.Timeout
+               }
+
+               if err := c.doList(cfg); err != nil && (!c.IsReady()) {
+                       log.Error("doList error", err)
+                       // Restore the first-time state so the retry lists again
+                       // instead of skipping the list and reporting an empty cache
+                       // as ready.
+                       if firstTime {
+                               c.isFirstTime = true
+                       }
+                       return err // do retry to list mongo
+               }
+               // keep going to next step:
+               // 1. doList return OK.
+               // 2. some traps in mongo client
+       }
+
+       util.SafeCloseChan(c.ready)
+
+       return nil
+}
+
+func (c *MongoCacher) handleEventBus(eventbus *sdcommon.EventBus) error {
+       defer eventbus.Stop()
+
+       if eventbus.Bus == nil {
+               return nil
+       }
+
+       for resp := range eventbus.ResourceEventBus() {
+               events := make([]MongoEvent, 0)
+
+               if resp == nil {
+                       return errors.New("handle watcher error")
+               }
+
+               for _, resource := range resp.Resources {
+                       action := resp.Action
+                       var event MongoEvent
+                       switch action {
+                       case sdcommon.ActionCreate:
+                               event = NewMongoEventByResource(resource, 
rmodel.EVT_CREATE)
+                       case sdcommon.ActionUpdate:
+                               event = NewMongoEventByResource(resource, 
rmodel.EVT_UPDATE)
+                       case sdcommon.ActionDelete:
+                               resource.Key = 
c.cache.GetKeyByDocumentID(resource.DocumentID)
+                               resource.Value = c.cache.Get(resource.Key)
+                               event = NewMongoEventByResource(resource, 
rmodel.EVT_DELETE)
+                       }
+                       events = append(events, event)
+               }
+
+               c.sync(events)
+               log.Debug(fmt.Sprintf("finish to handle %d events, table: %s", 
len(events), c.Options.Key))
+       }
+
+       return nil
+}
+
+// refresh is the cacher's main loop: it calls ListAndWatch repeatedly until
+// ctx is done, waiting on a timer between rounds. A failed round increments
+// the retry counter used to compute the backoff delay; a successful round
+// resets it.
+func (c *MongoCacher) refresh(ctx context.Context) {
+       log.Debug(fmt.Sprintf("start to list and watch %s", c.Options))
+       retries := 0
+
+       timer := time.NewTimer(sdcommon.MinWaitInterval)
+       defer timer.Stop()
+       for {
+               nextPeriod := sdcommon.MinWaitInterval
+               if err := c.ListAndWatch(ctx); err != nil {
+                       retries++
+                       nextPeriod = backoff.GetBackoff().Delay(retries)
+               } else {
+                       retries = 0
+               }
+
+               select {
+               case <-ctx.Done():
+                       log.Debug(fmt.Sprintf("stop to list and watch %s", 
+c.Options))
+                       return
+               case <-timer.C:
+                       // NOTE(review): the timer is re-armed only after it fires, so
+                       // nextPeriod governs the wait of the following round, not this one.
+                       timer.Reset(nextPeriod)
+               }
+       }
+}
+
+// sync applies evts to the cache and notifies subscribers.
+// It is a no-op for an empty slice; callers must pass valid events.
+func (c *MongoCacher) sync(evts []MongoEvent) {
+       if len(evts) > 0 {
+               c.onEvents(evts)
+       }
+}
+
+// filter computes the events that turn the current cache content into the
+// listed resources infos: deletes for cached keys missing from infos, and
+// creates/updates for listed keys that are new or changed.
+func (c *MongoCacher) filter(infos []*sdcommon.Resource) []MongoEvent {
+       nc := len(infos)
+       newStore := make(map[string]interface{}, nc)
+       documentIDRecord := make(map[string]string, nc)
+
+       // index the listed resources by resource ID
+       for _, info := range infos {
+               event := NewMongoEventByResource(info, rmodel.EVT_CREATE)
+               newStore[event.ResourceID] = info.Value
+               documentIDRecord[event.ResourceID] = info.DocumentID
+       }
+
+       filterStopCh := make(chan struct{})
+       eventsCh := make(chan [sdcommon.EventBlockSize]MongoEvent, 2)
+
+       // both goroutines send fixed-size event blocks on eventsCh;
+       // filterDelete closes filterStopCh when done, and filterCreateOrUpdate
+       // waits for that before closing eventsCh
+       go c.filterDelete(newStore, eventsCh, filterStopCh)
+
+       go c.filterCreateOrUpdate(newStore, documentIDRecord, eventsCh, 
+filterStopCh)
+
+       events := make([]MongoEvent, 0, nc)
+       for block := range eventsCh {
+               for _, e := range block {
+                       // unused slots of a block are zero values; a nil Value marks
+                       // the end of the filled part.
+                       // NOTE(review): an event carrying a nil Value would also end
+                       // the block early; confirm listed values are never nil.
+                       if e.Value == nil {
+                               break
+                       }
+                       events = append(events, e)
+               }
+       }
+       return events
+}
+
+// filterDelete emits an EVT_DELETE event for every cached key that is absent
+// from newStore. Events are sent on eventsCh in blocks of
+// sdcommon.EventBlockSize; filterStopCh is closed once all blocks are sent.
+func (c *MongoCacher) filterDelete(newStore map[string]interface{},
+       eventsCh chan [sdcommon.EventBlockSize]MongoEvent, filterStopCh chan struct{}) {
+       type removedEntry struct {
+               key   string
+               value interface{}
+       }
+
+       // Collect the deleted entries first. The document ID lookup below takes
+       // the cache read lock itself, so it must not run inside ForEach, which
+       // already holds that lock: recursive read-locking of a sync.RWMutex can
+       // deadlock when a writer is waiting in between.
+       var removed []removedEntry
+       c.cache.ForEach(func(k string, v interface{}) (next bool) {
+               if _, ok := newStore[k]; !ok {
+                       // k in store but not in new store, it means k is deleted
+                       removed = append(removed, removedEntry{key: k, value: v})
+               }
+               return true
+       })
+
+       var block [sdcommon.EventBlockSize]MongoEvent
+       i := 0
+       for _, entry := range removed {
+               if i >= sdcommon.EventBlockSize {
+                       eventsCh <- block
+                       block = [sdcommon.EventBlockSize]MongoEvent{}
+                       i = 0
+               }
+
+               documentID := c.cache.GetDocumentIDByID(entry.key)
+               block[i] = NewMongoEvent(entry.key, documentID, rmodel.EVT_DELETE, entry.value)
+               i++
+       }
+
+       if i > 0 {
+               eventsCh <- block
+       }
+
+       close(filterStopCh)
+}
+
+// filterCreateOrUpdate emits EVT_CREATE for keys of newStore that are not
+// cached and EVT_UPDATE for cached keys whose value changed. Events are sent
+// on eventsCh in blocks of sdcommon.EventBlockSize. After sending its last
+// block it waits for filterStopCh and then closes eventsCh.
+func (c *MongoCacher) filterCreateOrUpdate(newStore map[string]interface{}, newDocumentStore map[string]string,
+       eventsCh chan [sdcommon.EventBlockSize]MongoEvent, filterStopCh chan struct{}) {
+       var (
+               block [sdcommon.EventBlockSize]MongoEvent
+               used  int
+       )
+
+       // push stores evt in the current block, flushing a full block first
+       push := func(evt MongoEvent) {
+               if used >= sdcommon.EventBlockSize {
+                       eventsCh <- block
+                       block = [sdcommon.EventBlockSize]MongoEvent{}
+                       used = 0
+               }
+               block[used] = evt
+               used++
+       }
+
+       for key, newValue := range newStore {
+               oldValue := c.cache.Get(key)
+               switch {
+               case oldValue == nil:
+                       // not cached yet: create
+                       push(NewMongoEvent(key, newDocumentStore[key], rmodel.EVT_CREATE, newValue))
+               case c.isValueNotUpdated(newValue, oldValue):
+                       // cached and unchanged: nothing to emit
+               default:
+                       log.Debug(fmt.Sprintf("value is updateOp of key:%s, old value is:%s, new value is:%s", key, oldValue, newValue))
+                       push(NewMongoEvent(key, newDocumentStore[key], rmodel.EVT_UPDATE, newValue))
+               }
+       }
+
+       if used > 0 {
+               eventsCh <- block
+       }
+
+       // wait for filterDelete to finish before closing the shared channel
+       <-filterStopCh
+
+       close(eventsCh)
+}
+
+// isValueNotUpdated reports whether the two values should be treated as the
+// same version, by comparing their ModTimestamp. The concrete type is chosen
+// by c.Options.Key (Instance or Service); the type assertions are unchecked
+// and panic if a value has another type. It returns true when either info
+// pointer is nil, when the second value's timestamp is empty, or when both
+// timestamps are equal.
+// NOTE(review): filterCreateOrUpdate calls this as (new value, cached value),
+// so "newValue" here is the cached one; confirm the empty-timestamp check is
+// applied to the intended side.
+func (c *MongoCacher) isValueNotUpdated(value interface{}, newValue 
+interface{}) bool {
+       var modTime string
+       var newModTime string
+
+       switch c.Options.Key {
+       case instance:
+               instance := value.(Instance)
+               newInstance := newValue.(Instance)
+               if instance.InstanceInfo == nil || newInstance.InstanceInfo == 
+nil {
+                       return true
+               }
+               modTime = instance.InstanceInfo.ModTimestamp
+               newModTime = newInstance.InstanceInfo.ModTimestamp
+       case service:
+               service := value.(Service)
+               newService := newValue.(Service)
+               if service.ServiceInfo == nil || newService.ServiceInfo == nil {
+                       return true
+               }
+               modTime = service.ServiceInfo.ModTimestamp
+               newModTime = newService.ServiceInfo.ModTimestamp
+       }
+
+       // for any other key both timestamps stay empty, which yields true
+       if newModTime == "" || modTime == newModTime {
+               return true
+       }
+
+       return false
+}
+
+// onEvents applies events to the cache first and then notifies subscribers.
+// buildCache may rewrite event types and values in place, so subscribers
+// receive the adjusted events.
+func (c *MongoCacher) onEvents(events []MongoEvent) {
+       c.buildCache(events)
+
+       c.notify(events)
+}
+
+// buildCache applies events to the cache and normalizes them in place:
+//   - create/update: the value and document ID are stored; the type becomes
+//     EVT_INIT while the cacher is not ready, otherwise it is corrected to
+//     EVT_CREATE or EVT_UPDATE depending on whether the key was cached.
+//   - delete: if the key is cached, the event value is set to the cached
+//     value and the entry and its document ID mapping are removed.
+// Events of any other type are left untouched.
+func (c *MongoCacher) buildCache(events []MongoEvent) {
+       for i, evt := range events {
+               key := evt.ResourceID
+               value := c.cache.Get(key)
+               // ok: the key currently has a non-nil cached value
+               ok := value != nil
+
+               switch evt.Type {
+               case rmodel.EVT_CREATE, rmodel.EVT_UPDATE:
+                       switch {
+                       case !c.IsReady():
+                               evt.Type = rmodel.EVT_INIT
+                       case !ok && evt.Type != rmodel.EVT_CREATE:
+                               log.Warn(fmt.Sprintf("unexpected %s event! it 
+should be %s key %s",
+                                       evt.Type, rmodel.EVT_CREATE, key))
+                               evt.Type = rmodel.EVT_CREATE
+                       case ok && evt.Type != rmodel.EVT_UPDATE:
+                               log.Warn(fmt.Sprintf("unexpected %s event! it 
+should be %s key %s",
+                                       evt.Type, rmodel.EVT_UPDATE, key))
+                               evt.Type = rmodel.EVT_UPDATE
+                       }
+
+                       c.cache.Put(key, evt.Value)
+                       c.cache.PutDocumentID(key, evt.DocumentID)
+
+                       // evt is a copy; write the adjusted event back
+                       events[i] = evt
+               case rmodel.EVT_DELETE:
+                       if !ok {
+                               log.Warn(fmt.Sprintf("unexpected %s event! key 
+%s does not cache",
+                                       evt.Type, key))
+                       } else {
+                               evt.Value = value
+
+                               c.cache.Remove(key)
+                               c.cache.RemoveDocumentID(evt.DocumentID)
+                       }
+                       events[i] = evt
+               }
+       }
+}
+
+// notify delivers evts, in order, to the event proxy registered for the
+// cacher's key. Nothing happens when no proxy exists. Delivery runs under
+// log.Recover.
+func (c *MongoCacher) notify(evts []MongoEvent) {
+       proxy := EventProxy(c.Options.Key)
+       if proxy == nil {
+               return
+       }
+
+       defer log.Recover()
+
+       for i := range evts {
+               proxy.OnEvent(evts[i])
+       }
+}
+
+// NewMongoCacher creates a MongoCacher for the table named by options.Key
+// that maintains the given cache. The cacher starts in first-time state, with
+// an open ready channel, a mongoListWatch for the key and its own goroutine
+// pool. Call Run to start it.
+func NewMongoCacher(options *Options, cache *MongoCache) *MongoCacher {
+       return &MongoCacher{
+               Options:     options,
+               isFirstTime: true,
+               cache:       cache,
+               ready:       make(chan struct{}),
+               lw: &mongoListWatch{
+                       Key: options.Key,
+               },
+               goroutine: gopool.New(context.Background()),
+       }
+}
diff --git a/datasource/mongo/sd/mongo_cacher_test.go 
b/datasource/mongo/sd/mongo_cacher_test.go
new file mode 100644
index 0000000..86f571f
--- /dev/null
+++ b/datasource/mongo/sd/mongo_cacher_test.go
@@ -0,0 +1,315 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package sd
+
+import (
+       "context"
+       "fmt"
+       "github.com/apache/servicecomb-service-center/datasource/sdcommon"
+       "testing"
+
+       "github.com/apache/servicecomb-service-center/pkg/gopool"
+       pb "github.com/go-chassis/cari/discovery"
+       "github.com/stretchr/testify/assert"
+       "go.mongodb.org/mongo-driver/bson"
+)
+
+// MockListWatch is a test double for sdcommon.ListWatch that returns canned
+// list and watch responses.
+type MockListWatch struct {
+       // ListResponse is returned by List; nil makes List fail.
+       ListResponse  *sdcommon.ListWatchResp
+       // WatchResponse is delivered by DoWatch; nil makes DoWatch fail.
+       WatchResponse *sdcommon.ListWatchResp
+       resumeToken   bson.Raw
+}
+
+// List returns ListResponse, or an error when ListResponse is nil.
+func (lw *MockListWatch) List(sdcommon.ListWatchConfig) 
+(*sdcommon.ListWatchResp, error) {
+       if lw.ListResponse == nil {
+               return nil, fmt.Errorf("list error")
+       }
+       return lw.ListResponse, nil
+}
+
+// DoWatch passes WatchResponse to f once and then blocks until ctx is done.
+// It returns an error immediately when WatchResponse is nil.
+func (lw *MockListWatch) DoWatch(ctx context.Context, f 
+func(*sdcommon.ListWatchResp)) error {
+       if lw.WatchResponse == nil {
+               return fmt.Errorf("error")
+       }
+
+       f(lw.WatchResponse)
+       <-ctx.Done()
+       return nil
+}
+
+// EventBus returns a new sdcommon.EventBus backed by this mock.
+func (lw *MockListWatch) EventBus(op sdcommon.ListWatchConfig) 
+*sdcommon.EventBus {
+       return sdcommon.NewEventBus(lw, op)
+}
+
+// TestNewMongoCacher drives MongoCacher.refresh with a MockListWatch and an
+// already-cancelled context, so every refresh call runs exactly one
+// ListAndWatch round. The sub-tests share one cacher and cache and therefore
+// depend on running in order.
+func TestNewMongoCacher(t *testing.T) {
+       mockMongoCache := NewMongoCache("test", DefaultOptions())
+       lw := &MockListWatch{}
+
+       cr := &MongoCacher{
+               Options:     DefaultOptions(),
+               ready:       make(chan struct{}),
+               isFirstTime: true,
+               lw:          lw,
+               goroutine:   gopool.New(context.Background()),
+               cache:       mockMongoCache,
+       }
+
+       ctx, cancel := context.WithCancel(context.Background())
+       cancel()
+
+       // case: cause list internal error before initialized
+       t.Run("case list: internal error before initialized", func(t *testing.T) {
+               cr.refresh(ctx)
+               if cr.IsReady() {
+                       t.Fatalf("TestNewMongoCacher failed")
+               }
+
+       })
+
+       // prepare mock data
+       var evt MongoEvent
+       cr = &MongoCacher{
+               Options:   DefaultOptions().SetTable(instance),
+               ready:     make(chan struct{}),
+               lw:        lw,
+               goroutine: gopool.New(context.Background()),
+               cache:     mockMongoCache,
+       }
+
+       // record the last event delivered to subscribers
+       EventProxy(instance).AddHandleFunc(func(e MongoEvent) {
+               evt = e
+       })
+
+       mockDocumentID := "5fcf2f1a4ea1e6d2f4c61d47"
+       mockResourceID := "95cd8dbf3e8411eb92d8fa163e8a81c9"
+       mockResumeToken, _ :=
+               bson.Marshal(bson.M{"_data": "825FDB4272000000012B022C0100296E5A10043E2D15AC82D9484C8090E68AF36FED2A46645F696400645FD76265066A6D2DF2AAC8D80004"})
+
+       var resources []*sdcommon.Resource
+       resource := &sdcommon.Resource{Key: mockResourceID, DocumentID: mockDocumentID, Value: Instance{Domain: "default", Project: "default",
+               InstanceInfo: &pb.MicroServiceInstance{InstanceId: mockResourceID, ModTimestamp: "100000"}}}
+       resources = append(resources, resource)
+       test := &sdcommon.ListWatchResp{
+               Action:    sdcommon.ActionCreate,
+               Resources: resources,
+       }
+
+       evtExpect := MongoEvent{
+               DocumentID: mockDocumentID,
+               ResourceID: mockResourceID,
+               Value:      resource.Value,
+               Type:       pb.EVT_INIT,
+       }
+
+       // case: list 1 resp and watch 0 event
+       t.Run("case list: first time list init cache", func(t *testing.T) {
+               // case: resume token is nil, first time list event is init
+               cr.isFirstTime = true
+               cr.cache.Remove(mockResourceID)
+               cr.cache.RemoveDocumentID(mockDocumentID)
+
+               lw.ListResponse = test
+               lw.resumeToken = nil
+               lw.WatchResponse = nil
+
+               cr.refresh(ctx)
+
+               // check ready
+               assert.Equal(t, true, cr.IsReady())
+
+               //check config
+               assert.Equal(t, instance, cr.Options.Key)
+
+               // check event
+               assert.Equal(t, evtExpect, evt)
+
+               // check cache
+               cache := cr.cache.Get(mockResourceID)
+               assert.Equal(t, resource.Value, cache)
+       })
+
+       t.Run("case list: re-list and updateOp cache", func(t *testing.T) {
+               // case: re-list and should be no event
+               lw.WatchResponse = nil
+               evt.Value = nil
+               cr.refresh(ctx)
+
+               //check events
+               assert.Nil(t, evt.Value)
+
+               // check cache
+               cache := cr.cache.Get(mockResourceID)
+               assert.Equal(t, resource.Value, cache)
+
+               // prepare updateOp data
+               dataUpdate := &sdcommon.Resource{Key: mockResourceID, DocumentID: mockDocumentID,
+                       Value: Instance{Domain: "default", Project: "default",
+                               InstanceInfo: &pb.MicroServiceInstance{InstanceId: mockResourceID, HostName: "test", ModTimestamp: "100001"}}}
+
+               var mongoUpdateResources []*sdcommon.Resource
+               mongoUpdateResources = append(mongoUpdateResources, dataUpdate)
+               testUpdate := &sdcommon.ListWatchResp{
+                       Action:    sdcommon.ActionUpdate,
+                       Resources: mongoUpdateResources,
+               }
+
+               lw.ListResponse = testUpdate
+               lw.resumeToken = mockResumeToken
+
+               // case: re-list and over no event times, and then event should be updateOp
+               lw.WatchResponse = nil
+
+               evt.Value = nil
+               for i := 0; i < sdcommon.DefaultForceListInterval; i++ {
+                       cr.refresh(ctx)
+               }
+               // check event
+               evtExpect.Type = pb.EVT_UPDATE
+               evtExpect.Value = dataUpdate.Value
+               assert.Equal(t, evtExpect, evt)
+
+               // check cache
+               cache = cr.cache.Get(mockResourceID)
+               assert.Equal(t, dataUpdate.Value, cache)
+       })
+
+       t.Run("case list: no infos list and delete cache", func(t *testing.T) {
+               // case: no infos list, event should be delete
+               lw.ListResponse = &sdcommon.ListWatchResp{}
+               lw.WatchResponse = nil
+               evt.Value = nil
+               for i := 0; i < sdcommon.DefaultForceListInterval; i++ {
+                       cr.refresh(ctx)
+               }
+
+               // check event
+               evtExpect.Type = pb.EVT_DELETE
+               assert.Equal(t, evtExpect, evt)
+
+               // check cache: the entry must be gone
+               cache := cr.cache.Get(mockResourceID)
+               assert.Nil(t, cache)
+       })
+
+       t.Run("case list: mark cache dirty and reset cache", func(t *testing.T) {
+               lw.ListResponse = test
+               cr.isFirstTime = true
+               evt.Value = nil
+
+               cr.cache.MarkDirty()
+               cr.refresh(ctx)
+
+               // check event: a reset must not notify subscribers
+               if evt.Value != nil {
+                       t.Fatalf("TestNewMongoCacher failed, %v", evt)
+               }
+
+               // check cache
+               cache := cr.cache.Get(mockResourceID)
+               assert.Equal(t, resource.Value, cache)
+       })
+
+       t.Run("case watch: caught create event", func(t *testing.T) {
+               cr.cache.Remove(mockResourceID)
+               cr.cache.RemoveDocumentID(mockDocumentID)
+               lw.WatchResponse = test
+               lw.ListResponse = nil
+               lw.resumeToken = mockResumeToken
+
+               cr.refresh(ctx)
+
+               // check event
+               evtExpect.Type = pb.EVT_CREATE
+               evtExpect.Value = resource.Value
+               assert.Equal(t, evtExpect, evt)
+
+               // check cache
+               cache := cr.cache.Get(mockResourceID)
+               assert.Equal(t, resource.Value, cache)
+
+       })
+
+       t.Run("case watch: caught updateOp event", func(t *testing.T) {
+               // prepare updateOp data
+               dataUpdate := &sdcommon.Resource{Key: mockResourceID, DocumentID: mockDocumentID,
+                       Value: Instance{Domain: "default", Project: "default",
+                               InstanceInfo: &pb.MicroServiceInstance{InstanceId: mockResourceID, HostName: "test", ModTimestamp: "100001"}}}
+
+               var mongoUpdateResources []*sdcommon.Resource
+               mongoUpdateResources = append(mongoUpdateResources, dataUpdate)
+               testUpdate := &sdcommon.ListWatchResp{
+                       Action:    sdcommon.ActionUpdate,
+                       Resources: mongoUpdateResources,
+               }
+               lw.WatchResponse = testUpdate
+               lw.ListResponse = nil
+
+               cr.refresh(ctx)
+
+               // check event
+               evtExpect.Type = pb.EVT_UPDATE
+               evtExpect.Value = dataUpdate.Value
+               assert.Equal(t, evtExpect, evt)
+
+               // check cache
+               cache := cr.cache.Get(mockResourceID)
+               assert.Equal(t, dataUpdate.Value, cache)
+       })
+
+       t.Run("case watch: caught delete event but value is nil", func(t *testing.T) {
+               test.Action = sdcommon.ActionDelete
+               lw.WatchResponse = test
+               lw.ListResponse = nil
+
+               cr.refresh(ctx)
+
+               // check event
+               evtExpect.Type = pb.EVT_DELETE
+               assert.Equal(t, evtExpect, evt)
+
+               // check cache
+               cache := cr.cache.Get(mockResourceID)
+               assert.Nil(t, cache)
+
+       })
+}
+
+// TestMongoCacher_Run checks that a cacher can be started with Run and shut
+// down with Stop, and that its cache stays non-nil.
+func TestMongoCacher_Run(t *testing.T) {
+       lw := &MockListWatch{}
+
+       mockMongoCache := NewMongoCache("test", DefaultOptions())
+       cr := &MongoCacher{
+               Options:   DefaultOptions().SetTable(instance),
+               ready:     make(chan struct{}),
+               lw:        lw,
+               goroutine: gopool.New(context.Background()),
+               cache:     mockMongoCache,
+       }
+
+       cr.Run()
+
+       // check cache
+       cache := cr.cache
+       assert.NotNil(t, cache)
+
+       cr.Stop()
+}
diff --git a/datasource/etcd/sd/etcd/watcher.go b/datasource/mongo/sd/options.go
similarity index 53%
rename from datasource/etcd/sd/etcd/watcher.go
rename to datasource/mongo/sd/options.go
index 37af706..f061003 100644
--- a/datasource/etcd/sd/etcd/watcher.go
+++ b/datasource/mongo/sd/options.go
@@ -15,13 +15,42 @@
  * limitations under the License.
  */
 
-package etcd
+package sd
 
 import (
-       "github.com/apache/servicecomb-service-center/datasource/etcd/client"
+       "fmt"
+       "time"
 )
 
-type Watcher interface {
-       EventBus() <-chan *client.PluginResponse
-       Stop()
+// Default timeouts and sizes for the mongo cacher.
+const (
+       // FirstTimeout is the shorter watch timeout used on the first round.
+       FirstTimeout         = 2 * time.Second
+       // DefaultTimeout is the Timeout set by DefaultOptions.
+       DefaultTimeout       = 30 * time.Second
+       // DefaultCacheInitSize is the InitSize set by DefaultOptions.
+       DefaultCacheInitSize = 100
+)
+
+// Options configures a mongo cache and its cacher.
+type Options struct {
+       // Key is the table name that uniquely identifies the resource type
+       Key      string
+       InitSize int
+       Timeout  time.Duration
+       Period   time.Duration
+}
+
+// String formats the key, timeout and period for logging.
+// InitSize is not included.
+func (options *Options) String() string {
+       return fmt.Sprintf("{key: %s, timeout: %s, period: %s}",
+               options.Key, options.Timeout, options.Period)
+}
+
+// SetTable sets Key in place and returns the same Options for chaining.
+func (options *Options) SetTable(key string) *Options {
+       options.Key = key
+       return options
+}
+
+// DefaultOptions returns a new Options with an empty key, DefaultTimeout,
+// a period of one second and DefaultCacheInitSize.
+func DefaultOptions() *Options {
+       return &Options{
+               Key:      "",
+               Timeout:  DefaultTimeout,
+               Period:   time.Second,
+               InitSize: DefaultCacheInitSize,
+       }
 }
diff --git a/datasource/mongo/sd/options_test.go 
b/datasource/mongo/sd/options_test.go
new file mode 100644
index 0000000..30c78c2
--- /dev/null
+++ b/datasource/mongo/sd/options_test.go
@@ -0,0 +1,42 @@
+package sd
+
+import (
+       "testing"
+
+       "github.com/apache/servicecomb-service-center/pkg/log"
+       "github.com/go-chassis/cari/discovery"
+       "github.com/stretchr/testify/assert"
+)
+
+// TestOptions checks the zero-value Options, SetTable and String.
+func TestOptions(t *testing.T) {
+       options := Options{
+               Key: "",
+       }
+       assert.Empty(t, options, "config is empty")
+
+       // SetTable mutates options and returns a pointer to it
+       options1 := options.SetTable("configKey")
+       assert.Equal(t, "configKey", options1.Key,
+               "contain key after method WithTable")
+
+       assert.Equal(t, 0, options1.InitSize,
+               "init size is zero")
+
+       mongoEventFunc = mongoEventFuncGet()
+
+       out := options1.String()
+       assert.NotNil(t, out,
+               "method String return not after methods")
+}
+
+// mongoEventFunc holds the handler built by mongoEventFuncGet; TestOptions
+// assigns it.
+var mongoEventFunc MongoEventFunc
+
+// mongoEventFuncGet returns a MongoEventFunc that overwrites the fields of
+// its own copy of the event (evt is passed by value, so the caller's event is
+// unchanged) and writes a log line.
+func mongoEventFuncGet() MongoEventFunc {
+       fun := func(evt MongoEvent) {
+               evt.DocumentID = "DocumentID has changed"
+               evt.ResourceID = "BusinessID has changed"
+               evt.Value = 2
+               evt.Type = discovery.EVT_UPDATE
+               log.Info("in event func")
+       }
+       return fun
+}
diff --git a/datasource/mongo/sd/types.go b/datasource/mongo/sd/types.go
new file mode 100644
index 0000000..151bf17
--- /dev/null
+++ b/datasource/mongo/sd/types.go
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package sd
+
+import (
+       "github.com/apache/servicecomb-service-center/datasource/sdcommon"
+       "time"
+
+       "github.com/go-chassis/cari/discovery"
+       pb "github.com/go-chassis/cari/discovery"
+       "go.mongodb.org/mongo-driver/bson"
+       "go.mongodb.org/mongo-driver/bson/primitive"
+)
+
+// Names of the resource types (tables) cached by this package; they are
+// registered by registerInnerTypes and used as Options.Key.
+const (
+       service  = "service"
+       instance = "instance"
+)
+
+var (
+       // Types lists the registered resource type names, in registration order.
+       Types []string
+)
+
+// RegisterType appends name to Types. It takes no lock and does not check
+// for duplicates.
+func RegisterType(name string) {
+       Types = append(Types, name)
+}
+
+// MongoEvent describes one change of a cached resource.
+type MongoEvent struct {
+       // DocumentID is the ID of the mongo document.
+       DocumentID string
+       // ResourceID is the business ID (cache key) of the resource.
+       ResourceID string
+       // Value is the resource value carried by the event.
+       Value      interface{}
+       // Type is the kind of change (init, create, update or delete).
+       Type       discovery.EventType
+}
+
+// MongoEventFunc is a callback invoked with a MongoEvent.
+type MongoEventFunc func(evt MongoEvent)
+
+// MongoEventHandler handles the events of one resource type.
+type MongoEventHandler interface {
+       // Type returns the resource type name this handler is for.
+       Type() string
+       // OnEvent is called for each event.
+       OnEvent(evt MongoEvent)
+}
+
+// NewMongoEventByResource builds a MongoEvent of type action from resource:
+// Key becomes ResourceID, and DocumentID and Value are copied.
+// resource must not be nil.
+func NewMongoEventByResource(resource *sdcommon.Resource, action 
+discovery.EventType) MongoEvent {
+       return MongoEvent{ResourceID: resource.Key, DocumentID: 
+resource.DocumentID, Value: resource.Value, Type: action}
+}
+
+// NewMongoEvent builds a MongoEvent of type action for the resource id with
+// the given document ID and value.
+func NewMongoEvent(id string, documentID string, action discovery.EventType, v interface{}) MongoEvent {
+       return MongoEvent{
+               DocumentID: documentID,
+               ResourceID: id,
+               Value:      v,
+               Type:       action,
+       }
+}
+
+// MongoWatchResponse holds the parts of a watch notification used by this
+// package.
+// NOTE(review): presumably decoded from a mongo change stream event; confirm
+// against the watch implementation.
+type MongoWatchResponse struct {
+       OperationType string
+       FullDocument  bson.Raw
+       DocumentKey   MongoDocument
+}
+
+// MongoDocument carries the ObjectID stored in a document's "_id" field.
+type MongoDocument struct {
+       ID primitive.ObjectID `bson:"_id"`
+}
+
+// ResumeToken carries the bytes stored in the "_data" field of a resume token.
+type ResumeToken struct {
+       Data []byte `bson:"_data"`
+}
+
+// MongoInfo groups a resource value with its document ID and resource ID.
+type MongoInfo struct {
+       DocumentID string
+       ResourceID string
+       Value      interface{}
+}
+
+// Service is the cached value type for the "service" table: a micro service
+// together with its domain, project and tags.
+type Service struct {
+       Domain      string
+       Project     string
+       Tags        map[string]string
+       ServiceInfo *pb.MicroService
+}
+
+// Instance is the cached value type for the "instance" table: a micro
+// service instance together with its domain, project and refresh time.
+type Instance struct {
+       Domain       string
+       Project      string
+       RefreshTime  time.Time
+       InstanceInfo *pb.MicroServiceInstance
+}
diff --git a/datasource/mongo/sd/typestore.go b/datasource/mongo/sd/typestore.go
new file mode 100644
index 0000000..806e6b2
--- /dev/null
+++ b/datasource/mongo/sd/typestore.go
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// Package sd provides a TypeStore to manage the MongoCacher of each 
registered type, see types.go
+package sd
+
+import (
+       "context"
+       "fmt"
+       "sync"
+       "time"
+
+       "github.com/apache/servicecomb-service-center/pkg/gopool"
+       "github.com/apache/servicecomb-service-center/pkg/log"
+       "github.com/apache/servicecomb-service-center/pkg/util"
+       "github.com/apache/servicecomb-service-center/server/config"
+)
+
+// store is the package-level TypeStore returned by Store().
+var store = &TypeStore{}
+
+// init prepares the package-level store and registers the built-in types
+// (service and instance) when the package is loaded.
+func init() {
+       store.Initialize()
+       registerInnerTypes()
+}
+
+// TypeStore lazily creates and keeps one MongoCacher per registered type name.
+type TypeStore struct {
+       caches    util.ConcurrentMap
+       mux       sync.Mutex // serializes cacher creation in getOrCreateCache
+       ready     chan struct{}
+       goroutine *gopool.Pool
+       isClose   bool
+}
+
+// Initialize allocates the ready channel and the goroutine pool that Run and
+// Stop operate on.
+func (s *TypeStore) Initialize() {
+       s.ready = make(chan struct{})
+       s.goroutine = gopool.New(context.Background())
+}
+
+// registerInnerTypes registers the built-in cache types.
+func registerInnerTypes() {
+       RegisterType(service)
+       RegisterType(instance)
+}
+
+// Run starts the two background tasks of the store on its goroutine pool:
+// building the caches of all registered types and the periodic cache
+// invalidation.
+func (s *TypeStore) Run() {
+       s.goroutine.Do(s.store)
+       s.goroutine.Do(s.autoClearCache)
+}
+
+// store creates the cacher of every registered type and waits, one type after
+// the other, until each reports ready; afterwards the store's ready channel is
+// closed. It returns early, leaving the ready channel untouched, when ctx is
+// done.
+func (s *TypeStore) store(ctx context.Context) {
+       // new all types
+       for _, t := range Types {
+               select {
+               case <-ctx.Done():
+                       return
+               case <-s.getOrCreateCache(t).Ready():
+               }
+       }
+       util.SafeCloseChan(s.ready)
+       log.Debug("all caches are ready")
+}
+
+// autoClearCache marks the cache of every registered type dirty each time the
+// configured CacheTTL elapses, until ctx is done. The feature is disabled when
+// the TTL is not positive.
+func (s *TypeStore) autoClearCache(ctx context.Context) {
+       ttl := config.GetRegistry().CacheTTL
+
+       // A negative TTL must be rejected as well: time.After fires immediately
+       // for non-positive durations, which would turn the loop below into a
+       // busy loop that keeps invalidating every cache.
+       if ttl <= 0 {
+               return
+       }
+
+       log.Info(fmt.Sprintf("start auto clear cache in %v", ttl))
+       for {
+               select {
+               case <-ctx.Done():
+                       return
+               case <-time.After(ttl):
+                       for _, t := range Types {
+                               cache := s.getOrCreateCache(t).Cache()
+                               cache.MarkDirty()
+                       }
+                       log.Warn("caches are marked dirty!")
+               }
+       }
+}
+
+// getOrCreateCache returns the cacher registered under t. On first use it
+// creates the cache and its cacher with the default options (table set to t),
+// starts the cacher and stores it.
+//
+// Creation is serialized by s.mux: the store goroutine, the auto-clear
+// goroutine and the public accessors may ask for the same type concurrently,
+// and without the lock two callers could each create and start a cacher of
+// which only one ends up in the map.
+func (s *TypeStore) getOrCreateCache(t string) *MongoCacher {
+       // fast path: the cacher already exists, no lock needed
+       if cached, found := s.caches.Get(t); found {
+               return cached.(*MongoCacher)
+       }
+
+       s.mux.Lock()
+       defer s.mux.Unlock()
+
+       // re-check under the lock, another caller may have created it meanwhile
+       cache, ok := s.caches.Get(t)
+       if ok {
+               return cache.(*MongoCacher)
+       }
+
+       options := DefaultOptions().SetTable(t)
+
+       cache = NewMongoCache(t, options)
+       cacher := NewMongoCacher(options, cache.(*MongoCache))
+       cacher.Run()
+
+       s.caches.Put(t, cacher)
+       return cacher
+}
+
+// Stop shuts the store down once: it closes the goroutine pool and the ready
+// channel. Calls after the first one are no-ops.
+// NOTE(review): isClose is read and written without synchronization, so
+// concurrent Stop calls are presumably not expected — confirm with callers.
+func (s *TypeStore) Stop() {
+       if alreadyClosed := s.isClose; alreadyClosed {
+               return
+       }
+       s.isClose = true
+
+       // stop the background tasks first, then release everyone waiting on Ready()
+       s.goroutine.Close(true)
+       util.SafeCloseChan(s.ready)
+       log.Debug("store daemon stopped")
+}
+
+// Ready returns the channel that is closed once the caches of all registered
+// types are ready, or when the store is stopped.
+func (s *TypeStore) Ready() <-chan struct{} {
+       return s.ready
+}
+
+// TypeCacher returns the cacher of the type named id, creating and starting
+// it on first use.
+func (s *TypeStore) TypeCacher(id string) *MongoCacher {
+       return s.getOrCreateCache(id)
+}
+
+// Service returns the cacher of the built-in service type.
+func (s *TypeStore) Service() *MongoCacher {
+       return s.TypeCacher(service)
+}
+
+// Instance returns the cacher of the built-in instance type.
+func (s *TypeStore) Instance() *MongoCacher {
+       return s.TypeCacher(instance)
+}
+
+// Store returns the package-level TypeStore that is set up in init.
+func Store() *TypeStore {
+       return store
+}
diff --git a/datasource/mongo/sd/typestore_test.go 
b/datasource/mongo/sd/typestore_test.go
new file mode 100644
index 0000000..11ed78f
--- /dev/null
+++ b/datasource/mongo/sd/typestore_test.go
@@ -0,0 +1,42 @@
+package sd
+
+import (
+       "testing"
+
+       "github.com/stretchr/testify/assert"
+)
+
+// TestTypeStore_Initialize checks that Initialize allocates the ready channel
+// and the goroutine pool and leaves the store open.
+func TestTypeStore_Initialize(t *testing.T) {
+       ts := &TypeStore{}
+       ts.Initialize()
+
+       assert.NotNil(t, ts.ready)
+       assert.NotNil(t, ts.goroutine)
+       assert.False(t, ts.isClose)
+}
+
+// TestTypeStore_Ready checks that an initialized store hands out a non-nil
+// ready channel.
+func TestTypeStore_Ready(t *testing.T) {
+       ts := &TypeStore{}
+       ts.Initialize()
+
+       readyCh := ts.Ready()
+       assert.NotNil(t, readyCh)
+}
+
+// TestTypeStore_Stop checks that the store is marked closed after Stop, both
+// when it was already closed and when it was still open.
+func TestTypeStore_Stop(t *testing.T) {
+       cases := []struct {
+               name    string
+               isClose bool
+       }{
+               {name: "when closed", isClose: true},
+               {name: "when not closed", isClose: false},
+       }
+
+       for _, tc := range cases {
+               tc := tc
+               t.Run(tc.name, func(t *testing.T) {
+                       ts := &TypeStore{isClose: tc.isClose}
+                       ts.Initialize()
+                       ts.Stop()
+                       assert.True(t, ts.isClose)
+               })
+       }
+}
diff --git a/datasource/etcd/sd/etcd/common.go b/datasource/sdcommon/common.go
similarity index 53%
copy from datasource/etcd/sd/etcd/common.go
copy to datasource/sdcommon/common.go
index 1d9d64e..0d58674 100644
--- a/datasource/etcd/sd/etcd/common.go
+++ b/datasource/sdcommon/common.go
@@ -13,42 +13,16 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-package etcd
+package sdcommon
 
-import (
-       "time"
-
-       "github.com/apache/servicecomb-service-center/datasource/etcd"
-       "github.com/apache/servicecomb-service-center/datasource/etcd/sd"
-       "github.com/apache/servicecomb-service-center/datasource/etcd/value"
-       "github.com/coreos/etcd/mvcc/mvccpb"
-)
+import "time"
 
 const (
        // force re-list
        DefaultForceListInterval = 4
        DefaultMetricsInterval   = 30 * time.Second
 
-       minWaitInterval = 1 * time.Second
-       eventBlockSize  = 1000
-       eventBusSize    = 1000
+       MinWaitInterval = 1 * time.Second
+       EventBlockSize  = 1000
+       EventBusSize    = 1000
 )
-
-var closedCh = make(chan struct{})
-
-func init() {
-       close(closedCh)
-}
-
-func FromEtcdKeyValue(dist *sd.KeyValue, src *mvccpb.KeyValue, parser 
value.Parser) (err error) {
-       dist.ClusterName = etcd.Configuration().ClusterName
-       dist.Key = src.Key
-       dist.Version = src.Version
-       dist.CreateRevision = src.CreateRevision
-       dist.ModRevision = src.ModRevision
-       if parser == nil {
-               return
-       }
-       dist.Value, err = parser.Unmarshal(src.Value)
-       return
-}
diff --git a/datasource/etcd/sd/etcd/watcher_inner.go 
b/datasource/sdcommon/eventbus.go
similarity index 70%
rename from datasource/etcd/sd/etcd/watcher_inner.go
rename to datasource/sdcommon/eventbus.go
index 2c964aa..b0845e7 100644
--- a/datasource/etcd/sd/etcd/watcher_inner.go
+++ b/datasource/sdcommon/eventbus.go
@@ -15,36 +15,35 @@
  * limitations under the License.
  */
 
-package etcd
+package sdcommon
 
 import (
        "context"
        "sync"
 
-       "github.com/apache/servicecomb-service-center/datasource/etcd/client"
        "github.com/apache/servicecomb-service-center/pkg/gopool"
        "github.com/apache/servicecomb-service-center/pkg/log"
 )
 
-type innerWatcher struct {
+type EventBus struct {
        Cfg    ListWatchConfig
-       lw     ListWatch
-       bus    chan *client.PluginResponse
+       Lw     ListWatch
+       Bus    chan *ListWatchResp
        stopCh chan struct{}
        stop   bool
        mux    sync.Mutex
 }
 
-func (w *innerWatcher) EventBus() <-chan *client.PluginResponse {
-       return w.bus
+func (w *EventBus) ResourceEventBus() <-chan *ListWatchResp {
+       return w.Bus
 }
 
-func (w *innerWatcher) process(_ context.Context) {
+func (w *EventBus) process(_ context.Context) {
        stopCh := make(chan struct{})
        ctx, cancel := context.WithTimeout(w.Cfg.Context, w.Cfg.Timeout)
        gopool.Go(func(_ context.Context) {
                defer close(stopCh)
-               _ = w.lw.DoWatch(ctx, w.sendEvent)
+               _ = w.Lw.DoWatch(ctx, w.sendEvent)
        })
 
        select {
@@ -58,12 +57,12 @@ func (w *innerWatcher) process(_ context.Context) {
 
 }
 
-func (w *innerWatcher) sendEvent(resp *client.PluginResponse) {
+func (w *EventBus) sendEvent(resp *ListWatchResp) {
        defer log.Recover()
-       w.bus <- resp
+       w.Bus <- resp
 }
 
-func (w *innerWatcher) Stop() {
+func (w *EventBus) Stop() {
        w.mux.Lock()
        if w.stop {
                w.mux.Unlock()
@@ -71,15 +70,15 @@ func (w *innerWatcher) Stop() {
        }
        w.stop = true
        close(w.stopCh)
-       close(w.bus)
+       close(w.Bus)
        w.mux.Unlock()
 }
 
-func newInnerWatcher(lw ListWatch, cfg ListWatchConfig) *innerWatcher {
-       w := &innerWatcher{
+func NewEventBus(lw ListWatch, cfg ListWatchConfig) *EventBus {
+       w := &EventBus{
                Cfg:    cfg,
-               lw:     lw,
-               bus:    make(chan *client.PluginResponse, eventBusSize),
+               Lw:     lw,
+               Bus:    make(chan *ListWatchResp, EventBusSize),
                stopCh: make(chan struct{}),
        }
        gopool.Go(w.process)
diff --git a/datasource/etcd/sd/etcd/listwatch.go 
b/datasource/sdcommon/listwatch.go
similarity index 80%
rename from datasource/etcd/sd/etcd/listwatch.go
rename to datasource/sdcommon/listwatch.go
index 8af8e3b..5d39cfd 100644
--- a/datasource/etcd/sd/etcd/listwatch.go
+++ b/datasource/sdcommon/listwatch.go
@@ -15,14 +15,12 @@
  * limitations under the License.
  */
 
-package etcd
+package sdcommon
 
 import (
        "context"
        "fmt"
        "time"
-
-       "github.com/apache/servicecomb-service-center/datasource/etcd/client"
 )
 
 type ListWatchConfig struct {
@@ -35,10 +33,9 @@ func (lo *ListWatchConfig) String() string {
 }
 
 type ListWatch interface {
-       List(op ListWatchConfig) (*client.PluginResponse, error)
+       List(op ListWatchConfig) (*ListWatchResp, error)
        // not support new multiple watchers
-       Watch(op ListWatchConfig) Watcher
-       //
-       DoWatch(ctx context.Context, f func(*client.PluginResponse)) error
-       Revision() int64
+       EventBus(op ListWatchConfig) *EventBus
+
+       DoWatch(ctx context.Context, f func(*ListWatchResp)) error
 }
diff --git a/datasource/sdcommon/types.go b/datasource/sdcommon/types.go
new file mode 100644
index 0000000..93e641b
--- /dev/null
+++ b/datasource/sdcommon/types.go
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package sdcommon
+
+import (
+       "strconv"
+)
+
+// Actions a ListWatchResp can report; the values start at 0 and follow the
+// declaration order.
+const (
+       ActionCreate ActionType = iota
+       ActionUpdate
+       ActionDelete
+       // ActionPUT may be create or update
+       ActionPUT
+)
+
+// ActionType identifies the kind of change carried by a ListWatchResp.
+type ActionType int
+
+// String returns the upper-case name of the action ("CREATE", "UPDATE",
+// "DELETE" or "PUT"). Values outside the declared actions are rendered as
+// "ACTION" followed by their decimal value.
+func (at ActionType) String() string {
+       names := [...]string{
+               ActionCreate: "CREATE",
+               ActionUpdate: "UPDATE",
+               ActionDelete: "DELETE",
+               ActionPUT:    "PUT",
+       }
+       if at >= 0 && int(at) < len(names) {
+               return names[at]
+       }
+       return "ACTION" + strconv.Itoa(int(at))
+}
+
+// ListWatchResp is the datasource-independent result of a list or watch
+// operation: an action plus the resources it applies to.
+type ListWatchResp struct {
+       // Action is the kind of change the response reports.
+       Action ActionType
+       // Revision is only for etcd
+       Revision int64
+       // Resources may be list of Instance or Service
+       Resources []*Resource
+}
+
+// Resource is a single item of a ListWatchResp. Some fields are only filled
+// by one of the datasources, as noted on the fields.
+type Resource struct {
+       // Key in etcd is prefix, in mongo is resourceId
+       Key string
+       // DocumentID is only for mongo
+       DocumentID string
+
+       // this is only for etcd
+       CreateRevision int64
+       ModRevision    int64
+       Version        int64
+
+       // Value is the payload of the resource; its concrete type is chosen by
+       // the datasource.
+       Value interface{}
+}

Reply via email to