[
https://issues.apache.org/jira/browse/SCB-544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16465573#comment-16465573
]
ASF GitHub Bot commented on SCB-544:
------------------------------------
little-cui closed pull request #339: SCB-544 Convenient store extension
URL: https://github.com/apache/incubator-servicecomb-service-center/pull/339
This PR was merged from a forked repository. Because GitHub hides the
original diff of a fork-based (foreign) pull request once it is merged,
the diff is reproduced below for the sake of provenance:
diff --git a/server/broker/broker_suite_test.go
b/server/broker/broker_suite_test.go
new file mode 100644
index 00000000..8bc652e1
--- /dev/null
+++ b/server/broker/broker_suite_test.go
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package broker
+
+import (
+ pb
"github.com/apache/incubator-servicecomb-service-center/server/core/proto"
+ _
"github.com/apache/incubator-servicecomb-service-center/server/plugin/infra/quota/buildin"
+ _
"github.com/apache/incubator-servicecomb-service-center/server/plugin/infra/registry/etcd"
+ _
"github.com/apache/incubator-servicecomb-service-center/server/plugin/infra/tracing/buildin"
+ _
"github.com/apache/incubator-servicecomb-service-center/server/plugin/infra/uuid/buildin"
+ "github.com/apache/incubator-servicecomb-service-center/server/service"
+ . "github.com/onsi/ginkgo"
+ "github.com/onsi/ginkgo/reporters"
+ . "github.com/onsi/gomega"
+ "testing"
+)
+
+var serviceResource pb.ServiceCtrlServer
+var instanceResource pb.SerivceInstanceCtrlServerEx
+var brokerResource = BrokerServiceAPI
+
+// Suite-level setup: before any spec runs, fill the package-level
+// serviceResource and instanceResource from service.AssembleResources().
+var _ = BeforeSuite(func() {
+ //init plugin
+ serviceResource, instanceResource = service.AssembleResources()
+})
+
+// TestBroker is the go-test entry point for the broker Ginkgo suite. It
+// registers Ginkgo's Fail as the Gomega fail handler and runs the specs with
+// the default reporter plus a JUnit reporter.
+//
+// NOTE(review): the suite name and the JUnit file name both say "model"
+// although this is the broker package -- confirm whether that is intended.
+func TestBroker(t *testing.T) {
+ RegisterFailHandler(Fail)
+ junitReporter := reporters.NewJUnitReporter("model.junit.xml")
+ RunSpecsWithDefaultAndCustomReporters(t, "model Suite",
[]Reporter{junitReporter})
+}
diff --git a/server/broker/service.go b/server/broker/service.go
index 3ae63c61..f4ea5264 100644
--- a/server/broker/service.go
+++ b/server/broker/service.go
@@ -62,7 +62,7 @@ func (*BrokerService) GetPactsOfProvider(ctx context.Context,
PactLogger.Errorf(nil, "Get pacts of provider failed: %s\n",
resp.Response.Message)
return &GetProviderConsumerVersionPactResponse{
- Response: pb.CreateResponse(scerr.ErrInvalidParams,
err.Error()),
+ Response: resp.GetResponse(),
}, err
}
diff --git a/server/broker/service_test.go b/server/broker/service_test.go
index 516b8c34..ac910a11 100644
--- a/server/broker/service_test.go
+++ b/server/broker/service_test.go
@@ -20,7 +20,6 @@ import (
"fmt"
"github.com/apache/incubator-servicecomb-service-center/pkg/util"
- "github.com/apache/incubator-servicecomb-service-center/server/core"
pb
"github.com/apache/incubator-servicecomb-service-center/server/core/proto"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
@@ -41,8 +40,6 @@ const (
TEST_BROKER_PROVIDER_APP = "broker_group_provider"
)
-var brokerResource = BrokerServiceAPI
-var serviceResource = core.ServiceAPI
var consumerServiceId string
var providerServiceId string
diff --git a/server/broker/store.go b/server/broker/store.go
index 7ed9c027..3bfe9825 100644
--- a/server/broker/store.go
+++ b/server/broker/store.go
@@ -17,166 +17,63 @@
package broker
import (
- "sync"
-
- "github.com/apache/incubator-servicecomb-service-center/pkg/util"
- sstore
"github.com/apache/incubator-servicecomb-service-center/server/core/backend/store"
- "golang.org/x/net/context"
+
"github.com/apache/incubator-servicecomb-service-center/server/core/backend/store"
)
-const (
- PARTICIPANT sstore.StoreType = iota
- VERSION
- PACT
- PACT_VERSION
- PACT_TAG
- VERIFICATION
- PACT_LATEST
- typeEnd
+var (
+ PARTICIPANT store.StoreType
+ VERSION store.StoreType
+ PACT store.StoreType
+ PACT_VERSION store.StoreType
+ PACT_TAG store.StoreType
+ VERIFICATION store.StoreType
+ PACT_LATEST store.StoreType
)
-var TypeNames = []string{
- PARTICIPANT: "PARTICIPANT",
- VERSION: "VERSION",
- PACT: "PACT",
- PACT_VERSION: "PACT_VERSION",
- PACT_TAG: "PACT_TAG",
- VERIFICATION: "VERIFICATION",
- PACT_LATEST: "PACT_LATEST",
-}
-
-var TypeRoots = map[sstore.StoreType]string{
- PARTICIPANT: GetBrokerParticipantKey(""),
- VERSION: GetBrokerVersionKey(""),
- PACT: GetBrokerPactKey(""),
- PACT_VERSION: GetBrokerPactVersionKey(""),
- PACT_TAG: GetBrokerTagKey(""),
- VERIFICATION: GetBrokerVerificationKey(""),
- PACT_LATEST: GetBrokerLatestKey(""),
-}
-
-var store = &BKvStore{}
-
-func Store() *BKvStore {
- return store
-}
-
-func (s *BKvStore) StoreSize(t sstore.StoreType) int {
- return 100
-}
-
-func (s *BKvStore) newStore(t sstore.StoreType, opts
...sstore.KvCacherCfgOption) {
- opts = append(opts,
- sstore.WithKey(TypeRoots[t]),
- sstore.WithInitSize(s.StoreSize(t)),
- )
- s.newIndexer(t, sstore.NewKvCacher(t.String(), opts...))
-}
-
-func (s *BKvStore) store(ctx context.Context) {
- for t := sstore.StoreType(0); t != typeEnd; t++ {
- s.newStore(t)
- }
- for _, i := range s.bindexers {
- select {
- case <-ctx.Done():
- return
- case <-i.Ready():
- }
- }
- util.SafeCloseChan(s.bready)
-
- util.Logger().Debugf("all indexers are ready")
-}
+var brokerKvStore = &BKvStore{}
func init() {
- store.Initialize()
- store.Run()
- store.Ready()
-}
+ PARTICIPANT = store.Store().MustInstall(store.NewEntity("PARTICIPANT",
GetBrokerParticipantKey("")))
+ VERSION = store.Store().MustInstall(store.NewEntity("VERSION",
GetBrokerVersionKey("")))
+ PACT = store.Store().MustInstall(store.NewEntity("PACT",
GetBrokerPactKey("")))
+ PACT_VERSION =
store.Store().MustInstall(store.NewEntity("PACT_VERSION",
GetBrokerPactVersionKey("")))
+ PACT_TAG = store.Store().MustInstall(store.NewEntity("PACT_TAG",
GetBrokerTagKey("")))
+ VERIFICATION =
store.Store().MustInstall(store.NewEntity("VERIFICATION",
GetBrokerVerificationKey("")))
+ PACT_LATEST = store.Store().MustInstall(store.NewEntity("PACT_LATEST",
GetBrokerLatestKey("")))
-type BKvStore struct {
- *sstore.KvStore
- bindexers map[sstore.StoreType]*sstore.Indexer
- block sync.RWMutex
- bready chan struct{}
- bisClose bool
-}
-
-func (s *BKvStore) Initialize() {
- s.KvStore = sstore.Store()
- s.KvStore.Initialize()
- s.bindexers = make(map[sstore.StoreType]*sstore.Indexer)
- s.bready = make(chan struct{})
-
- for i := sstore.StoreType(0); i != typeEnd; i++ {
- store.newNullStore(i)
- }
-}
-
-func (s *BKvStore) newNullStore(t sstore.StoreType) {
- s.newIndexer(t, sstore.NullCacher)
-}
-
-func (s *BKvStore) newIndexer(t sstore.StoreType, cacher sstore.Cacher) {
- indexer := sstore.NewCacheIndexer(cacher)
- s.bindexers[t] = indexer
- indexer.Run()
}
-func (s *BKvStore) Run() {
- util.Go(func(ctx context.Context) {
- s.store(ctx)
- select {
- case <-ctx.Done():
- s.Stop()
- }
- })
-}
-
-func (s *BKvStore) Ready() <-chan struct{} {
- return s.bready
+type BKvStore struct {
}
-func (s *BKvStore) Participant() *sstore.Indexer {
- return s.bindexers[PARTICIPANT]
+func (s *BKvStore) Participant() *store.Indexer {
+ return store.Store().Entity(PARTICIPANT)
}
-func (s *BKvStore) Version() *sstore.Indexer {
- return s.bindexers[VERSION]
+func (s *BKvStore) Version() *store.Indexer {
+ return store.Store().Entity(VERSION)
}
-func (s *BKvStore) Pact() *sstore.Indexer {
- return s.bindexers[PACT]
+func (s *BKvStore) Pact() *store.Indexer {
+ return store.Store().Entity(PACT)
}
-func (s *BKvStore) PactVersion() *sstore.Indexer {
- return s.bindexers[PACT_VERSION]
+func (s *BKvStore) PactVersion() *store.Indexer {
+ return store.Store().Entity(PACT_VERSION)
}
-func (s *BKvStore) PactTag() *sstore.Indexer {
- return s.bindexers[PACT_TAG]
+func (s *BKvStore) PactTag() *store.Indexer {
+ return store.Store().Entity(PACT_TAG)
}
-func (s *BKvStore) Verification() *sstore.Indexer {
- return s.bindexers[VERIFICATION]
+func (s *BKvStore) Verification() *store.Indexer {
+ return store.Store().Entity(VERIFICATION)
}
-func (s *BKvStore) PactLatest() *sstore.Indexer {
- return s.bindexers[PACT_LATEST]
+func (s *BKvStore) PactLatest() *store.Indexer {
+ return store.Store().Entity(PACT_LATEST)
}
-func (s *BKvStore) Stop() {
- if s.bisClose {
- return
- }
- s.bisClose = true
-
- for _, i := range s.bindexers {
- i.Stop()
- }
-
- util.SafeCloseChan(s.bready)
-
- util.Logger().Debugf("broker store daemon stopped")
+func Store() *BKvStore {
+ return brokerKvStore
}
diff --git a/server/core/backend/store/cache_kv.go
b/server/core/backend/store/cache_kv.go
index fcced8f2..9fb347eb 100644
--- a/server/core/backend/store/cache_kv.go
+++ b/server/core/backend/store/cache_kv.go
@@ -95,7 +95,7 @@ func (c *KvCache) compact() {
c.store = newCache
util.Logger().Infof("cache %s is not in use over %s, compact capacity
to size %d->%d",
- c.owner.Cfg.Key, DEFAULT_COMPACT_TIMEOUT, c.lastMaxSize, c.size)
+ c.owner.Cfg.Prefix, DEFAULT_COMPACT_TIMEOUT, c.lastMaxSize,
c.size)
}
@@ -107,7 +107,7 @@ func (c *KvCache) Size() (l int) {
}
type KvCacher struct {
- Cfg KvCacherCfg
+ Cfg Config
name string
lastRev int64
@@ -143,13 +143,13 @@ func (c *KvCacher) needList() bool {
}
util.Logger().Debugf("no events come in more then %s, need to list key
%s, rev: %d",
- time.Duration(c.noEventCount)*c.Cfg.Timeout, c.Cfg.Key, rev)
+ time.Duration(c.noEventCount)*c.Cfg.Timeout, c.Cfg.Prefix, rev)
c.noEventCount = 0
return true
}
-func (c *KvCacher) doList(listOps ListOptions) error {
- kvs, err := c.lw.List(listOps)
+func (c *KvCacher) doList(cfg ListWatchConfig) error {
+ kvs, err := c.lw.List(cfg)
if err != nil {
return err
}
@@ -161,13 +161,13 @@ func (c *KvCacher) doList(listOps ListOptions) error {
util.Logger().Warnf(nil, "most of the protected data(%d/%d) are
recovered", kc, c.cache.Size())
}
c.sync(evts)
- util.LogDebugOrWarnf(start, "finish to cache key %s, %d items, rev:
%d", c.Cfg.Key, len(kvs), c.lw.Revision())
+ util.LogDebugOrWarnf(start, "finish to cache key %s, %d items, rev:
%d", c.Cfg.Prefix, len(kvs), c.lw.Revision())
return nil
}
-func (c *KvCacher) doWatch(listOps ListOptions) error {
- watcher := c.lw.Watch(listOps)
+func (c *KvCacher) doWatch(cfg ListWatchConfig) error {
+ watcher := c.lw.Watch(cfg)
return c.handleWatcher(watcher)
}
@@ -175,19 +175,19 @@ func (c *KvCacher) ListAndWatch(ctx context.Context)
error {
c.mux.Lock()
defer c.mux.Unlock()
- listOps := ListOptions{
+ cfg := ListWatchConfig{
Timeout: c.Cfg.Timeout,
Context: ctx,
}
if c.needList() {
- if err := c.doList(listOps); err != nil {
+ if err := c.doList(cfg); err != nil {
return err
}
}
util.SafeCloseChan(c.ready)
- return c.doWatch(listOps)
+ return c.doWatch(cfg)
}
func (c *KvCacher) handleWatcher(watcher *Watcher) error {
@@ -336,7 +336,7 @@ func (c *KvCacher) filterDelete(store
map[string]*mvccpb.KeyValue, newStore map[
block[i] = KvEvent{
Revision: rev,
Type: proto.EVT_DELETE,
- Prefix: c.Cfg.Key,
+ Prefix: c.Cfg.Prefix,
Object: v,
}
i++
@@ -365,7 +365,7 @@ func (c *KvCacher) filterCreateOrUpdate(store
map[string]*mvccpb.KeyValue, newSt
block[i] = KvEvent{
Revision: rev,
Type: proto.EVT_CREATE,
- Prefix: c.Cfg.Key,
+ Prefix: c.Cfg.Prefix,
Object: v,
}
i++
@@ -385,7 +385,7 @@ func (c *KvCacher) filterCreateOrUpdate(store
map[string]*mvccpb.KeyValue, newSt
block[i] = KvEvent{
Revision: rev,
Type: proto.EVT_UPDATE,
- Prefix: c.Cfg.Key,
+ Prefix: c.Cfg.Prefix,
Object: v,
}
i++
@@ -492,8 +492,8 @@ func NewKvCache(c *KvCacher, size int) *KvCache {
}
}
-func NewKvCacher(name string, opts ...KvCacherCfgOption) *KvCacher {
- cfg := DefaultKvCacherConfig()
+func NewKvCacher(name string, opts ...ConfigOption) *KvCacher {
+ cfg := DefaultConfig()
for _, opt := range opts {
opt(&cfg)
}
@@ -504,7 +504,7 @@ func NewKvCacher(name string, opts ...KvCacherCfgOption)
*KvCacher {
ready: make(chan struct{}),
lw: ListWatcher{
Client: backend.Registry(),
- Key: cfg.Key,
+ Prefix: cfg.Prefix,
},
goroutine: util.NewGo(context.Background()),
}
diff --git a/server/core/backend/store/cacher.go
b/server/core/backend/store/cacher.go
index 94fbe46c..898964c8 100644
--- a/server/core/backend/store/cacher.go
+++ b/server/core/backend/store/cacher.go
@@ -24,6 +24,7 @@ type Cache interface {
}
type Cacher interface {
+ // Name is the cache size metric name
Name() string
Cache() Cache
Run()
diff --git a/server/core/backend/store/common.go
b/server/core/backend/store/common.go
index fc04898e..0de9e05c 100644
--- a/server/core/backend/store/common.go
+++ b/server/core/backend/store/common.go
@@ -25,12 +25,17 @@ import (
type StoreType int
func (st StoreType) String() string {
+ if int(st) < 0 {
+ return "NONEXIST"
+ }
if int(st) < len(TypeNames) {
return TypeNames[st]
}
return "TYPE" + strconv.Itoa(int(st))
}
+const NONEXIST = StoreType(-1)
+
const (
DOMAIN StoreType = iota
PROJECT
@@ -48,25 +53,25 @@ const (
INSTANCE
LEASE
ENDPOINTS
- typeEnd
+ typeEnd // end of the base store types
)
var TypeNames = []string{
- SERVICE: "SERVICE",
- INSTANCE: "INSTANCE",
DOMAIN: "DOMAIN",
- SCHEMA: "SCHEMA",
- SCHEMA_SUMMARY: "SCHEMA_SUMMARY",
- RULE: "RULE",
- LEASE: "LEASE",
+ PROJECT: "PROJECT",
+ SERVICE: "SERVICE",
SERVICE_INDEX: "SERVICE_INDEX",
SERVICE_ALIAS: "SERVICE_ALIAS",
SERVICE_TAG: "SERVICE_TAG",
+ RULE: "RULE",
RULE_INDEX: "RULE_INDEX",
DEPENDENCY: "DEPENDENCY",
DEPENDENCY_RULE: "DEPENDENCY_RULE",
DEPENDENCY_QUEUE: "DEPENDENCY_QUEUE",
- PROJECT: "PROJECT",
+ SCHEMA: "SCHEMA",
+ SCHEMA_SUMMARY: "SCHEMA_SUMMARY",
+ INSTANCE: "INSTANCE",
+ LEASE: "LEASE",
ENDPOINTS: "ENDPOINTS",
}
diff --git a/server/core/backend/store/opt.go
b/server/core/backend/store/config.go
similarity index 55%
rename from server/core/backend/store/opt.go
rename to server/core/backend/store/config.go
index 59fbabb0..9f0b2dde 100644
--- a/server/core/backend/store/opt.go
+++ b/server/core/backend/store/config.go
@@ -18,11 +18,12 @@ package store
import (
"fmt"
+ "golang.org/x/net/context"
"time"
)
-type KvCacherCfg struct {
- Key string
+type Config struct {
+ Prefix string
InitSize int
NoEventMaxInterval int
Timeout time.Duration
@@ -31,43 +32,52 @@ type KvCacherCfg struct {
DeferHandler DeferHandler
}
-func (cfg KvCacherCfg) String() string {
- return fmt.Sprintf("{key: %s, timeout: %s, period: %s}",
- cfg.Key, cfg.Timeout, cfg.Period)
+func (cfg Config) String() string {
+ return fmt.Sprintf("{prefix: %s, timeout: %s, period: %s}",
+ cfg.Prefix, cfg.Timeout, cfg.Period)
}
-type KvCacherCfgOption func(*KvCacherCfg)
+type ConfigOption func(*Config)
-func WithKey(key string) KvCacherCfgOption {
- return func(cfg *KvCacherCfg) { cfg.Key = key }
+func WithPrefix(key string) ConfigOption {
+ return func(cfg *Config) { cfg.Prefix = key }
}
-func WithInitSize(size int) KvCacherCfgOption {
- return func(cfg *KvCacherCfg) { cfg.InitSize = size }
+func WithInitSize(size int) ConfigOption {
+ return func(cfg *Config) { cfg.InitSize = size }
}
-func WithTimeout(ot time.Duration) KvCacherCfgOption {
- return func(cfg *KvCacherCfg) { cfg.Timeout = ot }
+func WithTimeout(ot time.Duration) ConfigOption {
+ return func(cfg *Config) { cfg.Timeout = ot }
}
-func WithPeriod(ot time.Duration) KvCacherCfgOption {
- return func(cfg *KvCacherCfg) { cfg.Period = ot }
+func WithPeriod(ot time.Duration) ConfigOption {
+ return func(cfg *Config) { cfg.Period = ot }
}
-func WithEventFunc(f KvEventFunc) KvCacherCfgOption {
- return func(cfg *KvCacherCfg) { cfg.OnEvent = f }
+func WithEventFunc(f KvEventFunc) ConfigOption {
+ return func(cfg *Config) { cfg.OnEvent = f }
}
-func WithDeferHandler(h DeferHandler) KvCacherCfgOption {
- return func(cfg *KvCacherCfg) { cfg.DeferHandler = h }
+func WithDeferHandler(h DeferHandler) ConfigOption {
+ return func(cfg *Config) { cfg.DeferHandler = h }
}
-func DefaultKvCacherConfig() KvCacherCfg {
- return KvCacherCfg{
- Key: "/",
+func DefaultConfig() Config {
+ return Config{
+ Prefix: "/",
Timeout: DEFAULT_LISTWATCH_TIMEOUT,
Period: time.Second,
NoEventMaxInterval: DEFAULT_MAX_NO_EVENT_INTERVAL,
InitSize: DEFAULT_CACHE_INIT_SIZE,
}
}
+
+// ListWatchConfig carries the per-call settings handed to ListWatcher.List
+// and ListWatcher.Watch.
+type ListWatchConfig struct {
+ // Timeout bounds a List call; List derives its request context from
+ // Context with this timeout.
+ Timeout time.Duration
+ // Context is the parent context of the operation.
+ Context context.Context
+}
+
+// String renders the configuration for logging; only Timeout is included.
+func (lo *ListWatchConfig) String() string {
+ return fmt.Sprintf("{timeout: %s}", lo.Timeout)
+}
diff --git a/server/core/backend/store/event.go
b/server/core/backend/store/event.go
index 504aba10..67714a04 100644
--- a/server/core/backend/store/event.go
+++ b/server/core/backend/store/event.go
@@ -18,22 +18,8 @@ package store
import (
"github.com/apache/incubator-servicecomb-service-center/server/core/proto"
- "sync"
)
-var (
- evtProxies map[StoreType]*KvEventProxy
-)
-
-func init() {
- evtProxies = make(map[StoreType]*KvEventProxy, typeEnd)
- for i := StoreType(0); i != typeEnd; i++ {
- evtProxies[i] = &KvEventProxy{
- evtHandleFuncs: make([]KvEventFunc, 0, 5),
- }
- }
-}
-
type KvEventFunc func(evt KvEvent)
type KvEvent struct {
@@ -48,29 +34,6 @@ type KvEventHandler interface {
OnEvent(evt KvEvent)
}
-type KvEventProxy struct {
- evtHandleFuncs []KvEventFunc
- lock sync.RWMutex
-}
-
-func (h *KvEventProxy) AddHandleFunc(f KvEventFunc) {
- h.lock.Lock()
- h.evtHandleFuncs = append(h.evtHandleFuncs, f)
- h.lock.Unlock()
-}
-
-func (h *KvEventProxy) OnEvent(evt KvEvent) {
- h.lock.RLock()
- for _, f := range h.evtHandleFuncs {
- f(evt)
- }
- h.lock.RUnlock()
-}
-
-func EventProxy(t StoreType) *KvEventProxy {
- return evtProxies[t]
-}
-
// the event handler/func must be good performance, or will block the event
bus.
func AddEventHandleFunc(t StoreType, f KvEventFunc) {
EventProxy(t).AddHandleFunc(f)
diff --git a/server/core/backend/store/event_proxy.go
b/server/core/backend/store/event_proxy.go
new file mode 100644
index 00000000..e66583d9
--- /dev/null
+++ b/server/core/backend/store/event_proxy.go
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package store
+
+import "sync"
+
+var (
+ // EventProxies maps each store type to the proxy that fans its events
+ // out to the registered handler funcs.
+ EventProxies map[StoreType]*KvEventProxy
+)
+
+// init creates one event proxy for every base store type, i.e. every
+// StoreType value from 0 up to (but not including) typeEnd.
+func init() {
+ EventProxies = make(map[StoreType]*KvEventProxy, typeEnd)
+ for i := StoreType(0); i != typeEnd; i++ {
+ EventProxies[i] = NewEventProxy()
+ }
+}
+
+// KvEventProxy holds a list of event handler funcs and dispatches each
+// incoming event to all of them.
+type KvEventProxy struct {
+ // evtHandleFuncs are the registered handlers, in registration order.
+ evtHandleFuncs []KvEventFunc
+ // lock guards evtHandleFuncs.
+ lock sync.RWMutex
+}
+
+// AddHandleFunc appends f to the handler list under the write lock.
+func (h *KvEventProxy) AddHandleFunc(f KvEventFunc) {
+ h.lock.Lock()
+ h.evtHandleFuncs = append(h.evtHandleFuncs, f)
+ h.lock.Unlock()
+}
+
+// OnEvent calls every registered handler with evt, in registration order.
+// The read lock is held for the whole loop, so AddHandleFunc waits until all
+// handlers have returned.
+func (h *KvEventProxy) OnEvent(evt KvEvent) {
+ h.lock.RLock()
+ for _, f := range h.evtHandleFuncs {
+ f(evt)
+ }
+ h.lock.RUnlock()
+}
+
+// NewEventProxy returns an empty KvEventProxy whose handler slice is
+// pre-allocated with capacity 5.
+func NewEventProxy() *KvEventProxy {
+ return &KvEventProxy{
+ evtHandleFuncs: make([]KvEventFunc, 0, 5),
+ }
+}
+
+// EventProxy looks up the proxy registered for store type t in EventProxies.
+// The result is nil when no proxy has been registered for t.
+func EventProxy(t StoreType) *KvEventProxy {
+ return EventProxies[t]
+}
diff --git a/server/core/backend/store/extend.go
b/server/core/backend/store/extend.go
new file mode 100644
index 00000000..bffd0c15
--- /dev/null
+++ b/server/core/backend/store/extend.go
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package store
+
+import (
+ "errors"
+ "fmt"
+)
+
+// Entity describes a store type that can be registered with InstallType.
+type Entity interface {
+ // Name is the type name; InstallType appends it to TypeNames.
+ Name() string
+ // Prefix is the key prefix; InstallType records it in TypeRoots.
+ Prefix() string
+ // InitSize is the value InstallType records in TypeInitSize.
+ InitSize() int
+}
+
+// entity is the Entity implementation returned by NewEntity.
+type entity struct {
+ name string
+ prefix string
+ initSize int
+}
+
+// Name returns the entity's type name.
+func (e *entity) Name() string {
+ return e.name
+}
+
+// Prefix returns the entity's key prefix.
+func (e *entity) Prefix() string {
+ return e.prefix
+}
+
+// InitSize returns the entity's configured initial size.
+func (e *entity) InitSize() int {
+ return e.initSize
+}
+
+// InstallType registers the store type described by e and returns the
+// StoreType id assigned to it.
+//
+// It returns NONEXIST and an error when e is nil, when e.Name() is already
+// present in TypeNames, or when e.Prefix() is already present in TypeRoots.
+// On success the name is appended to TypeNames, and the prefix, the initial
+// size and a fresh event proxy are recorded under the new id.
+//
+// NOTE(review): the package-level tables are modified without locking;
+// presumably types are only installed during package init -- confirm.
+func InstallType(e Entity) (id StoreType, err error) {
+ if e == nil {
+ return NONEXIST, errors.New("invalid parameter")
+ }
+ for _, n := range TypeNames {
+ if n == e.Name() {
+ return NONEXIST, fmt.Errorf("redeclare store type '%s'", n)
+ }
+ }
+ for _, r := range TypeRoots {
+ if r == e.Prefix() {
+ return NONEXIST, fmt.Errorf("redeclare store root '%s'", r)
+ }
+ }
+
+ // The id must be the index at which the name is stored, because
+ // StoreType.String() resolves a name with TypeNames[st]. The previous
+ // len(TypeNames)+1 taken after the append was always two past that slot,
+ // so String() fell back to "TYPE<n>" for every installed type.
+ // NOTE(review): the first installed id now equals typeEnd, which the
+ // base-type loops use only as an exclusive bound -- confirm nothing else
+ // treats that value as reserved.
+ id = StoreType(len(TypeNames))
+ TypeNames = append(TypeNames, e.Name())
+
+ TypeRoots[id] = e.Prefix()
+ TypeInitSize[id] = e.InitSize()
+
+ EventProxies[id] = NewEventProxy()
+ return
+}
+
+// NewEntity builds an Entity with the given name and key prefix.
+//
+// opts are applied on top of DefaultConfig(), after which Prefix is forced
+// to the prefix argument, so a WithPrefix option has no effect. Of the
+// resulting Config only Prefix and InitSize are copied into the entity.
+func NewEntity(name, prefix string, opts ...ConfigOption) Entity {
+ cfg := DefaultConfig()
+ for _, opt := range opts {
+ opt(&cfg)
+ }
+ cfg.Prefix = prefix
+ return &entity{
+ name: name,
+ prefix: cfg.Prefix,
+ initSize: cfg.InitSize,
+ }
+}
diff --git a/server/core/backend/store/extend_test.go
b/server/core/backend/store/extend_test.go
new file mode 100644
index 00000000..8359dd2e
--- /dev/null
+++ b/server/core/backend/store/extend_test.go
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package store
+
+import "testing"
+
+// extend is a minimal Entity stub used by TestInstallType.
+type extend struct {
+}
+
+// Name returns the fixed type name "test".
+func (e *extend) Name() string {
+ return "test"
+}
+
+// Prefix returns the fixed key prefix "/test".
+func (e *extend) Prefix() string {
+ return "/test"
+}
+
+// InitSize returns 0.
+func (e *extend) InitSize() int {
+ return 0
+}
+
+// TestInstallType checks that an entity can be installed once and that
+// installing the same entity a second time is rejected with NONEXIST and a
+// non-nil error.
+func TestInstallType(t *testing.T) {
+ id, err := InstallType(&extend{})
+ if err != nil {
+ t.Fatal(err)
+ }
+ if id == NONEXIST {
+ // err is nil on this path, so report the actual problem instead.
+ t.Fatal("InstallType returned NONEXIST without an error")
+ }
+
+ id, err = InstallType(&extend{})
+ if id != NONEXIST || err == nil {
+ t.Fatal("InstallType fail", err)
+ }
+}
diff --git a/server/core/backend/store/indexer.go
b/server/core/backend/store/indexer.go
index 3f8cd7d7..2b35a976 100644
--- a/server/core/backend/store/indexer.go
+++ b/server/core/backend/store/indexer.go
@@ -28,23 +28,16 @@ import (
"time"
)
-var defaultRootKeys map[string]struct{}
-
-func init() {
- defaultRootKeys = make(map[string]struct{}, len(defaultRootKeys))
- for _, root := range TypeRoots {
- defaultRootKeys[root] = struct{}{}
- }
-}
-
type Indexer struct {
- BuildTimeout time.Duration
+ BuildTimeout time.Duration
+ Root string
+
cacher Cacher
- prefixIndex map[string]map[string]struct{}
- prefixLock sync.RWMutex
- prefixBuildQueue chan KvEvent
goroutine *util.GoRoutine
ready chan struct{}
+ prefixIndex map[string]map[string]struct{}
+ prefixBuildQueue chan KvEvent
+ prefixLock sync.RWMutex
isClose bool
}
@@ -266,8 +259,7 @@ func (i *Indexer) getPrefixKey(arr *[]string, prefix
string) (count int) {
}
func (i *Indexer) addPrefixKey(prefix, key string) {
- _, ok := defaultRootKeys[key]
- if ok {
+ if i.Root == key {
return
}
@@ -342,9 +334,10 @@ func (i *Indexer) Ready() <-chan struct{} {
return i.ready
}
-func NewCacheIndexer(cr Cacher) *Indexer {
+func NewCacheIndexer(root string, cr Cacher) *Indexer {
return &Indexer{
BuildTimeout: DEFAULT_ADD_QUEUE_TIMEOUT,
+ Root: root,
cacher: cr,
prefixIndex: make(map[string]map[string]struct{},
DEFAULT_CACHE_INIT_SIZE),
prefixBuildQueue: make(chan KvEvent, DEFAULT_MAX_EVENT_COUNT),
diff --git a/server/core/backend/store/listwatch.go
b/server/core/backend/store/listwatch.go
index 3cf1fe75..42fbde52 100644
--- a/server/core/backend/store/listwatch.go
+++ b/server/core/backend/store/listwatch.go
@@ -24,30 +24,20 @@ import (
"github.com/coreos/etcd/mvcc/mvccpb"
"golang.org/x/net/context"
"sync"
- "time"
)
-type ListOptions struct {
- Timeout time.Duration
- Context context.Context
-}
-
-func (lo *ListOptions) String() string {
- return fmt.Sprintf("{timeout: %s}", lo.Timeout)
-}
-
type ListWatcher struct {
Client registry.Registry
- Key string
+ Prefix string
rev int64
}
-func (lw *ListWatcher) List(op ListOptions) ([]*mvccpb.KeyValue, error) {
+func (lw *ListWatcher) List(op ListWatchConfig) ([]*mvccpb.KeyValue, error) {
otCtx, _ := context.WithTimeout(op.Context, op.Timeout)
- resp, err := lw.Client.Do(otCtx,
registry.WatchPrefixOpOptions(lw.Key)...)
+ resp, err := lw.Client.Do(otCtx,
registry.WatchPrefixOpOptions(lw.Prefix)...)
if err != nil {
- util.Logger().Errorf(err, "list key %s failed, rev: %d->0",
lw.Key, lw.Revision())
+ util.Logger().Errorf(err, "list prefix %s failed, rev: %d->0",
lw.Prefix, lw.Revision())
lw.setRevision(0)
return nil, err
}
@@ -66,14 +56,14 @@ func (lw *ListWatcher) setRevision(rev int64) {
lw.rev = rev
}
-func (lw *ListWatcher) Watch(op ListOptions) *Watcher {
+func (lw *ListWatcher) Watch(op ListWatchConfig) *Watcher {
return newWatcher(lw, op)
}
func (lw *ListWatcher) doWatch(ctx context.Context, f func(evt []KvEvent))
error {
rev := lw.Revision()
opts := append(
- registry.WatchPrefixOpOptions(lw.Key),
+ registry.WatchPrefixOpOptions(lw.Prefix),
registry.WithRev(rev+1),
registry.WithWatchCallback(
func(message string, resp *registry.PluginResponse)
error {
@@ -81,13 +71,13 @@ func (lw *ListWatcher) doWatch(ctx context.Context, f
func(evt []KvEvent)) error
return fmt.Errorf("unknown event %s",
resp)
}
- util.Logger().Infof("watch prefix key %s, start
rev %d+1, event: %s", lw.Key, rev, resp)
+ util.Logger().Infof("watch prefix %s, start rev
%d+1, event: %s", lw.Prefix, rev, resp)
lw.setRevision(resp.Revision)
evts := make([]KvEvent, len(resp.Kvs))
for i, kv := range resp.Kvs {
- evt := KvEvent{Prefix: lw.Key,
Revision: kv.ModRevision}
+ evt := KvEvent{Prefix: lw.Prefix,
Revision: kv.ModRevision}
switch {
case resp.Action == registry.Put &&
kv.Version == 1:
evt.Type, evt.Object =
proto.EVT_CREATE, kv
@@ -106,16 +96,16 @@ func (lw *ListWatcher) doWatch(ctx context.Context, f
func(evt []KvEvent)) error
err := lw.Client.Watch(ctx, opts...)
if err != nil { // compact可能会导致watch失败 or message body size lager than
4MB
- util.Logger().Errorf(err, "watch key %s failed, start rev:
%d+1->%d->0", lw.Key, rev, lw.Revision())
+ util.Logger().Errorf(err, "watch prefix %s failed, start rev:
%d+1->%d->0", lw.Prefix, rev, lw.Revision())
lw.setRevision(0)
- f([]KvEvent{errEvent(lw.Key, err)})
+ f([]KvEvent{errEvent(lw.Prefix, err)})
}
return err
}
type Watcher struct {
- ListOps ListOptions
+ ListOps ListWatchConfig
lw *ListWatcher
bus chan []KvEvent
stopCh chan struct{}
@@ -169,7 +159,7 @@ func errEvent(key string, err error) KvEvent {
}
}
-func newWatcher(lw *ListWatcher, listOps ListOptions) *Watcher {
+func newWatcher(lw *ListWatcher, listOps ListWatchConfig) *Watcher {
w := &Watcher{
ListOps: listOps,
lw: lw,
diff --git a/server/core/backend/store/store.go
b/server/core/backend/store/store.go
index e549cd41..66849464 100644
--- a/server/core/backend/store/store.go
+++ b/server/core/backend/store/store.go
@@ -61,7 +61,7 @@ func (s *KvStore) dispatchEvent(t StoreType, evt KvEvent) {
}
func (s *KvStore) newIndexBuilder(t StoreType, cacher Cacher) {
- indexer := NewCacheIndexer(cacher)
+ indexer := NewCacheIndexer(TypeRoots[t], cacher)
s.indexers[t] = indexer
indexer.Run()
}
@@ -71,7 +71,7 @@ func (s *KvStore) Run() {
s.taskService.Run()
}
-func (s *KvStore) getKvCacherCfgOptions(t StoreType) (opts
[]KvCacherCfgOption) {
+func (s *KvStore) getKvCacherCfgOptions(t StoreType) (opts []ConfigOption) {
switch t {
case INSTANCE:
opts = append(opts,
WithDeferHandler(s.SelfPreservationHandler()))
@@ -81,7 +81,7 @@ func (s *KvStore) getKvCacherCfgOptions(t StoreType) (opts
[]KvCacherCfgOption)
opts = append(opts, WithInitSize(sz))
}
opts = append(opts,
- WithKey(TypeRoots[t]),
+ WithPrefix(TypeRoots[t]),
WithEventFunc(func(evt KvEvent) { s.dispatchEvent(t, evt) }))
return
}
@@ -246,6 +246,33 @@ func (s *KvStore) KeepAlive(ctx context.Context, opts
...registry.PluginOpOption
return pt.TTL, pt.Err()
}
+// Entity returns the indexer stored under id in s.indexers.
+func (s *KvStore) Entity(id StoreType) *Indexer {
+ return s.indexers[id]
+}
+
+// Install registers e as a new store type via InstallType and creates an
+// indexer for it. If InstallType fails, its id and error are returned
+// unchanged and no indexer is created.
+func (s *KvStore) Install(e Entity) (id StoreType, err error) {
+ if id, err = InstallType(e); err != nil {
+ return
+ }
+
+ util.Logger().Infof("install new store entity %d:%s->%s", id, e.Name(),
e.Prefix())
+
+ // With caching disabled the indexer is backed by NullCacher.
+ if !core.ServerInfo.Config.EnableCache {
+ s.newIndexBuilder(id, NullCacher)
+ return
+ }
+ // Otherwise a KvCacher named id.String() backs the indexer, configured
+ // with the options getKvCacherCfgOptions returns for this id.
+ s.newIndexBuilder(id, NewKvCacher(id.String(),
s.getKvCacherCfgOptions(id)...))
+ return
+}
+
+// MustInstall is like Install but panics with the returned error instead of
+// handing it back to the caller.
+func (s *KvStore) MustInstall(e Entity) StoreType {
+ id, err := s.Install(e)
+ if err != nil {
+ panic(err)
+ }
+ return id
+}
+
func Store() *KvStore {
return store
}
diff --git a/server/error/error.go b/server/error/error.go
index 5ee5bf12..b30356ee 100644
--- a/server/error/error.go
+++ b/server/error/error.go
@@ -18,8 +18,6 @@ package error
import (
"encoding/json"
- "fmt"
- "github.com/apache/incubator-servicecomb-service-center/pkg/util"
"net/http"
)
@@ -104,9 +102,9 @@ func (e Error) Error() string {
return e.Message + "(" + e.Detail + ")"
}
-func (e Error) toJson() string {
+func (e Error) Marshal() []byte {
bs, _ := json.Marshal(e)
- return util.BytesToStringWithNoCopy(bs)
+ return bs
}
func (e Error) StatusCode() int {
@@ -123,14 +121,6 @@ func (e Error) InternalError() bool {
return false
}
-func (e Error) HttpWrite(w http.ResponseWriter) {
- status := e.StatusCode()
- w.Header().Add("X-Response-Status", fmt.Sprint(status))
- w.Header().Set("Content-Type", "application/json; charset=UTF-8")
- w.WriteHeader(status)
- fmt.Fprintln(w, e.toJson())
-}
-
func NewError(code int32, detail string) *Error {
return &Error{
Code: code,
diff --git a/server/rest/controller/rest_util.go
b/server/rest/controller/rest_util.go
index be626d5b..00f93f7e 100644
--- a/server/rest/controller/rest_util.go
+++ b/server/rest/controller/rest_util.go
@@ -25,15 +25,23 @@ import (
"net/http"
)
+const (
+ contentTypeJson = "application/json; charset=UTF-8"
+ contentTypeText = "text/plain; charset=UTF-8"
+)
+
func WriteError(w http.ResponseWriter, code int32, detail string) {
err := error.NewError(code, detail)
- err.HttpWrite(w)
+ w.Header().Add("X-Response-Status", fmt.Sprint(err.StatusCode()))
+ w.Header().Set("Content-Type", contentTypeJson)
+ w.WriteHeader(err.StatusCode())
+ fmt.Fprintln(w, util.BytesToStringWithNoCopy(err.Marshal()))
}
func WriteJsonObject(w http.ResponseWriter, obj interface{}) {
if obj == nil {
w.Header().Add("X-Response-Status", fmt.Sprint(http.StatusOK))
- w.Header().Set("Content-Type", "text/plain; charset=UTF-8")
+ w.Header().Set("Content-Type", contentTypeText)
w.WriteHeader(http.StatusOK)
return
}
@@ -44,7 +52,7 @@ func WriteJsonObject(w http.ResponseWriter, obj interface{}) {
return
}
w.Header().Add("X-Response-Status", fmt.Sprint(http.StatusOK))
- w.Header().Set("Content-Type", "application/json; charset=UTF-8")
+ w.Header().Set("Content-Type", contentTypeJson)
w.WriteHeader(http.StatusOK)
fmt.Fprintln(w, util.BytesToStringWithNoCopy(objJson))
}
@@ -61,11 +69,10 @@ func WriteResponse(w http.ResponseWriter, resp
*pb.Response, obj interface{}) {
func WriteBytes(w http.ResponseWriter, resp *pb.Response, json []byte) {
if resp.GetCode() == pb.Response_SUCCESS {
w.Header().Add("X-Response-Status", fmt.Sprint(http.StatusOK))
- w.Header().Set("Content-Type", "application/json;
charset=UTF-8")
+ w.Header().Set("Content-Type", contentTypeJson)
w.WriteHeader(http.StatusOK)
w.Write(json)
return
}
WriteError(w, resp.GetCode(), resp.GetMessage())
}
-
diff --git a/server/rest/controller/v3/main_controller.go
b/server/rest/controller/v3/main_controller.go
index 5bb44100..e20e3238 100644
--- a/server/rest/controller/v3/main_controller.go
+++ b/server/rest/controller/v3/main_controller.go
@@ -19,13 +19,29 @@ package v3
import (
"encoding/json"
"github.com/apache/incubator-servicecomb-service-center/pkg/rest"
+ pb
"github.com/apache/incubator-servicecomb-service-center/server/core/proto"
+
"github.com/apache/incubator-servicecomb-service-center/server/rest/controller"
"github.com/apache/incubator-servicecomb-service-center/server/rest/controller/v4"
"github.com/apache/incubator-servicecomb-service-center/version"
"net/http"
)
+var (
+ versionJsonCache []byte
+ versionResp *pb.Response
+)
+
const API_VERSION = "3.0.0"
+func init() {
+ result := v4.Result{
+ VersionSet: version.Ver(),
+ ApiVersion: API_VERSION,
+ }
+ versionJsonCache, _ = json.Marshal(result)
+ versionResp = pb.CreateResponse(pb.Response_SUCCESS, "get version
successfully")
+}
+
type MainService struct {
v4.MainService
}
@@ -38,11 +54,5 @@ func (this *MainService) URLPatterns() []rest.Route {
}
func (this *MainService) GetVersion(w http.ResponseWriter, r *http.Request) {
- result := v4.Result{
- VersionSet: version.Ver(),
- ApiVersion: API_VERSION,
- }
- resultJSON, _ := json.Marshal(result)
- w.Header().Set("Content-Type", "application/json;charset=utf-8")
- w.Write(resultJSON)
+ controller.WriteBytes(w, versionResp, versionJsonCache)
}
diff --git a/server/rest/controller/v4/main_controller.go
b/server/rest/controller/v4/main_controller.go
index d2af4d4e..cbf5ed58 100644
--- a/server/rest/controller/v4/main_controller.go
+++ b/server/rest/controller/v4/main_controller.go
@@ -28,7 +28,10 @@ import (
"net/http"
)
-var resultJSON []byte
+var (
+ versionJsonCache []byte
+ versionResp *pb.Response
+)
const API_VERSION = "4.0.0"
@@ -48,7 +51,8 @@ func init() {
API_VERSION,
core.ServerInfo.Config,
}
- resultJSON, _ = json.Marshal(result)
+ versionJsonCache, _ = json.Marshal(result)
+ versionResp = pb.CreateResponse(pb.Response_SUCCESS, "get version
successfully")
}
func (this *MainService) URLPatterns() []rest.Route {
@@ -72,6 +76,5 @@ func (this *MainService) ClusterHealth(w http.ResponseWriter,
r *http.Request) {
}
func (this *MainService) GetVersion(w http.ResponseWriter, r *http.Request) {
- w.Header().Set("Content-Type", "application/json;charset=utf-8")
- w.Write(resultJSON)
+ controller.WriteBytes(w, versionResp, versionJsonCache)
}
diff --git a/server/service/schema.go b/server/service/schema.go
index 3d7da42f..dd9c4d9c 100644
--- a/server/service/schema.go
+++ b/server/service/schema.go
@@ -122,7 +122,6 @@ func (s *MicroServiceService) GetAllSchemaInfo(ctx
context.Context, in *pb.GetAl
schemasList := service.Schemas
if schemasList == nil || len(schemasList) == 0 {
- util.Logger().Infof("service %s schemaId set is empty.",
in.ServiceId)
return &pb.GetAllSchemaResponse{
Response: pb.CreateResponse(pb.Response_SUCCESS, "Do
not have this schema info."),
Schemas: []*pb.Schema{},
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
> Convenient store extension
> --------------------------
>
> Key: SCB-544
> URL: https://issues.apache.org/jira/browse/SCB-544
> Project: Apache ServiceComb
> Issue Type: New Feature
> Components: Service-Center
> Reporter: little-cui
> Assignee: little-cui
> Priority: Major
> Fix For: service-center-1.0.0-m2
>
>
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)