This is an automated email from the ASF dual-hosted git repository. asifdxtreme pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-service-center.git
The following commit(s) were added to refs/heads/master by this push: new 67e0f0b SCB-680 Optimize find instance api (#376) 67e0f0b is described below commit 67e0f0bd5b71e5d8034dc3ca84a13f88ba8692df Author: little-cui <sure_0...@qq.com> AuthorDate: Mon Jun 25 12:44:12 2018 +0800 SCB-680 Optimize find instance api (#376) * SCB-680 Optimize find instance api --- server/service/dependency.go | 2 - server/service/event/instance_event_handler.go | 19 +++++- server/service/event/rule_event_handler.go | 2 +- server/service/event/tag_event_handler.go | 2 +- server/service/instance.go | 26 +++++++- server/service/{util => notification}/common.go | 41 +++++++++++-- server/service/notification/listwatcher.go | 25 +++++--- .../{util/common.go => notification/notice.go} | 32 +++++++--- server/service/notification/stream.go | 54 ++++++++++++++++ .../notification/{struct.go => subscriber.go} | 58 ------------------ .../notification/{watch_util.go => websocket.go} | 57 +---------------- server/service/util/common.go | 4 ++ server/service/util/find_cache.go | 71 ++++++++++++++++++++++ server/service/util/find_cache_test.go | 60 ++++++++++++++++++ server/service/util/instance_util.go | 11 ++-- server/service/util/instance_util_test.go | 8 +-- server/service/watch.go | 2 +- 17 files changed, 322 insertions(+), 152 deletions(-) diff --git a/server/service/dependency.go b/server/service/dependency.go index eac1aa2..4c13dd0 100644 --- a/server/service/dependency.go +++ b/server/service/dependency.go @@ -140,7 +140,6 @@ func (s *MicroServiceService) GetProviderDependencies(ctx context.Context, in *p Response: pb.CreateResponse(scerr.ErrInternal, err.Error()), }, err } - util.Logger().Debugf("GetProviderDependencies successfully, providerId is %s.", in.ServiceId) return &pb.GetProDependenciesResponse{ Response: pb.CreateResponse(pb.Response_SUCCESS, "Get all consumers successful."), Consumers: services, @@ -181,7 +180,6 @@ func (s *MicroServiceService) GetConsumerDependencies(ctx context.Context, in *p }, err } - util.Logger().Debugf("GetConsumerDependencies successfully, consumerId is %s.", consumerId) return &pb.GetConDependenciesResponse{ Response: pb.CreateResponse(pb.Response_SUCCESS, "Get all providers successfully."), Providers: services, diff --git a/server/service/event/instance_event_handler.go b/server/service/event/instance_event_handler.go index 8144649..a80376d 100644 --- a/server/service/event/instance_event_handler.go +++ b/server/service/event/instance_event_handler.go @@ -93,7 +93,7 @@ func (h *InstanceEventHandler) OnEvent(evt backend.KvEvent) { return } - nf.PublishInstanceEvent(domainProject, action, &pb.MicroServiceKey{ + PublishInstanceEvent(domainProject, action, &pb.MicroServiceKey{ Environment: ms.Environment, AppId: ms.AppId, ServiceName: ms.ServiceName, @@ -104,3 +104,20 @@ func (h *InstanceEventHandler) OnEvent(evt backend.KvEvent) { func NewInstanceEventHandler() *InstanceEventHandler { return &InstanceEventHandler{} } + +func PublishInstanceEvent(domainProject string, action pb.EventType, serviceKey *pb.MicroServiceKey, instance *pb.MicroServiceInstance, rev int64, subscribers []string) { + response := &pb.WatchInstanceResponse{ + Response: pb.CreateResponse(pb.Response_SUCCESS, "Watch instance successfully."), + Action: string(action), + Key: serviceKey, + Instance: instance, + } + for _, consumerId := range subscribers { + // expires cache + serviceUtil.FindInstancesCache.Delete(domainProject, consumerId, serviceKey) + + // TODO add超时怎么处理? + job := nf.NewWatchJob(consumerId, apt.GetInstanceRootKey(domainProject)+"/", rev, response) + nf.GetNotifyService().AddJob(job) + } +} diff --git a/server/service/event/rule_event_handler.go b/server/service/event/rule_event_handler.go index 0999dd0..474811e 100644 --- a/server/service/event/rule_event_handler.go +++ b/server/service/event/rule_event_handler.go @@ -69,7 +69,7 @@ func (apt *RulesChangedTask) publish(ctx context.Context, domainProject, provide } providerKey := pb.MicroServiceToKey(domainProject, provider) - nf.PublishInstanceEvent(domainProject, pb.EVT_EXPIRE, providerKey, nil, rev, consumerIds) + PublishInstanceEvent(domainProject, pb.EVT_EXPIRE, providerKey, nil, rev, consumerIds) return nil } diff --git a/server/service/event/tag_event_handler.go b/server/service/event/tag_event_handler.go index f87f037..a83d2c5 100644 --- a/server/service/event/tag_event_handler.go +++ b/server/service/event/tag_event_handler.go @@ -73,7 +73,7 @@ func (apt *TagsChangedTask) publish(ctx context.Context, domainProject, consumer util.Logger().Warnf(err, "get service %s file failed", providerId) continue } - nf.PublishInstanceEvent(domainProject, pb.EVT_EXPIRE, + PublishInstanceEvent(domainProject, pb.EVT_EXPIRE, &pb.MicroServiceKey{ Environment: provider.Environment, AppId: provider.AppId, diff --git a/server/service/instance.go b/server/service/instance.go index bca599a..1618886 100644 --- a/server/service/instance.go +++ b/server/service/instance.go @@ -524,6 +524,7 @@ func (s *InstanceService) Find(ctx context.Context, in *pb.FindInstancesRequest) AppId: in.AppId, ServiceName: in.ServiceName, Alias: in.ServiceName, + Version: in.VersionRule, } if apt.IsShared(provider) { // it means the shared micro-services must be the same env with SC. @@ -536,6 +537,23 @@ func (s *InstanceService) Find(ctx context.Context, in *pb.FindInstancesRequest) provider.Tenant = util.ParseTargetDomainProject(ctx) } + // cache + if item := serviceUtil.FindInstancesCache.Get(provider.Tenant, in.ConsumerServiceId, provider); item != nil { + noCache, cacheOnly := ctx.Value(serviceUtil.CTX_NOCACHE) == "1", ctx.Value(serviceUtil.CTX_CACHEONLY) == "1" + rev, _ := ctx.Value(serviceUtil.CTX_REQUEST_REVISION).(int64) + if !noCache && (cacheOnly || rev <= item.Rev) { + instances := item.Instances + if rev == item.Rev { + instances = instances[:0] + } + util.SetContext(ctx, serviceUtil.CTX_RESPONSE_REVISION, item.Rev) + return &pb.FindInstancesResponse{ + Response: pb.CreateResponse(pb.Response_SUCCESS, "Query service instances successfully."), + Instances: instances, + }, nil + } + } + // 版本规则 ids, err := serviceUtil.FindServiceIds(ctx, in.VersionRule, provider) if err != nil { @@ -583,13 +601,19 @@ func (s *InstanceService) Find(ctx context.Context, in *pb.FindInstancesRequest) } } - instances, err := serviceUtil.GetAllInstancesOfServices(ctx, util.ParseTargetDomainProject(ctx), ids) + instances, rev, err := serviceUtil.GetAllInstancesOfServices(ctx, util.ParseTargetDomainProject(ctx), ids) if err != nil { util.Logger().Errorf(err, "find instance failed, %s: GetAllInstancesOfServices failed.", findFlag) return &pb.FindInstancesResponse{ Response: pb.CreateResponse(scerr.ErrInternal, err.Error()), }, err } + + serviceUtil.FindInstancesCache.Set(provider.Tenant, in.ConsumerServiceId, provider, &serviceUtil.VersionRuleCacheItem{ + Instances: instances, + Rev: rev, + }) + util.SetContext(ctx, serviceUtil.CTX_RESPONSE_REVISION, rev) return &pb.FindInstancesResponse{ Response: pb.CreateResponse(pb.Response_SUCCESS, "Query service instances successfully."), Instances: instances, diff --git a/server/service/util/common.go b/server/service/notification/common.go similarity index 52% copy from server/service/util/common.go copy to server/service/notification/common.go index 6c25e51..7e2e16a 100644 --- a/server/service/util/common.go +++ b/server/service/notification/common.go @@ -14,12 +14,41 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package util +package notification + +import ( + "fmt" + "strconv" + "time" +) const ( - HEADER_REV = "X-Resource-Revision" - CTX_NOCACHE = "noCache" - CTX_CACHEONLY = "cacheOnly" - CTX_REQUEST_REVISION = "requestRev" - CTX_RESPONSE_REVISION = "responseRev" + DEFAULT_MAX_QUEUE = 1000 + DEFAULT_INIT_SUBSCRIBERS = 1000 + DEFAULT_ON_MESSAGE_TIMEOUT = 100 * time.Millisecond + DEFAULT_TIMEOUT = 30 * time.Second + + NOTIFTY NotifyType = iota + INSTANCE + typeEnd ) + +type NotifyType int + +func (nt NotifyType) String() string { + if int(nt) < len(notifyTypeNames) { + return notifyTypeNames[nt] + } + return "NotifyType" + strconv.Itoa(int(nt)) +} + +type NotifyServiceConfig struct { + AddTimeout time.Duration + NotifyTimeout time.Duration + MaxQueue int64 +} + +func (nsc NotifyServiceConfig) String() string { + return fmt.Sprintf("{acceptQueue: %d, accept: %s, notify: %s}", + nsc.MaxQueue, nsc.AddTimeout, nsc.NotifyTimeout) +} diff --git a/server/service/notification/listwatcher.go b/server/service/notification/listwatcher.go index 5d911a2..a6942c9 100644 --- a/server/service/notification/listwatcher.go +++ b/server/service/notification/listwatcher.go @@ -32,7 +32,7 @@ type WatchJob struct { type ListWatcher struct { BaseSubscriber - Job chan NotifyJob + Job chan *WatchJob ListRevision int64 ListFunc func() (results []*pb.WatchInstanceResponse, rev int64) @@ -56,7 +56,7 @@ func (w *ListWatcher) listAndPublishJobs(_ context.Context) { results, rev := w.ListFunc() w.ListRevision = rev for _, response := range results { - w.sendMessage(NewWatchJob(w.Type(), w.Id(), w.Subject(), w.ListRevision, response)) + w.sendMessage(NewWatchJob(w.Id(), w.Subject(), w.ListRevision, response)) } } @@ -66,6 +66,11 @@ func (w *ListWatcher) OnMessage(job NotifyJob) { return } + wJob, ok := job.(*WatchJob) + if !ok { + return + } + timer := time.NewTimer(DEFAULT_ON_MESSAGE_TIMEOUT) select { case <-w.listCh: @@ -77,16 +82,16 @@ func (w *ListWatcher) OnMessage(job NotifyJob) { return } - if job.(*WatchJob).Revision <= w.ListRevision { + if wJob.Revision <= w.ListRevision { util.Logger().Warnf(nil, "unexpected notify %s job is coming in, watcher %s %s, job is %v, current revision is %v", w.Type(), w.Id(), w.Subject(), job, w.ListRevision) return } - w.sendMessage(job) + w.sendMessage(wJob) } -func (w *ListWatcher) sendMessage(job NotifyJob) { +func (w *ListWatcher) sendMessage(job *WatchJob) { util.Logger().Debugf("start to notify %s watcher %s %s, job is %v, current revision is %v", w.Type(), w.Id(), w.Subject(), job, w.ListRevision) defer util.RecoverAndReport() @@ -105,27 +110,27 @@ func (w *ListWatcher) Close() { close(w.Job) } -func NewWatchJob(nType NotifyType, subscriberId, subject string, rev int64, response *pb.WatchInstanceResponse) *WatchJob { +func NewWatchJob(subscriberId, subject string, rev int64, response *pb.WatchInstanceResponse) *WatchJob { return &WatchJob{ BaseNotifyJob: BaseNotifyJob{ subscriberId: subscriberId, subject: subject, - nType: nType, + nType: INSTANCE, }, Revision: rev, Response: response, } } -func NewListWatcher(nType NotifyType, id string, subject string, +func NewListWatcher(id string, subject string, listFunc func() (results []*pb.WatchInstanceResponse, rev int64)) *ListWatcher { watcher := &ListWatcher{ BaseSubscriber: BaseSubscriber{ id: id, subject: subject, - nType: nType, + nType: INSTANCE, }, - Job: make(chan NotifyJob, DEFAULT_MAX_QUEUE), + Job: make(chan *WatchJob, DEFAULT_MAX_QUEUE), ListFunc: listFunc, listCh: make(chan struct{}), } diff --git a/server/service/util/common.go b/server/service/notification/notice.go similarity index 66% copy from server/service/util/common.go copy to server/service/notification/notice.go index 6c25e51..498d435 100644 --- a/server/service/util/common.go +++ b/server/service/notification/notice.go @@ -14,12 +14,28 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package util +package notification -const ( - HEADER_REV = "X-Resource-Revision" - CTX_NOCACHE = "noCache" - CTX_CACHEONLY = "cacheOnly" - CTX_REQUEST_REVISION = "requestRev" - CTX_RESPONSE_REVISION = "responseRev" -) +type NotifyJob interface { + SubscriberId() string + Subject() string + Type() NotifyType +} + +type BaseNotifyJob struct { + subscriberId string + subject string + nType NotifyType +} + +func (s *BaseNotifyJob) SubscriberId() string { + return s.subscriberId +} + +func (s *BaseNotifyJob) Subject() string { + return s.subject +} + +func (s *BaseNotifyJob) Type() NotifyType { + return s.nType +} diff --git a/server/service/notification/stream.go b/server/service/notification/stream.go new file mode 100644 index 0000000..101473d --- /dev/null +++ b/server/service/notification/stream.go @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package notification + +import ( + "errors" + "github.com/apache/incubator-servicecomb-service-center/pkg/util" + pb "github.com/apache/incubator-servicecomb-service-center/server/core/proto" + "time" +) + +func HandleWatchJob(watcher *ListWatcher, stream pb.ServiceInstanceCtrl_WatchServer, timeout time.Duration) (err error) { + for { + timer := time.NewTimer(timeout) + select { + case <-timer.C: + // TODO grpc 长连接心跳? + case job := <-watcher.Job: + timer.Stop() + + if job == nil { + err = errors.New("channel is closed") + util.Logger().Errorf(err, "watcher %s %s caught an exception", + watcher.Subject(), watcher.Id()) + return + } + resp := job.Response + util.Logger().Infof("event is coming in, watcher %s %s", + watcher.Subject(), watcher.Id()) + + err = stream.Send(resp) + if err != nil { + util.Logger().Errorf(err, "send message error, watcher %s %s", + watcher.Subject(), watcher.Id()) + watcher.SetError(err) + return + } + } + } +} diff --git a/server/service/notification/struct.go b/server/service/notification/subscriber.go similarity index 64% rename from server/service/notification/struct.go rename to server/service/notification/subscriber.go index f6564ff..9f0b4df 100644 --- a/server/service/notification/struct.go +++ b/server/service/notification/subscriber.go @@ -18,42 +18,8 @@ package notification import ( "errors" - "fmt" - "strconv" - "time" ) -const ( - DEFAULT_MAX_QUEUE = 1000 - DEFAULT_INIT_SUBSCRIBERS = 1000 - DEFAULT_ON_MESSAGE_TIMEOUT = 100 * time.Millisecond - DEFAULT_TIMEOUT = 30 * time.Second - - NOTIFTY NotifyType = iota - INSTANCE - typeEnd -) - -type NotifyType int - -func (nt NotifyType) String() string { - if int(nt) < len(notifyTypeNames) { - return notifyTypeNames[nt] - } - return "NotifyType" + strconv.Itoa(int(nt)) -} - -type NotifyServiceConfig struct { - AddTimeout time.Duration - NotifyTimeout time.Duration - MaxQueue int64 -} - -func (nsc NotifyServiceConfig) String() string { - return fmt.Sprintf("{acceptQueue: %d, accept: %s, notify: %s}", - nsc.MaxQueue, nsc.AddTimeout, nsc.NotifyTimeout) -} - type Subscriber interface { Err() error SetError(err error) @@ -68,12 +34,6 @@ type Subscriber interface { Close() } -type NotifyJob interface { - SubscriberId() string - Subject() string - Type() NotifyType -} - type BaseSubscriber struct { id string subject string @@ -122,21 +82,3 @@ func (s *BaseSubscriber) OnMessage(job NotifyJob) { func (s *BaseSubscriber) Close() { } - -type BaseNotifyJob struct { - subscriberId string - subject string - nType NotifyType -} - -func (s *BaseNotifyJob) SubscriberId() string { - return s.subscriberId -} - -func (s *BaseNotifyJob) Subject() string { - return s.subject -} - -func (s *BaseNotifyJob) Type() NotifyType { - return s.nType -} diff --git a/server/service/notification/watch_util.go b/server/service/notification/websocket.go similarity index 82% rename from server/service/notification/watch_util.go rename to server/service/notification/websocket.go index 3a13500..f53e641 100644 --- a/server/service/notification/watch_util.go +++ b/server/service/notification/websocket.go @@ -18,7 +18,6 @@ package notification import ( "encoding/json" - "errors" "fmt" "github.com/apache/incubator-servicecomb-service-center/pkg/util" apt "github.com/apache/incubator-servicecomb-service-center/server/core" @@ -29,36 +28,6 @@ import ( "time" ) -func HandleWatchJob(watcher *ListWatcher, stream pb.ServiceInstanceCtrl_WatchServer, timeout time.Duration) (err error) { - for { - timer := time.NewTimer(timeout) - select { - case <-timer.C: - // TODO grpc 长连接心跳? - case job := <-watcher.Job: - timer.Stop() - - if job == nil { - err = errors.New("channel is closed") - util.Logger().Errorf(err, "watcher %s %s caught an exception", - watcher.Subject(), watcher.Id()) - return - } - resp := job.(*WatchJob).Response - util.Logger().Infof("event is coming in, watcher %s %s", - watcher.Subject(), watcher.Id()) - - err = stream.Send(resp) - if err != nil { - util.Logger().Errorf(err, "send message error, watcher %s %s", - watcher.Subject(), watcher.Id()) - watcher.SetError(err) - return - } - } - } -} - type WebSocketHandler struct { ctx context.Context conn *websocket.Conn @@ -71,7 +40,7 @@ type WebSocketHandler struct { func (wh *WebSocketHandler) Init() error { remoteAddr := wh.conn.RemoteAddr().String() if err := GetNotifyService().AddSubscriber(wh.watcher); err != nil { - err = fmt.Errorf("establish[%s] websocket watch failed: notify service error, %s.", + err = fmt.Errorf("establish[%s] websocket watch failed: notify service error, %s", remoteAddr, err.Error()) util.Logger().Errorf(nil, err.Error()) @@ -207,7 +176,7 @@ func (wh *WebSocketHandler) HandleWatchWebSocketJob() { return } - resp := job.(*WatchJob).Response + resp := job.Response providerFlag := fmt.Sprintf("%s/%s/%s", resp.Key.AppId, resp.Key.ServiceName, resp.Key.Version) if resp.Action != string(pb.EVT_EXPIRE) { @@ -261,7 +230,7 @@ func DoWebSocketListAndWatch(ctx context.Context, serviceId string, f func() ([] handler := &WebSocketHandler{ ctx: ctx, conn: conn, - watcher: NewInstanceListWatcher(serviceId, apt.GetInstanceRootKey(domainProject)+"/", f), + watcher: NewListWatcher(serviceId, apt.GetInstanceRootKey(domainProject)+"/", f), needPingWatcher: true, closed: make(chan struct{}), goroutine: util.NewGo(context.Background()), @@ -283,23 +252,3 @@ func EstablishWebSocketError(conn *websocket.Conn, err error) { util.Logger().Errorf(err, "establish[%s] websocket watch failed: write message failed.", remoteAddr) } } - -func PublishInstanceEvent(domainProject string, action pb.EventType, serviceKey *pb.MicroServiceKey, instance *pb.MicroServiceInstance, rev int64, subscribers []string) { - response := &pb.WatchInstanceResponse{ - Response: pb.CreateResponse(pb.Response_SUCCESS, "Watch instance successfully."), - Action: string(action), - Key: serviceKey, - Instance: instance, - } - for _, consumerId := range subscribers { - job := NewWatchJob(INSTANCE, consumerId, apt.GetInstanceRootKey(domainProject)+"/", rev, response) - util.Logger().Debugf("publish event to notify service, %v", job) - - // TODO add超时怎么处理? - GetNotifyService().AddJob(job) - } -} - -func NewInstanceListWatcher(selfServiceId, instanceRoot string, listFunc func() (results []*pb.WatchInstanceResponse, rev int64)) *ListWatcher { - return NewListWatcher(INSTANCE, selfServiceId, instanceRoot, listFunc) -} diff --git a/server/service/util/common.go b/server/service/util/common.go index 6c25e51..9e31b3f 100644 --- a/server/service/util/common.go +++ b/server/service/util/common.go @@ -16,10 +16,14 @@ */ package util +import "time" + const ( HEADER_REV = "X-Resource-Revision" CTX_NOCACHE = "noCache" CTX_CACHEONLY = "cacheOnly" CTX_REQUEST_REVISION = "requestRev" CTX_RESPONSE_REVISION = "responseRev" + + cacheTTL = 5 * time.Minute ) diff --git a/server/service/util/find_cache.go b/server/service/util/find_cache.go new file mode 100644 index 0000000..d5c2ab7 --- /dev/null +++ b/server/service/util/find_cache.go @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package util + +import ( + "errors" + "github.com/apache/incubator-servicecomb-service-center/pkg/util" + pb "github.com/apache/incubator-servicecomb-service-center/server/core/proto" + "github.com/karlseguin/ccache" +) + +var FindInstancesCache = &VersionRuleCache{ + c: ccache.Layered(ccache.Configure()), +} + +type VersionRuleCacheItem struct { + Instances []*pb.MicroServiceInstance + Rev int64 +} + +type VersionRuleCache struct { + c *ccache.LayeredCache +} + +func (c *VersionRuleCache) primaryKey(domainProject string, provider *pb.MicroServiceKey) string { + return util.StringJoin([]string{ + domainProject, + provider.Environment, + provider.AppId, + provider.ServiceName}, "/") +} + +func (c *VersionRuleCache) Get(domainProject, consumer string, provider *pb.MicroServiceKey) *VersionRuleCacheItem { + item, _ := c.c.Fetch(c.primaryKey(domainProject, provider), provider.Version, cacheTTL, func() (interface{}, error) { + return nil, errors.New("not exist") + }) + if item == nil || item.Expired() { + return nil + } + + if v, ok := item.Value().(*util.ConcurrentMap).Get(consumer); ok { + return v.(*VersionRuleCacheItem) + } + return nil +} + +func (c *VersionRuleCache) Set(domainProject, consumer string, provider *pb.MicroServiceKey, item *VersionRuleCacheItem) { + c2, _ := c.c.Fetch(c.primaryKey(domainProject, provider), provider.Version, cacheTTL, func() (interface{}, error) { + // new one if not exist + return util.NewConcurrentMap(1), nil + }) + c2.Value().(*util.ConcurrentMap).Put(consumer, item) +} + +func (c *VersionRuleCache) Delete(domainProject, consumer string, provider *pb.MicroServiceKey) { + c.c.DeleteAll(c.primaryKey(domainProject, provider)) +} diff --git a/server/service/util/find_cache_test.go b/server/service/util/find_cache_test.go new file mode 100644 index 0000000..9275101 --- /dev/null +++ b/server/service/util/find_cache_test.go @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package util + +import ( + pb "github.com/apache/incubator-servicecomb-service-center/server/core/proto" + "testing" +) + +func TestVersionRuleCache_Get(t *testing.T) { + p := &pb.MicroServiceKey{} + c := FindInstancesCache.Get("d", "c", p) + if c != nil { + t.Fatalf("TestVersionRuleCache_Get failed, %v", c) + } + + r := &VersionRuleCacheItem{Rev: 1} + FindInstancesCache.Set("d", "c", p, r) + c = FindInstancesCache.Get("d", "c", p) + if c == nil { + t.Fatalf("TestVersionRuleCache_Get failed, %v", c) + } + if c.Rev != 1 { + t.Fatalf("TestVersionRuleCache_Get failed, rev %d != 1", c.Rev) + } + c = FindInstancesCache.Get("d", "c2", p) + if c != nil { + t.Fatalf("TestVersionRuleCache_Get failed, %v", c) + } + + p2 := &pb.MicroServiceKey{ServiceName: "p2"} + FindInstancesCache.Set("d", "c2", p, r) + FindInstancesCache.Set("d", "c2", p2, r) + FindInstancesCache.Delete("d", "c", p) + c = FindInstancesCache.Get("d", "c2", p) + if c != nil { + t.Fatalf("TestVersionRuleCache_Get failed, %v", c) + } + c = FindInstancesCache.Get("d", "c2", p2) + if c == nil { + t.Fatalf("TestVersionRuleCache_Get failed, %v", c) + } + if c.Rev != 1 { + t.Fatalf("TestVersionRuleCache_Get failed, rev %d != 1", c.Rev) + } +} diff --git a/server/service/util/instance_util.go b/server/service/util/instance_util.go index 6f3b3b3..bf91107 100644 --- a/server/service/util/instance_util.go +++ b/server/service/util/instance_util.go @@ -66,11 +66,12 @@ func GetInstance(ctx context.Context, domainProject string, serviceId string, in return instance, nil } -func GetAllInstancesOfServices(ctx context.Context, domainProject string, ids []string) (instances []*pb.MicroServiceInstance, err error) { +func GetAllInstancesOfServices(ctx context.Context, domainProject string, ids []string) ( + instances []*pb.MicroServiceInstance, rev int64, err error) { cloneCtx := util.CloneContext(ctx) noCache, cacheOnly := ctx.Value(CTX_NOCACHE) == "1", ctx.Value(CTX_CACHEONLY) == "1" - rev, _ := cloneCtx.Value(CTX_REQUEST_REVISION).(int64) + rev, _ = cloneCtx.Value(CTX_REQUEST_REVISION).(int64) if !noCache && !cacheOnly && rev > 0 { // force to find in cache at first time when rev > 0 util.SetContext(cloneCtx, CTX_CACHEONLY, "1") @@ -86,7 +87,7 @@ func GetAllInstancesOfServices(ctx context.Context, domainProject string, ids [] opts := append(FromContext(cloneCtx), registry.WithStrKey(key), registry.WithPrefix()) resp, err := backend.Store().Instance().Search(cloneCtx, opts...) if err != nil { - return nil, err + return nil, 0, err } if len(resp.Kvs) > 0 { @@ -122,13 +123,13 @@ func GetAllInstancesOfServices(ctx context.Context, domainProject string, ids [] instance := &pb.MicroServiceInstance{} err := json.Unmarshal(kv.Value, instance) if err != nil { - return nil, fmt.Errorf("unmarshal %s faild, %s", + return nil, 0, fmt.Errorf("unmarshal %s faild, %s", util.BytesToStringWithNoCopy(kv.Key), err.Error()) } instances = append(instances, instance) } - util.SetContext(ctx, CTX_RESPONSE_REVISION, max) + rev = max return } diff --git a/server/service/util/instance_util_test.go b/server/service/util/instance_util_test.go index 635be35..b99d810 100644 --- a/server/service/util/instance_util_test.go +++ b/server/service/util/instance_util_test.go @@ -116,19 +116,19 @@ func TestGetInstanceCountOfOneService(t *testing.T) { } func TestGetInstanceCountOfServices(t *testing.T) { - _, err := GetAllInstancesOfServices(util.SetContext(context.Background(), CTX_CACHEONLY, "1"), "", []string{"1"}) + _, _, err := GetAllInstancesOfServices(util.SetContext(context.Background(), CTX_CACHEONLY, "1"), "", []string{"1"}) if err != nil { t.Fatalf(`GetAllInstancesOfServices CTX_CACHEONLY failed`) } - _, err = GetAllInstancesOfServices(util.SetContext(context.Background(), CTX_NOCACHE, "1"), "", []string{"1"}) + _, _, err = GetAllInstancesOfServices(util.SetContext(context.Background(), CTX_NOCACHE, "1"), "", []string{"1"}) if err == nil { t.Fatalf(`GetAllInstancesOfServices CTX_NOCACHE failed`) } - _, err = GetAllInstancesOfServices(util.SetContext(context.Background(), CTX_REQUEST_REVISION, 1), "", []string{"1"}) + _, _, err = GetAllInstancesOfServices(util.SetContext(context.Background(), CTX_REQUEST_REVISION, 1), "", []string{"1"}) if err == nil { t.Fatalf(`GetAllInstancesOfServices CTX_REQUEST_REVISION failed`) } - _, err = GetAllInstancesOfServices(context.Background(), "", []string{"1"}) + _, _, err = GetAllInstancesOfServices(context.Background(), "", []string{"1"}) if err == nil { t.Fatalf(`GetAllInstancesOfServices failed`) } diff --git a/server/service/watch.go b/server/service/watch.go index e0b5780..1fcefa0 100644 --- a/server/service/watch.go +++ b/server/service/watch.go @@ -45,7 +45,7 @@ func (s *InstanceService) Watch(in *pb.WatchInstanceRequest, stream pb.ServiceIn return err } domainProject := util.ParseDomainProject(stream.Context()) - watcher := nf.NewInstanceListWatcher(in.SelfServiceId, apt.GetInstanceRootKey(domainProject)+"/", nil) + watcher := nf.NewListWatcher(in.SelfServiceId, apt.GetInstanceRootKey(domainProject)+"/", nil) err = nf.GetNotifyService().AddSubscriber(watcher) util.Logger().Infof("start watch instance status, watcher %s %s", watcher.Subject(), watcher.Id()) return nf.HandleWatchJob(watcher, stream, nf.GetNotifyService().Config.NotifyTimeout)