little-cui closed pull request #339: SCB-544 Convenient store extension
URL: https://github.com/apache/incubator-servicecomb-service-center/pull/339
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (GitHub would otherwise not show it once the pull request is merged):

diff --git a/server/broker/broker_suite_test.go 
b/server/broker/broker_suite_test.go
new file mode 100644
index 00000000..8bc652e1
--- /dev/null
+++ b/server/broker/broker_suite_test.go
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package broker
+
+import (
+       pb 
"github.com/apache/incubator-servicecomb-service-center/server/core/proto"
+       _ 
"github.com/apache/incubator-servicecomb-service-center/server/plugin/infra/quota/buildin"
+       _ 
"github.com/apache/incubator-servicecomb-service-center/server/plugin/infra/registry/etcd"
+       _ 
"github.com/apache/incubator-servicecomb-service-center/server/plugin/infra/tracing/buildin"
+       _ 
"github.com/apache/incubator-servicecomb-service-center/server/plugin/infra/uuid/buildin"
+       "github.com/apache/incubator-servicecomb-service-center/server/service"
+       . "github.com/onsi/ginkgo"
+       "github.com/onsi/ginkgo/reporters"
+       . "github.com/onsi/gomega"
+       "testing"
+)
+
+var serviceResource pb.ServiceCtrlServer
+var instanceResource pb.SerivceInstanceCtrlServerEx
+var brokerResource = BrokerServiceAPI
+
+var _ = BeforeSuite(func() {
+       //init plugin
+       serviceResource, instanceResource = service.AssembleResources()
+})
+
+func TestBroker(t *testing.T) {
+       RegisterFailHandler(Fail)
+       junitReporter := reporters.NewJUnitReporter("model.junit.xml")
+       RunSpecsWithDefaultAndCustomReporters(t, "model Suite", 
[]Reporter{junitReporter})
+}
diff --git a/server/broker/service.go b/server/broker/service.go
index 3ae63c61..f4ea5264 100644
--- a/server/broker/service.go
+++ b/server/broker/service.go
@@ -62,7 +62,7 @@ func (*BrokerService) GetPactsOfProvider(ctx context.Context,
                PactLogger.Errorf(nil, "Get pacts of provider failed: %s\n",
                        resp.Response.Message)
                return &GetProviderConsumerVersionPactResponse{
-                       Response: pb.CreateResponse(scerr.ErrInvalidParams, 
err.Error()),
+                       Response: resp.GetResponse(),
                }, err
        }
 
diff --git a/server/broker/service_test.go b/server/broker/service_test.go
index 516b8c34..ac910a11 100644
--- a/server/broker/service_test.go
+++ b/server/broker/service_test.go
@@ -20,7 +20,6 @@ import (
        "fmt"
 
        "github.com/apache/incubator-servicecomb-service-center/pkg/util"
-       "github.com/apache/incubator-servicecomb-service-center/server/core"
        pb 
"github.com/apache/incubator-servicecomb-service-center/server/core/proto"
        . "github.com/onsi/ginkgo"
        . "github.com/onsi/gomega"
@@ -41,8 +40,6 @@ const (
        TEST_BROKER_PROVIDER_APP     = "broker_group_provider"
 )
 
-var brokerResource = BrokerServiceAPI
-var serviceResource = core.ServiceAPI
 var consumerServiceId string
 var providerServiceId string
 
diff --git a/server/broker/store.go b/server/broker/store.go
index 7ed9c027..3bfe9825 100644
--- a/server/broker/store.go
+++ b/server/broker/store.go
@@ -17,166 +17,63 @@
 package broker
 
 import (
-       "sync"
-
-       "github.com/apache/incubator-servicecomb-service-center/pkg/util"
-       sstore 
"github.com/apache/incubator-servicecomb-service-center/server/core/backend/store"
-       "golang.org/x/net/context"
+       
"github.com/apache/incubator-servicecomb-service-center/server/core/backend/store"
 )
 
-const (
-       PARTICIPANT sstore.StoreType = iota
-       VERSION
-       PACT
-       PACT_VERSION
-       PACT_TAG
-       VERIFICATION
-       PACT_LATEST
-       typeEnd
+var (
+       PARTICIPANT  store.StoreType
+       VERSION      store.StoreType
+       PACT         store.StoreType
+       PACT_VERSION store.StoreType
+       PACT_TAG     store.StoreType
+       VERIFICATION store.StoreType
+       PACT_LATEST  store.StoreType
 )
 
-var TypeNames = []string{
-       PARTICIPANT:  "PARTICIPANT",
-       VERSION:      "VERSION",
-       PACT:         "PACT",
-       PACT_VERSION: "PACT_VERSION",
-       PACT_TAG:     "PACT_TAG",
-       VERIFICATION: "VERIFICATION",
-       PACT_LATEST:  "PACT_LATEST",
-}
-
-var TypeRoots = map[sstore.StoreType]string{
-       PARTICIPANT:  GetBrokerParticipantKey(""),
-       VERSION:      GetBrokerVersionKey(""),
-       PACT:         GetBrokerPactKey(""),
-       PACT_VERSION: GetBrokerPactVersionKey(""),
-       PACT_TAG:     GetBrokerTagKey(""),
-       VERIFICATION: GetBrokerVerificationKey(""),
-       PACT_LATEST:  GetBrokerLatestKey(""),
-}
-
-var store = &BKvStore{}
-
-func Store() *BKvStore {
-       return store
-}
-
-func (s *BKvStore) StoreSize(t sstore.StoreType) int {
-       return 100
-}
-
-func (s *BKvStore) newStore(t sstore.StoreType, opts 
...sstore.KvCacherCfgOption) {
-       opts = append(opts,
-               sstore.WithKey(TypeRoots[t]),
-               sstore.WithInitSize(s.StoreSize(t)),
-       )
-       s.newIndexer(t, sstore.NewKvCacher(t.String(), opts...))
-}
-
-func (s *BKvStore) store(ctx context.Context) {
-       for t := sstore.StoreType(0); t != typeEnd; t++ {
-               s.newStore(t)
-       }
-       for _, i := range s.bindexers {
-               select {
-               case <-ctx.Done():
-                       return
-               case <-i.Ready():
-               }
-       }
-       util.SafeCloseChan(s.bready)
-
-       util.Logger().Debugf("all indexers are ready")
-}
+var brokerKvStore = &BKvStore{}
 
 func init() {
-       store.Initialize()
-       store.Run()
-       store.Ready()
-}
+       PARTICIPANT = store.Store().MustInstall(store.NewEntity("PARTICIPANT", 
GetBrokerParticipantKey("")))
+       VERSION = store.Store().MustInstall(store.NewEntity("VERSION", 
GetBrokerVersionKey("")))
+       PACT = store.Store().MustInstall(store.NewEntity("PACT", 
GetBrokerPactKey("")))
+       PACT_VERSION = 
store.Store().MustInstall(store.NewEntity("PACT_VERSION", 
GetBrokerPactVersionKey("")))
+       PACT_TAG = store.Store().MustInstall(store.NewEntity("PACT_TAG", 
GetBrokerTagKey("")))
+       VERIFICATION = 
store.Store().MustInstall(store.NewEntity("VERIFICATION", 
GetBrokerVerificationKey("")))
+       PACT_LATEST = store.Store().MustInstall(store.NewEntity("PACT_LATEST", 
GetBrokerLatestKey("")))
 
-type BKvStore struct {
-       *sstore.KvStore
-       bindexers map[sstore.StoreType]*sstore.Indexer
-       block     sync.RWMutex
-       bready    chan struct{}
-       bisClose  bool
-}
-
-func (s *BKvStore) Initialize() {
-       s.KvStore = sstore.Store()
-       s.KvStore.Initialize()
-       s.bindexers = make(map[sstore.StoreType]*sstore.Indexer)
-       s.bready = make(chan struct{})
-
-       for i := sstore.StoreType(0); i != typeEnd; i++ {
-               store.newNullStore(i)
-       }
-}
-
-func (s *BKvStore) newNullStore(t sstore.StoreType) {
-       s.newIndexer(t, sstore.NullCacher)
-}
-
-func (s *BKvStore) newIndexer(t sstore.StoreType, cacher sstore.Cacher) {
-       indexer := sstore.NewCacheIndexer(cacher)
-       s.bindexers[t] = indexer
-       indexer.Run()
 }
 
-func (s *BKvStore) Run() {
-       util.Go(func(ctx context.Context) {
-               s.store(ctx)
-               select {
-               case <-ctx.Done():
-                       s.Stop()
-               }
-       })
-}
-
-func (s *BKvStore) Ready() <-chan struct{} {
-       return s.bready
+type BKvStore struct {
 }
 
-func (s *BKvStore) Participant() *sstore.Indexer {
-       return s.bindexers[PARTICIPANT]
+func (s *BKvStore) Participant() *store.Indexer {
+       return store.Store().Entity(PARTICIPANT)
 }
 
-func (s *BKvStore) Version() *sstore.Indexer {
-       return s.bindexers[VERSION]
+func (s *BKvStore) Version() *store.Indexer {
+       return store.Store().Entity(VERSION)
 }
 
-func (s *BKvStore) Pact() *sstore.Indexer {
-       return s.bindexers[PACT]
+func (s *BKvStore) Pact() *store.Indexer {
+       return store.Store().Entity(PACT)
 }
 
-func (s *BKvStore) PactVersion() *sstore.Indexer {
-       return s.bindexers[PACT_VERSION]
+func (s *BKvStore) PactVersion() *store.Indexer {
+       return store.Store().Entity(PACT_VERSION)
 }
 
-func (s *BKvStore) PactTag() *sstore.Indexer {
-       return s.bindexers[PACT_TAG]
+func (s *BKvStore) PactTag() *store.Indexer {
+       return store.Store().Entity(PACT_TAG)
 }
 
-func (s *BKvStore) Verification() *sstore.Indexer {
-       return s.bindexers[VERIFICATION]
+func (s *BKvStore) Verification() *store.Indexer {
+       return store.Store().Entity(VERIFICATION)
 }
 
-func (s *BKvStore) PactLatest() *sstore.Indexer {
-       return s.bindexers[PACT_LATEST]
+func (s *BKvStore) PactLatest() *store.Indexer {
+       return store.Store().Entity(PACT_LATEST)
 }
 
-func (s *BKvStore) Stop() {
-       if s.bisClose {
-               return
-       }
-       s.bisClose = true
-
-       for _, i := range s.bindexers {
-               i.Stop()
-       }
-
-       util.SafeCloseChan(s.bready)
-
-       util.Logger().Debugf("broker store daemon stopped")
+func Store() *BKvStore {
+       return brokerKvStore
 }
diff --git a/server/core/backend/store/cache_kv.go 
b/server/core/backend/store/cache_kv.go
index fcced8f2..9fb347eb 100644
--- a/server/core/backend/store/cache_kv.go
+++ b/server/core/backend/store/cache_kv.go
@@ -95,7 +95,7 @@ func (c *KvCache) compact() {
        c.store = newCache
 
        util.Logger().Infof("cache %s is not in use over %s, compact capacity 
to size %d->%d",
-               c.owner.Cfg.Key, DEFAULT_COMPACT_TIMEOUT, c.lastMaxSize, c.size)
+               c.owner.Cfg.Prefix, DEFAULT_COMPACT_TIMEOUT, c.lastMaxSize, 
c.size)
 
 }
 
@@ -107,7 +107,7 @@ func (c *KvCache) Size() (l int) {
 }
 
 type KvCacher struct {
-       Cfg KvCacherCfg
+       Cfg Config
 
        name         string
        lastRev      int64
@@ -143,13 +143,13 @@ func (c *KvCacher) needList() bool {
        }
 
        util.Logger().Debugf("no events come in more then %s, need to list key 
%s, rev: %d",
-               time.Duration(c.noEventCount)*c.Cfg.Timeout, c.Cfg.Key, rev)
+               time.Duration(c.noEventCount)*c.Cfg.Timeout, c.Cfg.Prefix, rev)
        c.noEventCount = 0
        return true
 }
 
-func (c *KvCacher) doList(listOps ListOptions) error {
-       kvs, err := c.lw.List(listOps)
+func (c *KvCacher) doList(cfg ListWatchConfig) error {
+       kvs, err := c.lw.List(cfg)
        if err != nil {
                return err
        }
@@ -161,13 +161,13 @@ func (c *KvCacher) doList(listOps ListOptions) error {
                util.Logger().Warnf(nil, "most of the protected data(%d/%d) are 
recovered", kc, c.cache.Size())
        }
        c.sync(evts)
-       util.LogDebugOrWarnf(start, "finish to cache key %s, %d items, rev: 
%d", c.Cfg.Key, len(kvs), c.lw.Revision())
+       util.LogDebugOrWarnf(start, "finish to cache key %s, %d items, rev: 
%d", c.Cfg.Prefix, len(kvs), c.lw.Revision())
 
        return nil
 }
 
-func (c *KvCacher) doWatch(listOps ListOptions) error {
-       watcher := c.lw.Watch(listOps)
+func (c *KvCacher) doWatch(cfg ListWatchConfig) error {
+       watcher := c.lw.Watch(cfg)
        return c.handleWatcher(watcher)
 }
 
@@ -175,19 +175,19 @@ func (c *KvCacher) ListAndWatch(ctx context.Context) 
error {
        c.mux.Lock()
        defer c.mux.Unlock()
 
-       listOps := ListOptions{
+       cfg := ListWatchConfig{
                Timeout: c.Cfg.Timeout,
                Context: ctx,
        }
        if c.needList() {
-               if err := c.doList(listOps); err != nil {
+               if err := c.doList(cfg); err != nil {
                        return err
                }
        }
 
        util.SafeCloseChan(c.ready)
 
-       return c.doWatch(listOps)
+       return c.doWatch(cfg)
 }
 
 func (c *KvCacher) handleWatcher(watcher *Watcher) error {
@@ -336,7 +336,7 @@ func (c *KvCacher) filterDelete(store 
map[string]*mvccpb.KeyValue, newStore map[
                block[i] = KvEvent{
                        Revision: rev,
                        Type:     proto.EVT_DELETE,
-                       Prefix:   c.Cfg.Key,
+                       Prefix:   c.Cfg.Prefix,
                        Object:   v,
                }
                i++
@@ -365,7 +365,7 @@ func (c *KvCacher) filterCreateOrUpdate(store 
map[string]*mvccpb.KeyValue, newSt
                        block[i] = KvEvent{
                                Revision: rev,
                                Type:     proto.EVT_CREATE,
-                               Prefix:   c.Cfg.Key,
+                               Prefix:   c.Cfg.Prefix,
                                Object:   v,
                        }
                        i++
@@ -385,7 +385,7 @@ func (c *KvCacher) filterCreateOrUpdate(store 
map[string]*mvccpb.KeyValue, newSt
                block[i] = KvEvent{
                        Revision: rev,
                        Type:     proto.EVT_UPDATE,
-                       Prefix:   c.Cfg.Key,
+                       Prefix:   c.Cfg.Prefix,
                        Object:   v,
                }
                i++
@@ -492,8 +492,8 @@ func NewKvCache(c *KvCacher, size int) *KvCache {
        }
 }
 
-func NewKvCacher(name string, opts ...KvCacherCfgOption) *KvCacher {
-       cfg := DefaultKvCacherConfig()
+func NewKvCacher(name string, opts ...ConfigOption) *KvCacher {
+       cfg := DefaultConfig()
        for _, opt := range opts {
                opt(&cfg)
        }
@@ -504,7 +504,7 @@ func NewKvCacher(name string, opts ...KvCacherCfgOption) 
*KvCacher {
                ready: make(chan struct{}),
                lw: ListWatcher{
                        Client: backend.Registry(),
-                       Key:    cfg.Key,
+                       Prefix: cfg.Prefix,
                },
                goroutine: util.NewGo(context.Background()),
        }
diff --git a/server/core/backend/store/cacher.go 
b/server/core/backend/store/cacher.go
index 94fbe46c..898964c8 100644
--- a/server/core/backend/store/cacher.go
+++ b/server/core/backend/store/cacher.go
@@ -24,6 +24,7 @@ type Cache interface {
 }
 
 type Cacher interface {
+       // Name is the cache size metric name
        Name() string
        Cache() Cache
        Run()
diff --git a/server/core/backend/store/common.go 
b/server/core/backend/store/common.go
index fc04898e..0de9e05c 100644
--- a/server/core/backend/store/common.go
+++ b/server/core/backend/store/common.go
@@ -25,12 +25,17 @@ import (
 type StoreType int
 
 func (st StoreType) String() string {
+       if int(st) < 0 {
+               return "NONEXIST"
+       }
        if int(st) < len(TypeNames) {
                return TypeNames[st]
        }
        return "TYPE" + strconv.Itoa(int(st))
 }
 
+const NONEXIST = StoreType(-1)
+
 const (
        DOMAIN StoreType = iota
        PROJECT
@@ -48,25 +53,25 @@ const (
        INSTANCE
        LEASE
        ENDPOINTS
-       typeEnd
+       typeEnd // end of the base store types
 )
 
 var TypeNames = []string{
-       SERVICE:          "SERVICE",
-       INSTANCE:         "INSTANCE",
        DOMAIN:           "DOMAIN",
-       SCHEMA:           "SCHEMA",
-       SCHEMA_SUMMARY:   "SCHEMA_SUMMARY",
-       RULE:             "RULE",
-       LEASE:            "LEASE",
+       PROJECT:          "PROJECT",
+       SERVICE:          "SERVICE",
        SERVICE_INDEX:    "SERVICE_INDEX",
        SERVICE_ALIAS:    "SERVICE_ALIAS",
        SERVICE_TAG:      "SERVICE_TAG",
+       RULE:             "RULE",
        RULE_INDEX:       "RULE_INDEX",
        DEPENDENCY:       "DEPENDENCY",
        DEPENDENCY_RULE:  "DEPENDENCY_RULE",
        DEPENDENCY_QUEUE: "DEPENDENCY_QUEUE",
-       PROJECT:          "PROJECT",
+       SCHEMA:           "SCHEMA",
+       SCHEMA_SUMMARY:   "SCHEMA_SUMMARY",
+       INSTANCE:         "INSTANCE",
+       LEASE:            "LEASE",
        ENDPOINTS:        "ENDPOINTS",
 }
 
diff --git a/server/core/backend/store/opt.go 
b/server/core/backend/store/config.go
similarity index 55%
rename from server/core/backend/store/opt.go
rename to server/core/backend/store/config.go
index 59fbabb0..9f0b2dde 100644
--- a/server/core/backend/store/opt.go
+++ b/server/core/backend/store/config.go
@@ -18,11 +18,12 @@ package store
 
 import (
        "fmt"
+       "golang.org/x/net/context"
        "time"
 )
 
-type KvCacherCfg struct {
-       Key                string
+type Config struct {
+       Prefix             string
        InitSize           int
        NoEventMaxInterval int
        Timeout            time.Duration
@@ -31,43 +32,52 @@ type KvCacherCfg struct {
        DeferHandler       DeferHandler
 }
 
-func (cfg KvCacherCfg) String() string {
-       return fmt.Sprintf("{key: %s, timeout: %s, period: %s}",
-               cfg.Key, cfg.Timeout, cfg.Period)
+func (cfg Config) String() string {
+       return fmt.Sprintf("{prefix: %s, timeout: %s, period: %s}",
+               cfg.Prefix, cfg.Timeout, cfg.Period)
 }
 
-type KvCacherCfgOption func(*KvCacherCfg)
+type ConfigOption func(*Config)
 
-func WithKey(key string) KvCacherCfgOption {
-       return func(cfg *KvCacherCfg) { cfg.Key = key }
+func WithPrefix(key string) ConfigOption {
+       return func(cfg *Config) { cfg.Prefix = key }
 }
 
-func WithInitSize(size int) KvCacherCfgOption {
-       return func(cfg *KvCacherCfg) { cfg.InitSize = size }
+func WithInitSize(size int) ConfigOption {
+       return func(cfg *Config) { cfg.InitSize = size }
 }
 
-func WithTimeout(ot time.Duration) KvCacherCfgOption {
-       return func(cfg *KvCacherCfg) { cfg.Timeout = ot }
+func WithTimeout(ot time.Duration) ConfigOption {
+       return func(cfg *Config) { cfg.Timeout = ot }
 }
 
-func WithPeriod(ot time.Duration) KvCacherCfgOption {
-       return func(cfg *KvCacherCfg) { cfg.Period = ot }
+func WithPeriod(ot time.Duration) ConfigOption {
+       return func(cfg *Config) { cfg.Period = ot }
 }
 
-func WithEventFunc(f KvEventFunc) KvCacherCfgOption {
-       return func(cfg *KvCacherCfg) { cfg.OnEvent = f }
+func WithEventFunc(f KvEventFunc) ConfigOption {
+       return func(cfg *Config) { cfg.OnEvent = f }
 }
 
-func WithDeferHandler(h DeferHandler) KvCacherCfgOption {
-       return func(cfg *KvCacherCfg) { cfg.DeferHandler = h }
+func WithDeferHandler(h DeferHandler) ConfigOption {
+       return func(cfg *Config) { cfg.DeferHandler = h }
 }
 
-func DefaultKvCacherConfig() KvCacherCfg {
-       return KvCacherCfg{
-               Key:                "/",
+func DefaultConfig() Config {
+       return Config{
+               Prefix:             "/",
                Timeout:            DEFAULT_LISTWATCH_TIMEOUT,
                Period:             time.Second,
                NoEventMaxInterval: DEFAULT_MAX_NO_EVENT_INTERVAL,
                InitSize:           DEFAULT_CACHE_INIT_SIZE,
        }
 }
+
+type ListWatchConfig struct {
+       Timeout time.Duration
+       Context context.Context
+}
+
+func (lo *ListWatchConfig) String() string {
+       return fmt.Sprintf("{timeout: %s}", lo.Timeout)
+}
diff --git a/server/core/backend/store/event.go 
b/server/core/backend/store/event.go
index 504aba10..67714a04 100644
--- a/server/core/backend/store/event.go
+++ b/server/core/backend/store/event.go
@@ -18,22 +18,8 @@ package store
 
 import (
        
"github.com/apache/incubator-servicecomb-service-center/server/core/proto"
-       "sync"
 )
 
-var (
-       evtProxies map[StoreType]*KvEventProxy
-)
-
-func init() {
-       evtProxies = make(map[StoreType]*KvEventProxy, typeEnd)
-       for i := StoreType(0); i != typeEnd; i++ {
-               evtProxies[i] = &KvEventProxy{
-                       evtHandleFuncs: make([]KvEventFunc, 0, 5),
-               }
-       }
-}
-
 type KvEventFunc func(evt KvEvent)
 
 type KvEvent struct {
@@ -48,29 +34,6 @@ type KvEventHandler interface {
        OnEvent(evt KvEvent)
 }
 
-type KvEventProxy struct {
-       evtHandleFuncs []KvEventFunc
-       lock           sync.RWMutex
-}
-
-func (h *KvEventProxy) AddHandleFunc(f KvEventFunc) {
-       h.lock.Lock()
-       h.evtHandleFuncs = append(h.evtHandleFuncs, f)
-       h.lock.Unlock()
-}
-
-func (h *KvEventProxy) OnEvent(evt KvEvent) {
-       h.lock.RLock()
-       for _, f := range h.evtHandleFuncs {
-               f(evt)
-       }
-       h.lock.RUnlock()
-}
-
-func EventProxy(t StoreType) *KvEventProxy {
-       return evtProxies[t]
-}
-
 // the event handler/func must be good performance, or will block the event 
bus.
 func AddEventHandleFunc(t StoreType, f KvEventFunc) {
        EventProxy(t).AddHandleFunc(f)
diff --git a/server/core/backend/store/event_proxy.go 
b/server/core/backend/store/event_proxy.go
new file mode 100644
index 00000000..e66583d9
--- /dev/null
+++ b/server/core/backend/store/event_proxy.go
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package store
+
+import "sync"
+
+var (
+       EventProxies map[StoreType]*KvEventProxy
+)
+
+func init() {
+       EventProxies = make(map[StoreType]*KvEventProxy, typeEnd)
+       for i := StoreType(0); i != typeEnd; i++ {
+               EventProxies[i] = NewEventProxy()
+       }
+}
+
+type KvEventProxy struct {
+       evtHandleFuncs []KvEventFunc
+       lock           sync.RWMutex
+}
+
+func (h *KvEventProxy) AddHandleFunc(f KvEventFunc) {
+       h.lock.Lock()
+       h.evtHandleFuncs = append(h.evtHandleFuncs, f)
+       h.lock.Unlock()
+}
+
+func (h *KvEventProxy) OnEvent(evt KvEvent) {
+       h.lock.RLock()
+       for _, f := range h.evtHandleFuncs {
+               f(evt)
+       }
+       h.lock.RUnlock()
+}
+
+func NewEventProxy() *KvEventProxy {
+       return &KvEventProxy{
+               evtHandleFuncs: make([]KvEventFunc, 0, 5),
+       }
+}
+
+func EventProxy(t StoreType) *KvEventProxy {
+       return EventProxies[t]
+}
diff --git a/server/core/backend/store/extend.go 
b/server/core/backend/store/extend.go
new file mode 100644
index 00000000..bffd0c15
--- /dev/null
+++ b/server/core/backend/store/extend.go
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package store
+
+import (
+       "errors"
+       "fmt"
+)
+
+type Entity interface {
+       Name() string
+       Prefix() string
+       InitSize() int
+}
+
+type entity struct {
+       name     string
+       prefix   string
+       initSize int
+}
+
+func (e *entity) Name() string {
+       return e.name
+}
+
+func (e *entity) Prefix() string {
+       return e.prefix
+}
+
+func (e *entity) InitSize() int {
+       return e.initSize
+}
+
+func InstallType(e Entity) (id StoreType, err error) {
+       if e == nil {
+               return NONEXIST, errors.New("invalid parameter")
+       }
+       for _, n := range TypeNames {
+               if n == e.Name() {
+                       return NONEXIST, fmt.Errorf("redeclare store type 
'%s'", n)
+               }
+       }
+       for _, r := range TypeRoots {
+               if r == e.Prefix() {
+                       return NONEXIST, fmt.Errorf("redeclare store root 
'%s'", r)
+               }
+       }
+
+       TypeNames = append(TypeNames, e.Name())
+       id = StoreType(len(TypeNames) + 1) // +1 for typeEnd
+
+       TypeRoots[id] = e.Prefix()
+       TypeInitSize[id] = e.InitSize()
+
+       EventProxies[id] = NewEventProxy()
+       return
+}
+
+func NewEntity(name, prefix string, opts ...ConfigOption) Entity {
+       cfg := DefaultConfig()
+       for _, opt := range opts {
+               opt(&cfg)
+       }
+       cfg.Prefix = prefix
+       return &entity{
+               name:     name,
+               prefix:   cfg.Prefix,
+               initSize: cfg.InitSize,
+       }
+}
diff --git a/server/core/backend/store/extend_test.go 
b/server/core/backend/store/extend_test.go
new file mode 100644
index 00000000..8359dd2e
--- /dev/null
+++ b/server/core/backend/store/extend_test.go
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package store
+
+import "testing"
+
+type extend struct {
+}
+
+func (e *extend) Name() string {
+       return "test"
+}
+
+func (e *extend) Prefix() string {
+       return "/test"
+}
+
+func (e *extend) InitSize() int {
+       return 0
+}
+
+func TestInstallType(t *testing.T) {
+       id, err := InstallType(&extend{})
+       if err != nil {
+               t.Fatal(err)
+       }
+       if id == NONEXIST {
+               t.Fatal(err)
+       }
+
+       id, err = InstallType(&extend{})
+       if id != NONEXIST || err == nil {
+               t.Fatal("InstallType fail", err)
+       }
+}
diff --git a/server/core/backend/store/indexer.go 
b/server/core/backend/store/indexer.go
index 3f8cd7d7..2b35a976 100644
--- a/server/core/backend/store/indexer.go
+++ b/server/core/backend/store/indexer.go
@@ -28,23 +28,16 @@ import (
        "time"
 )
 
-var defaultRootKeys map[string]struct{}
-
-func init() {
-       defaultRootKeys = make(map[string]struct{}, len(defaultRootKeys))
-       for _, root := range TypeRoots {
-               defaultRootKeys[root] = struct{}{}
-       }
-}
-
 type Indexer struct {
-       BuildTimeout     time.Duration
+       BuildTimeout time.Duration
+       Root         string
+
        cacher           Cacher
-       prefixIndex      map[string]map[string]struct{}
-       prefixLock       sync.RWMutex
-       prefixBuildQueue chan KvEvent
        goroutine        *util.GoRoutine
        ready            chan struct{}
+       prefixIndex      map[string]map[string]struct{}
+       prefixBuildQueue chan KvEvent
+       prefixLock       sync.RWMutex
        isClose          bool
 }
 
@@ -266,8 +259,7 @@ func (i *Indexer) getPrefixKey(arr *[]string, prefix 
string) (count int) {
 }
 
 func (i *Indexer) addPrefixKey(prefix, key string) {
-       _, ok := defaultRootKeys[key]
-       if ok {
+       if i.Root == key {
                return
        }
 
@@ -342,9 +334,10 @@ func (i *Indexer) Ready() <-chan struct{} {
        return i.ready
 }
 
-func NewCacheIndexer(cr Cacher) *Indexer {
+func NewCacheIndexer(root string, cr Cacher) *Indexer {
        return &Indexer{
                BuildTimeout:     DEFAULT_ADD_QUEUE_TIMEOUT,
+               Root:             root,
                cacher:           cr,
                prefixIndex:      make(map[string]map[string]struct{}, 
DEFAULT_CACHE_INIT_SIZE),
                prefixBuildQueue: make(chan KvEvent, DEFAULT_MAX_EVENT_COUNT),
diff --git a/server/core/backend/store/listwatch.go 
b/server/core/backend/store/listwatch.go
index 3cf1fe75..42fbde52 100644
--- a/server/core/backend/store/listwatch.go
+++ b/server/core/backend/store/listwatch.go
@@ -24,30 +24,20 @@ import (
        "github.com/coreos/etcd/mvcc/mvccpb"
        "golang.org/x/net/context"
        "sync"
-       "time"
 )
 
-type ListOptions struct {
-       Timeout time.Duration
-       Context context.Context
-}
-
-func (lo *ListOptions) String() string {
-       return fmt.Sprintf("{timeout: %s}", lo.Timeout)
-}
-
 type ListWatcher struct {
        Client registry.Registry
-       Key    string
+       Prefix string
 
        rev int64
 }
 
-func (lw *ListWatcher) List(op ListOptions) ([]*mvccpb.KeyValue, error) {
+func (lw *ListWatcher) List(op ListWatchConfig) ([]*mvccpb.KeyValue, error) {
        otCtx, _ := context.WithTimeout(op.Context, op.Timeout)
-       resp, err := lw.Client.Do(otCtx, 
registry.WatchPrefixOpOptions(lw.Key)...)
+       resp, err := lw.Client.Do(otCtx, 
registry.WatchPrefixOpOptions(lw.Prefix)...)
        if err != nil {
-               util.Logger().Errorf(err, "list key %s failed, rev: %d->0", 
lw.Key, lw.Revision())
+               util.Logger().Errorf(err, "list prefix %s failed, rev: %d->0", 
lw.Prefix, lw.Revision())
                lw.setRevision(0)
                return nil, err
        }
@@ -66,14 +56,14 @@ func (lw *ListWatcher) setRevision(rev int64) {
        lw.rev = rev
 }
 
-func (lw *ListWatcher) Watch(op ListOptions) *Watcher {
+func (lw *ListWatcher) Watch(op ListWatchConfig) *Watcher {
        return newWatcher(lw, op)
 }
 
 func (lw *ListWatcher) doWatch(ctx context.Context, f func(evt []KvEvent)) 
error {
        rev := lw.Revision()
        opts := append(
-               registry.WatchPrefixOpOptions(lw.Key),
+               registry.WatchPrefixOpOptions(lw.Prefix),
                registry.WithRev(rev+1),
                registry.WithWatchCallback(
                        func(message string, resp *registry.PluginResponse) 
error {
@@ -81,13 +71,13 @@ func (lw *ListWatcher) doWatch(ctx context.Context, f 
func(evt []KvEvent)) error
                                        return fmt.Errorf("unknown event %s", 
resp)
                                }
 
-                               util.Logger().Infof("watch prefix key %s, start 
rev %d+1, event: %s", lw.Key, rev, resp)
+                               util.Logger().Infof("watch prefix %s, start rev 
%d+1, event: %s", lw.Prefix, rev, resp)
 
                                lw.setRevision(resp.Revision)
 
                                evts := make([]KvEvent, len(resp.Kvs))
                                for i, kv := range resp.Kvs {
-                                       evt := KvEvent{Prefix: lw.Key, 
Revision: kv.ModRevision}
+                                       evt := KvEvent{Prefix: lw.Prefix, 
Revision: kv.ModRevision}
                                        switch {
                                        case resp.Action == registry.Put && 
kv.Version == 1:
                                                evt.Type, evt.Object = 
proto.EVT_CREATE, kv
@@ -106,16 +96,16 @@ func (lw *ListWatcher) doWatch(ctx context.Context, f 
func(evt []KvEvent)) error
 
        err := lw.Client.Watch(ctx, opts...)
        if err != nil { // compact可能会导致watch失败 or message body size lager than 
4MB
-               util.Logger().Errorf(err, "watch key %s failed, start rev: 
%d+1->%d->0", lw.Key, rev, lw.Revision())
+               util.Logger().Errorf(err, "watch prefix %s failed, start rev: 
%d+1->%d->0", lw.Prefix, rev, lw.Revision())
 
                lw.setRevision(0)
-               f([]KvEvent{errEvent(lw.Key, err)})
+               f([]KvEvent{errEvent(lw.Prefix, err)})
        }
        return err
 }
 
 type Watcher struct {
-       ListOps ListOptions
+       ListOps ListWatchConfig
        lw      *ListWatcher
        bus     chan []KvEvent
        stopCh  chan struct{}
@@ -169,7 +159,7 @@ func errEvent(key string, err error) KvEvent {
        }
 }
 
-func newWatcher(lw *ListWatcher, listOps ListOptions) *Watcher {
+func newWatcher(lw *ListWatcher, listOps ListWatchConfig) *Watcher {
        w := &Watcher{
                ListOps: listOps,
                lw:      lw,
diff --git a/server/core/backend/store/store.go 
b/server/core/backend/store/store.go
index e549cd41..66849464 100644
--- a/server/core/backend/store/store.go
+++ b/server/core/backend/store/store.go
@@ -61,7 +61,7 @@ func (s *KvStore) dispatchEvent(t StoreType, evt KvEvent) {
 }
 
 func (s *KvStore) newIndexBuilder(t StoreType, cacher Cacher) {
-       indexer := NewCacheIndexer(cacher)
+       indexer := NewCacheIndexer(TypeRoots[t], cacher)
        s.indexers[t] = indexer
        indexer.Run()
 }
@@ -71,7 +71,7 @@ func (s *KvStore) Run() {
        s.taskService.Run()
 }
 
-func (s *KvStore) getKvCacherCfgOptions(t StoreType) (opts 
[]KvCacherCfgOption) {
+func (s *KvStore) getKvCacherCfgOptions(t StoreType) (opts []ConfigOption) {
        switch t {
        case INSTANCE:
                opts = append(opts, 
WithDeferHandler(s.SelfPreservationHandler()))
@@ -81,7 +81,7 @@ func (s *KvStore) getKvCacherCfgOptions(t StoreType) (opts 
[]KvCacherCfgOption)
                opts = append(opts, WithInitSize(sz))
        }
        opts = append(opts,
-               WithKey(TypeRoots[t]),
+               WithPrefix(TypeRoots[t]),
                WithEventFunc(func(evt KvEvent) { s.dispatchEvent(t, evt) }))
        return
 }
@@ -246,6 +246,33 @@ func (s *KvStore) KeepAlive(ctx context.Context, opts 
...registry.PluginOpOption
        return pt.TTL, pt.Err()
 }
 
+func (s *KvStore) Entity(id StoreType) *Indexer {
+       return s.indexers[id]
+}
+
+func (s *KvStore) Install(e Entity) (id StoreType, err error) {
+       if id, err = InstallType(e); err != nil {
+               return
+       }
+
+       util.Logger().Infof("install new store entity %d:%s->%s", id, e.Name(), 
e.Prefix())
+
+       if !core.ServerInfo.Config.EnableCache {
+               s.newIndexBuilder(id, NullCacher)
+               return
+       }
+       s.newIndexBuilder(id, NewKvCacher(id.String(), 
s.getKvCacherCfgOptions(id)...))
+       return
+}
+
+func (s *KvStore) MustInstall(e Entity) StoreType {
+       id, err := s.Install(e)
+       if err != nil {
+               panic(err)
+       }
+       return id
+}
+
 func Store() *KvStore {
        return store
 }
diff --git a/server/error/error.go b/server/error/error.go
index 5ee5bf12..b30356ee 100644
--- a/server/error/error.go
+++ b/server/error/error.go
@@ -18,8 +18,6 @@ package error
 
 import (
        "encoding/json"
-       "fmt"
-       "github.com/apache/incubator-servicecomb-service-center/pkg/util"
        "net/http"
 )
 
@@ -104,9 +102,9 @@ func (e Error) Error() string {
        return e.Message + "(" + e.Detail + ")"
 }
 
-func (e Error) toJson() string {
+func (e Error) Marshal() []byte {
        bs, _ := json.Marshal(e)
-       return util.BytesToStringWithNoCopy(bs)
+       return bs
 }
 
 func (e Error) StatusCode() int {
@@ -123,14 +121,6 @@ func (e Error) InternalError() bool {
        return false
 }
 
-func (e Error) HttpWrite(w http.ResponseWriter) {
-       status := e.StatusCode()
-       w.Header().Add("X-Response-Status", fmt.Sprint(status))
-       w.Header().Set("Content-Type", "application/json; charset=UTF-8")
-       w.WriteHeader(status)
-       fmt.Fprintln(w, e.toJson())
-}
-
 func NewError(code int32, detail string) *Error {
        return &Error{
                Code:    code,
diff --git a/server/rest/controller/rest_util.go 
b/server/rest/controller/rest_util.go
index be626d5b..00f93f7e 100644
--- a/server/rest/controller/rest_util.go
+++ b/server/rest/controller/rest_util.go
@@ -25,15 +25,23 @@ import (
        "net/http"
 )
 
+const (
+       contentTypeJson = "application/json; charset=UTF-8"
+       contentTypeText = "text/plain; charset=UTF-8"
+)
+
 func WriteError(w http.ResponseWriter, code int32, detail string) {
        err := error.NewError(code, detail)
-       err.HttpWrite(w)
+       w.Header().Add("X-Response-Status", fmt.Sprint(err.StatusCode()))
+       w.Header().Set("Content-Type", contentTypeJson)
+       w.WriteHeader(err.StatusCode())
+       fmt.Fprintln(w, util.BytesToStringWithNoCopy(err.Marshal()))
 }
 
 func WriteJsonObject(w http.ResponseWriter, obj interface{}) {
        if obj == nil {
                w.Header().Add("X-Response-Status", fmt.Sprint(http.StatusOK))
-               w.Header().Set("Content-Type", "text/plain; charset=UTF-8")
+               w.Header().Set("Content-Type", contentTypeText)
                w.WriteHeader(http.StatusOK)
                return
        }
@@ -44,7 +52,7 @@ func WriteJsonObject(w http.ResponseWriter, obj interface{}) {
                return
        }
        w.Header().Add("X-Response-Status", fmt.Sprint(http.StatusOK))
-       w.Header().Set("Content-Type", "application/json; charset=UTF-8")
+       w.Header().Set("Content-Type", contentTypeJson)
        w.WriteHeader(http.StatusOK)
        fmt.Fprintln(w, util.BytesToStringWithNoCopy(objJson))
 }
@@ -61,11 +69,10 @@ func WriteResponse(w http.ResponseWriter, resp 
*pb.Response, obj interface{}) {
 func WriteBytes(w http.ResponseWriter, resp *pb.Response, json []byte) {
        if resp.GetCode() == pb.Response_SUCCESS {
                w.Header().Add("X-Response-Status", fmt.Sprint(http.StatusOK))
-               w.Header().Set("Content-Type", "application/json; 
charset=UTF-8")
+               w.Header().Set("Content-Type", contentTypeJson)
                w.WriteHeader(http.StatusOK)
                w.Write(json)
                return
        }
        WriteError(w, resp.GetCode(), resp.GetMessage())
 }
-
diff --git a/server/rest/controller/v3/main_controller.go 
b/server/rest/controller/v3/main_controller.go
index 5bb44100..e20e3238 100644
--- a/server/rest/controller/v3/main_controller.go
+++ b/server/rest/controller/v3/main_controller.go
@@ -19,13 +19,29 @@ package v3
 import (
        "encoding/json"
        "github.com/apache/incubator-servicecomb-service-center/pkg/rest"
+       pb 
"github.com/apache/incubator-servicecomb-service-center/server/core/proto"
+       
"github.com/apache/incubator-servicecomb-service-center/server/rest/controller"
        
"github.com/apache/incubator-servicecomb-service-center/server/rest/controller/v4"
        "github.com/apache/incubator-servicecomb-service-center/version"
        "net/http"
 )
 
+var (
+       versionJsonCache []byte
+       versionResp      *pb.Response
+)
+
 const API_VERSION = "3.0.0"
 
+func init() {
+       result := v4.Result{
+               VersionSet: version.Ver(),
+               ApiVersion: API_VERSION,
+       }
+       versionJsonCache, _ = json.Marshal(result)
+       versionResp = pb.CreateResponse(pb.Response_SUCCESS, "get version 
successfully")
+}
+
 type MainService struct {
        v4.MainService
 }
@@ -38,11 +54,5 @@ func (this *MainService) URLPatterns() []rest.Route {
 }
 
 func (this *MainService) GetVersion(w http.ResponseWriter, r *http.Request) {
-       result := v4.Result{
-               VersionSet: version.Ver(),
-               ApiVersion: API_VERSION,
-       }
-       resultJSON, _ := json.Marshal(result)
-       w.Header().Set("Content-Type", "application/json;charset=utf-8")
-       w.Write(resultJSON)
+       controller.WriteBytes(w, versionResp, versionJsonCache)
 }
diff --git a/server/rest/controller/v4/main_controller.go 
b/server/rest/controller/v4/main_controller.go
index d2af4d4e..cbf5ed58 100644
--- a/server/rest/controller/v4/main_controller.go
+++ b/server/rest/controller/v4/main_controller.go
@@ -28,7 +28,10 @@ import (
        "net/http"
 )
 
-var resultJSON []byte
+var (
+       versionJsonCache []byte
+       versionResp      *pb.Response
+)
 
 const API_VERSION = "4.0.0"
 
@@ -48,7 +51,8 @@ func init() {
                API_VERSION,
                core.ServerInfo.Config,
        }
-       resultJSON, _ = json.Marshal(result)
+       versionJsonCache, _ = json.Marshal(result)
+       versionResp = pb.CreateResponse(pb.Response_SUCCESS, "get version 
successfully")
 }
 
 func (this *MainService) URLPatterns() []rest.Route {
@@ -72,6 +76,5 @@ func (this *MainService) ClusterHealth(w http.ResponseWriter, 
r *http.Request) {
 }
 
 func (this *MainService) GetVersion(w http.ResponseWriter, r *http.Request) {
-       w.Header().Set("Content-Type", "application/json;charset=utf-8")
-       w.Write(resultJSON)
+       controller.WriteBytes(w, versionResp, versionJsonCache)
 }
diff --git a/server/service/schema.go b/server/service/schema.go
index 3d7da42f..dd9c4d9c 100644
--- a/server/service/schema.go
+++ b/server/service/schema.go
@@ -122,7 +122,6 @@ func (s *MicroServiceService) GetAllSchemaInfo(ctx 
context.Context, in *pb.GetAl
 
        schemasList := service.Schemas
        if schemasList == nil || len(schemasList) == 0 {
-               util.Logger().Infof("service %s schemaId set is empty.", 
in.ServiceId)
                return &pb.GetAllSchemaResponse{
                        Response: pb.CreateResponse(pb.Response_SUCCESS, "Do 
not have this schema info."),
                        Schemas:  []*pb.Schema{},


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Reply via email to