little-cui closed pull request #339: SCB-544 Convenient store extension URL: https://github.com/apache/incubator-servicecomb-service-center/pull/339
This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/server/broker/broker_suite_test.go b/server/broker/broker_suite_test.go new file mode 100644 index 00000000..8bc652e1 --- /dev/null +++ b/server/broker/broker_suite_test.go @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package broker + +import ( + pb "github.com/apache/incubator-servicecomb-service-center/server/core/proto" + _ "github.com/apache/incubator-servicecomb-service-center/server/plugin/infra/quota/buildin" + _ "github.com/apache/incubator-servicecomb-service-center/server/plugin/infra/registry/etcd" + _ "github.com/apache/incubator-servicecomb-service-center/server/plugin/infra/tracing/buildin" + _ "github.com/apache/incubator-servicecomb-service-center/server/plugin/infra/uuid/buildin" + "github.com/apache/incubator-servicecomb-service-center/server/service" + . "github.com/onsi/ginkgo" + "github.com/onsi/ginkgo/reporters" + . "github.com/onsi/gomega" + "testing" +) + +var serviceResource pb.ServiceCtrlServer +var instanceResource pb.SerivceInstanceCtrlServerEx +var brokerResource = BrokerServiceAPI + +var _ = BeforeSuite(func() { + //init plugin + serviceResource, instanceResource = service.AssembleResources() +}) + +func TestBroker(t *testing.T) { + RegisterFailHandler(Fail) + junitReporter := reporters.NewJUnitReporter("model.junit.xml") + RunSpecsWithDefaultAndCustomReporters(t, "model Suite", []Reporter{junitReporter}) +} diff --git a/server/broker/service.go b/server/broker/service.go index 3ae63c61..f4ea5264 100644 --- a/server/broker/service.go +++ b/server/broker/service.go @@ -62,7 +62,7 @@ func (*BrokerService) GetPactsOfProvider(ctx context.Context, PactLogger.Errorf(nil, "Get pacts of provider failed: %s\n", resp.Response.Message) return &GetProviderConsumerVersionPactResponse{ - Response: pb.CreateResponse(scerr.ErrInvalidParams, err.Error()), + Response: resp.GetResponse(), }, err } diff --git a/server/broker/service_test.go b/server/broker/service_test.go index 516b8c34..ac910a11 100644 --- a/server/broker/service_test.go +++ b/server/broker/service_test.go @@ -20,7 +20,6 @@ import ( "fmt" "github.com/apache/incubator-servicecomb-service-center/pkg/util" - "github.com/apache/incubator-servicecomb-service-center/server/core" pb "github.com/apache/incubator-servicecomb-service-center/server/core/proto" . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" @@ -41,8 +40,6 @@ const ( TEST_BROKER_PROVIDER_APP = "broker_group_provider" ) -var brokerResource = BrokerServiceAPI -var serviceResource = core.ServiceAPI var consumerServiceId string var providerServiceId string diff --git a/server/broker/store.go b/server/broker/store.go index 7ed9c027..3bfe9825 100644 --- a/server/broker/store.go +++ b/server/broker/store.go @@ -17,166 +17,63 @@ package broker import ( - "sync" - - "github.com/apache/incubator-servicecomb-service-center/pkg/util" - sstore "github.com/apache/incubator-servicecomb-service-center/server/core/backend/store" - "golang.org/x/net/context" + "github.com/apache/incubator-servicecomb-service-center/server/core/backend/store" ) -const ( - PARTICIPANT sstore.StoreType = iota - VERSION - PACT - PACT_VERSION - PACT_TAG - VERIFICATION - PACT_LATEST - typeEnd +var ( + PARTICIPANT store.StoreType + VERSION store.StoreType + PACT store.StoreType + PACT_VERSION store.StoreType + PACT_TAG store.StoreType + VERIFICATION store.StoreType + PACT_LATEST store.StoreType ) -var TypeNames = []string{ - PARTICIPANT: "PARTICIPANT", - VERSION: "VERSION", - PACT: "PACT", - PACT_VERSION: "PACT_VERSION", - PACT_TAG: "PACT_TAG", - VERIFICATION: "VERIFICATION", - PACT_LATEST: "PACT_LATEST", -} - -var TypeRoots = map[sstore.StoreType]string{ - PARTICIPANT: GetBrokerParticipantKey(""), - VERSION: GetBrokerVersionKey(""), - PACT: GetBrokerPactKey(""), - PACT_VERSION: GetBrokerPactVersionKey(""), - PACT_TAG: GetBrokerTagKey(""), - VERIFICATION: GetBrokerVerificationKey(""), - PACT_LATEST: GetBrokerLatestKey(""), -} - -var store = &BKvStore{} - -func Store() *BKvStore { - return store -} - -func (s *BKvStore) StoreSize(t sstore.StoreType) int { - return 100 -} - -func (s *BKvStore) newStore(t sstore.StoreType, opts ...sstore.KvCacherCfgOption) { - opts = append(opts, - sstore.WithKey(TypeRoots[t]), - sstore.WithInitSize(s.StoreSize(t)), - ) - s.newIndexer(t, sstore.NewKvCacher(t.String(), opts...)) -} - -func (s *BKvStore) store(ctx context.Context) { - for t := sstore.StoreType(0); t != typeEnd; t++ { - s.newStore(t) - } - for _, i := range s.bindexers { - select { - case <-ctx.Done(): - return - case <-i.Ready(): - } - } - util.SafeCloseChan(s.bready) - - util.Logger().Debugf("all indexers are ready") -} +var brokerKvStore = &BKvStore{} func init() { - store.Initialize() - store.Run() - store.Ready() -} + PARTICIPANT = store.Store().MustInstall(store.NewEntity("PARTICIPANT", GetBrokerParticipantKey(""))) + VERSION = store.Store().MustInstall(store.NewEntity("VERSION", GetBrokerVersionKey(""))) + PACT = store.Store().MustInstall(store.NewEntity("PACT", GetBrokerPactKey(""))) + PACT_VERSION = store.Store().MustInstall(store.NewEntity("PACT_VERSION", GetBrokerPactVersionKey(""))) + PACT_TAG = store.Store().MustInstall(store.NewEntity("PACT_TAG", GetBrokerTagKey(""))) + VERIFICATION = store.Store().MustInstall(store.NewEntity("VERIFICATION", GetBrokerVerificationKey(""))) + PACT_LATEST = store.Store().MustInstall(store.NewEntity("PACT_LATEST", GetBrokerLatestKey(""))) -type BKvStore struct { - *sstore.KvStore - bindexers map[sstore.StoreType]*sstore.Indexer - block sync.RWMutex - bready chan struct{} - bisClose bool -} - -func (s *BKvStore) Initialize() { - s.KvStore = sstore.Store() - s.KvStore.Initialize() - s.bindexers = make(map[sstore.StoreType]*sstore.Indexer) - s.bready = make(chan struct{}) - - for i := sstore.StoreType(0); i != typeEnd; i++ { - store.newNullStore(i) - } -} - -func (s *BKvStore) newNullStore(t sstore.StoreType) { - s.newIndexer(t, sstore.NullCacher) -} - -func (s *BKvStore) newIndexer(t sstore.StoreType, cacher sstore.Cacher) { - indexer := sstore.NewCacheIndexer(cacher) - s.bindexers[t] = indexer - indexer.Run() } -func (s *BKvStore) Run() { - util.Go(func(ctx context.Context) { - s.store(ctx) - select { - case <-ctx.Done(): - s.Stop() - } - }) -} - -func (s *BKvStore) Ready() <-chan struct{} { - return s.bready +type BKvStore struct { } -func (s *BKvStore) Participant() *sstore.Indexer { - return s.bindexers[PARTICIPANT] +func (s *BKvStore) Participant() *store.Indexer { + return store.Store().Entity(PARTICIPANT) } -func (s *BKvStore) Version() *sstore.Indexer { - return s.bindexers[VERSION] +func (s *BKvStore) Version() *store.Indexer { + return store.Store().Entity(VERSION) } -func (s *BKvStore) Pact() *sstore.Indexer { - return s.bindexers[PACT] +func (s *BKvStore) Pact() *store.Indexer { + return store.Store().Entity(PACT) } -func (s *BKvStore) PactVersion() *sstore.Indexer { - return s.bindexers[PACT_VERSION] +func (s *BKvStore) PactVersion() *store.Indexer { + return store.Store().Entity(PACT_VERSION) } -func (s *BKvStore) PactTag() *sstore.Indexer { - return s.bindexers[PACT_TAG] +func (s *BKvStore) PactTag() *store.Indexer { + return store.Store().Entity(PACT_TAG) } -func (s *BKvStore) Verification() *sstore.Indexer { - return s.bindexers[VERIFICATION] +func (s *BKvStore) Verification() *store.Indexer { + return store.Store().Entity(VERIFICATION) } -func (s *BKvStore) PactLatest() *sstore.Indexer { - return s.bindexers[PACT_LATEST] +func (s *BKvStore) PactLatest() *store.Indexer { + return store.Store().Entity(PACT_LATEST) } -func (s *BKvStore) Stop() { - if s.bisClose { - return - } - s.bisClose = true - - for _, i := range s.bindexers { - i.Stop() - } - - util.SafeCloseChan(s.bready) - - util.Logger().Debugf("broker store daemon stopped") +func Store() *BKvStore { + return brokerKvStore } diff --git a/server/core/backend/store/cache_kv.go b/server/core/backend/store/cache_kv.go index fcced8f2..9fb347eb 100644 --- a/server/core/backend/store/cache_kv.go +++ b/server/core/backend/store/cache_kv.go @@ -95,7 +95,7 @@ func (c *KvCache) compact() { c.store = newCache util.Logger().Infof("cache %s is not in use over %s, compact capacity to size %d->%d", - c.owner.Cfg.Key, DEFAULT_COMPACT_TIMEOUT, c.lastMaxSize, c.size) + c.owner.Cfg.Prefix, DEFAULT_COMPACT_TIMEOUT, c.lastMaxSize, c.size) } @@ -107,7 +107,7 @@ func (c *KvCache) Size() (l int) { } type KvCacher struct { - Cfg KvCacherCfg + Cfg Config name string lastRev int64 @@ -143,13 +143,13 @@ func (c *KvCacher) needList() bool { } util.Logger().Debugf("no events come in more then %s, need to list key %s, rev: %d", - time.Duration(c.noEventCount)*c.Cfg.Timeout, c.Cfg.Key, rev) + time.Duration(c.noEventCount)*c.Cfg.Timeout, c.Cfg.Prefix, rev) c.noEventCount = 0 return true } -func (c *KvCacher) doList(listOps ListOptions) error { - kvs, err := c.lw.List(listOps) +func (c *KvCacher) doList(cfg ListWatchConfig) error { + kvs, err := c.lw.List(cfg) if err != nil { return err } @@ -161,13 +161,13 @@ func (c *KvCacher) doList(listOps ListOptions) error { util.Logger().Warnf(nil, "most of the protected data(%d/%d) are recovered", kc, c.cache.Size()) } c.sync(evts) - util.LogDebugOrWarnf(start, "finish to cache key %s, %d items, rev: %d", c.Cfg.Key, len(kvs), c.lw.Revision()) + util.LogDebugOrWarnf(start, "finish to cache key %s, %d items, rev: %d", c.Cfg.Prefix, len(kvs), c.lw.Revision()) return nil } -func (c *KvCacher) doWatch(listOps ListOptions) error { - watcher := c.lw.Watch(listOps) +func (c *KvCacher) doWatch(cfg ListWatchConfig) error { + watcher := c.lw.Watch(cfg) return c.handleWatcher(watcher) } @@ -175,19 +175,19 @@ func (c *KvCacher) ListAndWatch(ctx context.Context) error { c.mux.Lock() defer c.mux.Unlock() - listOps := ListOptions{ + cfg := ListWatchConfig{ Timeout: c.Cfg.Timeout, Context: ctx, } if c.needList() { - if err := c.doList(listOps); err != nil { + if err := c.doList(cfg); err != nil { return err } } util.SafeCloseChan(c.ready) - return c.doWatch(listOps) + return c.doWatch(cfg) } func (c *KvCacher) handleWatcher(watcher *Watcher) error { @@ -336,7 +336,7 @@ func (c *KvCacher) filterDelete(store map[string]*mvccpb.KeyValue, newStore map[ block[i] = KvEvent{ Revision: rev, Type: proto.EVT_DELETE, - Prefix: c.Cfg.Key, + Prefix: c.Cfg.Prefix, Object: v, } i++ @@ -365,7 +365,7 @@ func (c *KvCacher) filterCreateOrUpdate(store map[string]*mvccpb.KeyValue, newSt block[i] = KvEvent{ Revision: rev, Type: proto.EVT_CREATE, - Prefix: c.Cfg.Key, + Prefix: c.Cfg.Prefix, Object: v, } i++ @@ -385,7 +385,7 @@ func (c *KvCacher) filterCreateOrUpdate(store map[string]*mvccpb.KeyValue, newSt block[i] = KvEvent{ Revision: rev, Type: proto.EVT_UPDATE, - Prefix: c.Cfg.Key, + Prefix: c.Cfg.Prefix, Object: v, } i++ @@ -492,8 +492,8 @@ func NewKvCache(c *KvCacher, size int) *KvCache { } } -func NewKvCacher(name string, opts ...KvCacherCfgOption) *KvCacher { - cfg := DefaultKvCacherConfig() +func NewKvCacher(name string, opts ...ConfigOption) *KvCacher { + cfg := DefaultConfig() for _, opt := range opts { opt(&cfg) } @@ -504,7 +504,7 @@ func NewKvCacher(name string, opts ...KvCacherCfgOption) *KvCacher { ready: make(chan struct{}), lw: ListWatcher{ Client: backend.Registry(), - Key: cfg.Key, + Prefix: cfg.Prefix, }, goroutine: util.NewGo(context.Background()), } diff --git a/server/core/backend/store/cacher.go b/server/core/backend/store/cacher.go index 94fbe46c..898964c8 100644 --- a/server/core/backend/store/cacher.go +++ b/server/core/backend/store/cacher.go @@ -24,6 +24,7 @@ type Cache interface { } type Cacher interface { + // Name is the cache size metric name Name() string Cache() Cache Run() diff --git a/server/core/backend/store/common.go b/server/core/backend/store/common.go index fc04898e..0de9e05c 100644 --- a/server/core/backend/store/common.go +++ b/server/core/backend/store/common.go @@ -25,12 +25,17 @@ import ( type StoreType int func (st StoreType) String() string { + if int(st) < 0 { + return "NONEXIST" + } if int(st) < len(TypeNames) { return TypeNames[st] } return "TYPE" + strconv.Itoa(int(st)) } +const NONEXIST = StoreType(-1) + const ( DOMAIN StoreType = iota PROJECT @@ -48,25 +53,25 @@ const ( INSTANCE LEASE ENDPOINTS - typeEnd + typeEnd // end of the base store types ) var TypeNames = []string{ - SERVICE: "SERVICE", - INSTANCE: "INSTANCE", DOMAIN: "DOMAIN", - SCHEMA: "SCHEMA", - SCHEMA_SUMMARY: "SCHEMA_SUMMARY", - RULE: "RULE", - LEASE: "LEASE", + PROJECT: "PROJECT", + SERVICE: "SERVICE", SERVICE_INDEX: "SERVICE_INDEX", SERVICE_ALIAS: "SERVICE_ALIAS", SERVICE_TAG: "SERVICE_TAG", + RULE: "RULE", RULE_INDEX: "RULE_INDEX", DEPENDENCY: "DEPENDENCY", DEPENDENCY_RULE: "DEPENDENCY_RULE", DEPENDENCY_QUEUE: "DEPENDENCY_QUEUE", - PROJECT: "PROJECT", + SCHEMA: "SCHEMA", + SCHEMA_SUMMARY: "SCHEMA_SUMMARY", + INSTANCE: "INSTANCE", + LEASE: "LEASE", ENDPOINTS: "ENDPOINTS", } diff --git a/server/core/backend/store/opt.go b/server/core/backend/store/config.go similarity index 55% rename from server/core/backend/store/opt.go rename to server/core/backend/store/config.go index 59fbabb0..9f0b2dde 100644 --- a/server/core/backend/store/opt.go +++ b/server/core/backend/store/config.go @@ -18,11 +18,12 @@ package store import ( "fmt" + "golang.org/x/net/context" "time" ) -type KvCacherCfg struct { - Key string +type Config struct { + Prefix string InitSize int NoEventMaxInterval int Timeout time.Duration @@ -31,43 +32,52 @@ type KvCacherCfg struct { DeferHandler DeferHandler } -func (cfg KvCacherCfg) String() string { - return fmt.Sprintf("{key: %s, timeout: %s, period: %s}", - cfg.Key, cfg.Timeout, cfg.Period) +func (cfg Config) String() string { + return fmt.Sprintf("{prefix: %s, timeout: %s, period: %s}", + cfg.Prefix, cfg.Timeout, cfg.Period) } -type KvCacherCfgOption func(*KvCacherCfg) +type ConfigOption func(*Config) -func WithKey(key string) KvCacherCfgOption { - return func(cfg *KvCacherCfg) { cfg.Key = key } +func WithPrefix(key string) ConfigOption { + return func(cfg *Config) { cfg.Prefix = key } } -func WithInitSize(size int) KvCacherCfgOption { - return func(cfg *KvCacherCfg) { cfg.InitSize = size } +func WithInitSize(size int) ConfigOption { + return func(cfg *Config) { cfg.InitSize = size } } -func WithTimeout(ot time.Duration) KvCacherCfgOption { - return func(cfg *KvCacherCfg) { cfg.Timeout = ot } +func WithTimeout(ot time.Duration) ConfigOption { + return func(cfg *Config) { cfg.Timeout = ot } } -func WithPeriod(ot time.Duration) KvCacherCfgOption { - return func(cfg *KvCacherCfg) { cfg.Period = ot } +func WithPeriod(ot time.Duration) ConfigOption { + return func(cfg *Config) { cfg.Period = ot } } -func WithEventFunc(f KvEventFunc) KvCacherCfgOption { - return func(cfg *KvCacherCfg) { cfg.OnEvent = f } +func WithEventFunc(f KvEventFunc) ConfigOption { + return func(cfg *Config) { cfg.OnEvent = f } } -func WithDeferHandler(h DeferHandler) KvCacherCfgOption { - return func(cfg *KvCacherCfg) { cfg.DeferHandler = h } +func WithDeferHandler(h DeferHandler) ConfigOption { + return func(cfg *Config) { cfg.DeferHandler = h } } -func DefaultKvCacherConfig() KvCacherCfg { - return KvCacherCfg{ - Key: "/", +func DefaultConfig() Config { + return Config{ + Prefix: "/", Timeout: DEFAULT_LISTWATCH_TIMEOUT, Period: time.Second, NoEventMaxInterval: DEFAULT_MAX_NO_EVENT_INTERVAL, InitSize: DEFAULT_CACHE_INIT_SIZE, } } + +type ListWatchConfig struct { + Timeout time.Duration + Context context.Context +} + +func (lo *ListWatchConfig) String() string { + return fmt.Sprintf("{timeout: %s}", lo.Timeout) +} diff --git a/server/core/backend/store/event.go b/server/core/backend/store/event.go index 504aba10..67714a04 100644 --- a/server/core/backend/store/event.go +++ b/server/core/backend/store/event.go @@ -18,22 +18,8 @@ package store import ( "github.com/apache/incubator-servicecomb-service-center/server/core/proto" - "sync" ) -var ( - evtProxies map[StoreType]*KvEventProxy -) - -func init() { - evtProxies = make(map[StoreType]*KvEventProxy, typeEnd) - for i := StoreType(0); i != typeEnd; i++ { - evtProxies[i] = &KvEventProxy{ - evtHandleFuncs: make([]KvEventFunc, 0, 5), - } - } -} - type KvEventFunc func(evt KvEvent) type KvEvent struct { @@ -48,29 +34,6 @@ type KvEventHandler interface { OnEvent(evt KvEvent) } -type KvEventProxy struct { - evtHandleFuncs []KvEventFunc - lock sync.RWMutex -} - -func (h *KvEventProxy) AddHandleFunc(f KvEventFunc) { - h.lock.Lock() - h.evtHandleFuncs = append(h.evtHandleFuncs, f) - h.lock.Unlock() -} - -func (h *KvEventProxy) OnEvent(evt KvEvent) { - h.lock.RLock() - for _, f := range h.evtHandleFuncs { - f(evt) - } - h.lock.RUnlock() -} - -func EventProxy(t StoreType) *KvEventProxy { - return evtProxies[t] -} - // the event handler/func must be good performance, or will block the event bus. func AddEventHandleFunc(t StoreType, f KvEventFunc) { EventProxy(t).AddHandleFunc(f) diff --git a/server/core/backend/store/event_proxy.go b/server/core/backend/store/event_proxy.go new file mode 100644 index 00000000..e66583d9 --- /dev/null +++ b/server/core/backend/store/event_proxy.go @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package store + +import "sync" + +var ( + EventProxies map[StoreType]*KvEventProxy +) + +func init() { + EventProxies = make(map[StoreType]*KvEventProxy, typeEnd) + for i := StoreType(0); i != typeEnd; i++ { + EventProxies[i] = NewEventProxy() + } +} + +type KvEventProxy struct { + evtHandleFuncs []KvEventFunc + lock sync.RWMutex +} + +func (h *KvEventProxy) AddHandleFunc(f KvEventFunc) { + h.lock.Lock() + h.evtHandleFuncs = append(h.evtHandleFuncs, f) + h.lock.Unlock() +} + +func (h *KvEventProxy) OnEvent(evt KvEvent) { + h.lock.RLock() + for _, f := range h.evtHandleFuncs { + f(evt) + } + h.lock.RUnlock() +} + +func NewEventProxy() *KvEventProxy { + return &KvEventProxy{ + evtHandleFuncs: make([]KvEventFunc, 0, 5), + } +} + +func EventProxy(t StoreType) *KvEventProxy { + return EventProxies[t] +} diff --git a/server/core/backend/store/extend.go b/server/core/backend/store/extend.go new file mode 100644 index 00000000..bffd0c15 --- /dev/null +++ b/server/core/backend/store/extend.go @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package store + +import ( + "errors" + "fmt" +) + +type Entity interface { + Name() string + Prefix() string + InitSize() int +} + +type entity struct { + name string + prefix string + initSize int +} + +func (e *entity) Name() string { + return e.name +} + +func (e *entity) Prefix() string { + return e.prefix +} + +func (e *entity) InitSize() int { + return e.initSize +} + +func InstallType(e Entity) (id StoreType, err error) { + if e == nil { + return NONEXIST, errors.New("invalid parameter") + } + for _, n := range TypeNames { + if n == e.Name() { + return NONEXIST, fmt.Errorf("redeclare store type '%s'", n) + } + } + for _, r := range TypeRoots { + if r == e.Prefix() { + return NONEXIST, fmt.Errorf("redeclare store root '%s'", r) + } + } + + TypeNames = append(TypeNames, e.Name()) + id = StoreType(len(TypeNames) + 1) // +1 for typeEnd + + TypeRoots[id] = e.Prefix() + TypeInitSize[id] = e.InitSize() + + EventProxies[id] = NewEventProxy() + return +} + +func NewEntity(name, prefix string, opts ...ConfigOption) Entity { + cfg := DefaultConfig() + for _, opt := range opts { + opt(&cfg) + } + cfg.Prefix = prefix + return &entity{ + name: name, + prefix: cfg.Prefix, + initSize: cfg.InitSize, + } +} diff --git a/server/core/backend/store/extend_test.go b/server/core/backend/store/extend_test.go new file mode 100644 index 00000000..8359dd2e --- /dev/null +++ b/server/core/backend/store/extend_test.go @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package store + +import "testing" + +type extend struct { +} + +func (e *extend) Name() string { + return "test" +} + +func (e *extend) Prefix() string { + return "/test" +} + +func (e *extend) InitSize() int { + return 0 +} + +func TestInstallType(t *testing.T) { + id, err := InstallType(&extend{}) + if err != nil { + t.Fatal(err) + } + if id == NONEXIST { + t.Fatal(err) + } + + id, err = InstallType(&extend{}) + if id != NONEXIST || err == nil { + t.Fatal("InstallType fail", err) + } +} diff --git a/server/core/backend/store/indexer.go b/server/core/backend/store/indexer.go index 3f8cd7d7..2b35a976 100644 --- a/server/core/backend/store/indexer.go +++ b/server/core/backend/store/indexer.go @@ -28,23 +28,16 @@ import ( "time" ) -var defaultRootKeys map[string]struct{} - -func init() { - defaultRootKeys = make(map[string]struct{}, len(defaultRootKeys)) - for _, root := range TypeRoots { - defaultRootKeys[root] = struct{}{} - } -} - type Indexer struct { - BuildTimeout time.Duration + BuildTimeout time.Duration + Root string + cacher Cacher - prefixIndex map[string]map[string]struct{} - prefixLock sync.RWMutex - prefixBuildQueue chan KvEvent goroutine *util.GoRoutine ready chan struct{} + prefixIndex map[string]map[string]struct{} + prefixBuildQueue chan KvEvent + prefixLock sync.RWMutex isClose bool } @@ -266,8 +259,7 @@ func (i *Indexer) getPrefixKey(arr *[]string, prefix string) (count int) { } func (i *Indexer) addPrefixKey(prefix, key string) { - _, ok := defaultRootKeys[key] - if ok { + if i.Root == key { return } @@ -342,9 +334,10 @@ func (i *Indexer) Ready() <-chan struct{} { return i.ready } -func NewCacheIndexer(cr Cacher) *Indexer { +func NewCacheIndexer(root string, cr Cacher) *Indexer { return &Indexer{ BuildTimeout: DEFAULT_ADD_QUEUE_TIMEOUT, + Root: root, cacher: cr, prefixIndex: make(map[string]map[string]struct{}, DEFAULT_CACHE_INIT_SIZE), prefixBuildQueue: make(chan KvEvent, DEFAULT_MAX_EVENT_COUNT), diff --git a/server/core/backend/store/listwatch.go b/server/core/backend/store/listwatch.go index 3cf1fe75..42fbde52 100644 --- a/server/core/backend/store/listwatch.go +++ b/server/core/backend/store/listwatch.go @@ -24,30 +24,20 @@ import ( "github.com/coreos/etcd/mvcc/mvccpb" "golang.org/x/net/context" "sync" - "time" ) -type ListOptions struct { - Timeout time.Duration - Context context.Context -} - -func (lo *ListOptions) String() string { - return fmt.Sprintf("{timeout: %s}", lo.Timeout) -} - type ListWatcher struct { Client registry.Registry - Key string + Prefix string rev int64 } -func (lw *ListWatcher) List(op ListOptions) ([]*mvccpb.KeyValue, error) { +func (lw *ListWatcher) List(op ListWatchConfig) ([]*mvccpb.KeyValue, error) { otCtx, _ := context.WithTimeout(op.Context, op.Timeout) - resp, err := lw.Client.Do(otCtx, registry.WatchPrefixOpOptions(lw.Key)...) + resp, err := lw.Client.Do(otCtx, registry.WatchPrefixOpOptions(lw.Prefix)...) if err != nil { - util.Logger().Errorf(err, "list key %s failed, rev: %d->0", lw.Key, lw.Revision()) + util.Logger().Errorf(err, "list prefix %s failed, rev: %d->0", lw.Prefix, lw.Revision()) lw.setRevision(0) return nil, err } @@ -66,14 +56,14 @@ func (lw *ListWatcher) setRevision(rev int64) { lw.rev = rev } -func (lw *ListWatcher) Watch(op ListOptions) *Watcher { +func (lw *ListWatcher) Watch(op ListWatchConfig) *Watcher { return newWatcher(lw, op) } func (lw *ListWatcher) doWatch(ctx context.Context, f func(evt []KvEvent)) error { rev := lw.Revision() opts := append( - registry.WatchPrefixOpOptions(lw.Key), + registry.WatchPrefixOpOptions(lw.Prefix), registry.WithRev(rev+1), registry.WithWatchCallback( func(message string, resp *registry.PluginResponse) error { @@ -81,13 +71,13 @@ func (lw *ListWatcher) doWatch(ctx context.Context, f func(evt []KvEvent)) error return fmt.Errorf("unknown event %s", resp) } - util.Logger().Infof("watch prefix key %s, start rev %d+1, event: %s", lw.Key, rev, resp) + util.Logger().Infof("watch prefix %s, start rev %d+1, event: %s", lw.Prefix, rev, resp) lw.setRevision(resp.Revision) evts := make([]KvEvent, len(resp.Kvs)) for i, kv := range resp.Kvs { - evt := KvEvent{Prefix: lw.Key, Revision: kv.ModRevision} + evt := KvEvent{Prefix: lw.Prefix, Revision: kv.ModRevision} switch { case resp.Action == registry.Put && kv.Version == 1: evt.Type, evt.Object = proto.EVT_CREATE, kv @@ -106,16 +96,16 @@ func (lw *ListWatcher) doWatch(ctx context.Context, f func(evt []KvEvent)) error err := lw.Client.Watch(ctx, opts...) if err != nil { // compact可能会导致watch失败 or message body size lager than 4MB - util.Logger().Errorf(err, "watch key %s failed, start rev: %d+1->%d->0", lw.Key, rev, lw.Revision()) + util.Logger().Errorf(err, "watch prefix %s failed, start rev: %d+1->%d->0", lw.Prefix, rev, lw.Revision()) lw.setRevision(0) - f([]KvEvent{errEvent(lw.Key, err)}) + f([]KvEvent{errEvent(lw.Prefix, err)}) } return err } type Watcher struct { - ListOps ListOptions + ListOps ListWatchConfig lw *ListWatcher bus chan []KvEvent stopCh chan struct{} @@ -169,7 +159,7 @@ func errEvent(key string, err error) KvEvent { } } -func newWatcher(lw *ListWatcher, listOps ListOptions) *Watcher { +func newWatcher(lw *ListWatcher, listOps ListWatchConfig) *Watcher { w := &Watcher{ ListOps: listOps, lw: lw, diff --git a/server/core/backend/store/store.go b/server/core/backend/store/store.go index e549cd41..66849464 100644 --- a/server/core/backend/store/store.go +++ b/server/core/backend/store/store.go @@ -61,7 +61,7 @@ func (s *KvStore) dispatchEvent(t StoreType, evt KvEvent) { } func (s *KvStore) newIndexBuilder(t StoreType, cacher Cacher) { - indexer := NewCacheIndexer(cacher) + indexer := NewCacheIndexer(TypeRoots[t], cacher) s.indexers[t] = indexer indexer.Run() } @@ -71,7 +71,7 @@ func (s *KvStore) Run() { s.taskService.Run() } -func (s *KvStore) getKvCacherCfgOptions(t StoreType) (opts []KvCacherCfgOption) { +func (s *KvStore) getKvCacherCfgOptions(t StoreType) (opts []ConfigOption) { switch t { case INSTANCE: opts = append(opts, WithDeferHandler(s.SelfPreservationHandler())) @@ -81,7 +81,7 @@ func (s *KvStore) getKvCacherCfgOptions(t StoreType) (opts []KvCacherCfgOption) opts = append(opts, WithInitSize(sz)) } opts = append(opts, - WithKey(TypeRoots[t]), + WithPrefix(TypeRoots[t]), WithEventFunc(func(evt KvEvent) { s.dispatchEvent(t, evt) })) return } @@ -246,6 +246,33 @@ func (s *KvStore) KeepAlive(ctx context.Context, opts ...registry.PluginOpOption return pt.TTL, pt.Err() } +func (s *KvStore) Entity(id StoreType) *Indexer { + return s.indexers[id] +} + +func (s *KvStore) Install(e Entity) (id StoreType, err error) { + if id, err = InstallType(e); err != nil { + return + } + + util.Logger().Infof("install new store entity %d:%s->%s", id, e.Name(), e.Prefix()) + + if !core.ServerInfo.Config.EnableCache { + s.newIndexBuilder(id, NullCacher) + return + } + s.newIndexBuilder(id, NewKvCacher(id.String(), s.getKvCacherCfgOptions(id)...)) + return +} + +func (s *KvStore) MustInstall(e Entity) StoreType { + id, err := s.Install(e) + if err != nil { + panic(err) + } + return id +} + func Store() *KvStore { return store } diff --git a/server/error/error.go b/server/error/error.go index 5ee5bf12..b30356ee 100644 --- a/server/error/error.go +++ b/server/error/error.go @@ -18,8 +18,6 @@ package error import ( "encoding/json" - "fmt" - "github.com/apache/incubator-servicecomb-service-center/pkg/util" "net/http" ) @@ -104,9 +102,9 @@ func (e Error) Error() string { return e.Message + "(" + e.Detail + ")" } -func (e Error) toJson() string { +func (e Error) Marshal() []byte { bs, _ := json.Marshal(e) - return util.BytesToStringWithNoCopy(bs) + return bs } func (e Error) StatusCode() int { @@ -123,14 +121,6 @@ func (e Error) InternalError() bool { return false } -func (e Error) HttpWrite(w http.ResponseWriter) { - status := e.StatusCode() - w.Header().Add("X-Response-Status", fmt.Sprint(status)) - w.Header().Set("Content-Type", "application/json; charset=UTF-8") - w.WriteHeader(status) - fmt.Fprintln(w, e.toJson()) -} - func NewError(code int32, detail string) *Error { return &Error{ Code: code, diff --git a/server/rest/controller/rest_util.go b/server/rest/controller/rest_util.go index be626d5b..00f93f7e 100644 --- a/server/rest/controller/rest_util.go +++ b/server/rest/controller/rest_util.go @@ -25,15 +25,23 @@ import ( "net/http" ) +const ( + contentTypeJson = "application/json; charset=UTF-8" + contentTypeText = "text/plain; charset=UTF-8" +) + func WriteError(w http.ResponseWriter, code int32, detail string) { err := error.NewError(code, detail) - err.HttpWrite(w) + w.Header().Add("X-Response-Status", fmt.Sprint(err.StatusCode())) + w.Header().Set("Content-Type", contentTypeJson) + w.WriteHeader(err.StatusCode()) + fmt.Fprintln(w, util.BytesToStringWithNoCopy(err.Marshal())) } func WriteJsonObject(w http.ResponseWriter, obj interface{}) { if obj == nil { w.Header().Add("X-Response-Status", fmt.Sprint(http.StatusOK)) - w.Header().Set("Content-Type", "text/plain; charset=UTF-8") + w.Header().Set("Content-Type", contentTypeText) w.WriteHeader(http.StatusOK) return } @@ -44,7 +52,7 @@ func WriteJsonObject(w http.ResponseWriter, obj interface{}) { return } w.Header().Add("X-Response-Status", fmt.Sprint(http.StatusOK)) - w.Header().Set("Content-Type", "application/json; charset=UTF-8") + w.Header().Set("Content-Type", contentTypeJson) w.WriteHeader(http.StatusOK) fmt.Fprintln(w, util.BytesToStringWithNoCopy(objJson)) } @@ -61,11 +69,10 @@ func WriteResponse(w http.ResponseWriter, resp *pb.Response, obj interface{}) { func WriteBytes(w http.ResponseWriter, resp *pb.Response, json []byte) { if resp.GetCode() == pb.Response_SUCCESS { w.Header().Add("X-Response-Status", fmt.Sprint(http.StatusOK)) - w.Header().Set("Content-Type", "application/json; charset=UTF-8") + w.Header().Set("Content-Type", contentTypeJson) w.WriteHeader(http.StatusOK) w.Write(json) return } WriteError(w, resp.GetCode(), resp.GetMessage()) } - diff --git a/server/rest/controller/v3/main_controller.go b/server/rest/controller/v3/main_controller.go index 5bb44100..e20e3238 100644 --- a/server/rest/controller/v3/main_controller.go +++ b/server/rest/controller/v3/main_controller.go @@ -19,13 +19,29 @@ package v3 import ( "encoding/json" "github.com/apache/incubator-servicecomb-service-center/pkg/rest" + pb "github.com/apache/incubator-servicecomb-service-center/server/core/proto" + "github.com/apache/incubator-servicecomb-service-center/server/rest/controller" "github.com/apache/incubator-servicecomb-service-center/server/rest/controller/v4" "github.com/apache/incubator-servicecomb-service-center/version" "net/http" ) +var ( + versionJsonCache []byte + versionResp *pb.Response +) + const API_VERSION = "3.0.0" +func init() { + result := v4.Result{ + VersionSet: version.Ver(), + ApiVersion: API_VERSION, + } + versionJsonCache, _ = json.Marshal(result) + versionResp = pb.CreateResponse(pb.Response_SUCCESS, "get version successfully") +} + type MainService struct { v4.MainService } @@ -38,11 +54,5 @@ func (this *MainService) URLPatterns() []rest.Route { } func (this *MainService) GetVersion(w http.ResponseWriter, r *http.Request) { - result := v4.Result{ - VersionSet: version.Ver(), - ApiVersion: API_VERSION, - } - resultJSON, _ := json.Marshal(result) - w.Header().Set("Content-Type", "application/json;charset=utf-8") - w.Write(resultJSON) + controller.WriteBytes(w, versionResp, versionJsonCache) } diff --git a/server/rest/controller/v4/main_controller.go b/server/rest/controller/v4/main_controller.go index d2af4d4e..cbf5ed58 100644 --- a/server/rest/controller/v4/main_controller.go +++ b/server/rest/controller/v4/main_controller.go @@ -28,7 +28,10 @@ import ( "net/http" ) -var resultJSON []byte +var ( + versionJsonCache []byte + versionResp *pb.Response +) const API_VERSION = "4.0.0" @@ -48,7 +51,8 @@ func init() { API_VERSION, core.ServerInfo.Config, } - resultJSON, _ = json.Marshal(result) + versionJsonCache, _ = json.Marshal(result) + versionResp = pb.CreateResponse(pb.Response_SUCCESS, "get version successfully") } func (this *MainService) URLPatterns() []rest.Route { @@ -72,6 +76,5 @@ func (this *MainService) ClusterHealth(w http.ResponseWriter, r *http.Request) { } func (this *MainService) GetVersion(w http.ResponseWriter, r *http.Request) { - w.Header().Set("Content-Type", "application/json;charset=utf-8") - w.Write(resultJSON) + controller.WriteBytes(w, versionResp, versionJsonCache) } diff --git a/server/service/schema.go b/server/service/schema.go index 3d7da42f..dd9c4d9c 100644 --- a/server/service/schema.go +++ b/server/service/schema.go @@ -122,7 +122,6 @@ func (s *MicroServiceService) GetAllSchemaInfo(ctx context.Context, in *pb.GetAl schemasList := service.Schemas if schemasList == nil || len(schemasList) == 0 { - util.Logger().Infof("service %s schemaId set is empty.", in.ServiceId) return &pb.GetAllSchemaResponse{ Response: pb.CreateResponse(pb.Response_SUCCESS, "Do not have this schema info."), Schemas: []*pb.Schema{}, ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [email protected] With regards, Apache Git Services
