asifdxtreme closed pull request #376: SCB-680 Optimize find instance api
URL: https://github.com/apache/incubator-servicecomb-service-center/pull/376
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff of a foreign pull request (one from a
fork) once it is merged, the diff is reproduced below for the sake of
provenance:

diff --git a/server/service/dependency.go b/server/service/dependency.go
index eac1aa2d..4c13dd0c 100644
--- a/server/service/dependency.go
+++ b/server/service/dependency.go
@@ -140,7 +140,6 @@ func (s *MicroServiceService) GetProviderDependencies(ctx 
context.Context, in *p
                        Response: pb.CreateResponse(scerr.ErrInternal, 
err.Error()),
                }, err
        }
-       util.Logger().Debugf("GetProviderDependencies successfully, providerId 
is %s.", in.ServiceId)
        return &pb.GetProDependenciesResponse{
                Response:  pb.CreateResponse(pb.Response_SUCCESS, "Get all 
consumers successful."),
                Consumers: services,
@@ -181,7 +180,6 @@ func (s *MicroServiceService) GetConsumerDependencies(ctx 
context.Context, in *p
                }, err
        }
 
-       util.Logger().Debugf("GetConsumerDependencies successfully, consumerId 
is %s.", consumerId)
        return &pb.GetConDependenciesResponse{
                Response:  pb.CreateResponse(pb.Response_SUCCESS, "Get all 
providers successfully."),
                Providers: services,
diff --git a/server/service/event/instance_event_handler.go 
b/server/service/event/instance_event_handler.go
index 81446492..a80376d1 100644
--- a/server/service/event/instance_event_handler.go
+++ b/server/service/event/instance_event_handler.go
@@ -93,7 +93,7 @@ func (h *InstanceEventHandler) OnEvent(evt backend.KvEvent) {
                return
        }
 
-       nf.PublishInstanceEvent(domainProject, action, &pb.MicroServiceKey{
+       PublishInstanceEvent(domainProject, action, &pb.MicroServiceKey{
                Environment: ms.Environment,
                AppId:       ms.AppId,
                ServiceName: ms.ServiceName,
@@ -104,3 +104,20 @@ func (h *InstanceEventHandler) OnEvent(evt 
backend.KvEvent) {
 func NewInstanceEventHandler() *InstanceEventHandler {
        return &InstanceEventHandler{}
 }
+
+func PublishInstanceEvent(domainProject string, action pb.EventType, 
serviceKey *pb.MicroServiceKey, instance *pb.MicroServiceInstance, rev int64, 
subscribers []string) {
+       response := &pb.WatchInstanceResponse{
+               Response: pb.CreateResponse(pb.Response_SUCCESS, "Watch 
instance successfully."),
+               Action:   string(action),
+               Key:      serviceKey,
+               Instance: instance,
+       }
+       for _, consumerId := range subscribers {
+               // expires cache
+               serviceUtil.FindInstancesCache.Delete(domainProject, 
consumerId, serviceKey)
+
+               // TODO add超时怎么处理?
+               job := nf.NewWatchJob(consumerId, 
apt.GetInstanceRootKey(domainProject)+"/", rev, response)
+               nf.GetNotifyService().AddJob(job)
+       }
+}
diff --git a/server/service/event/rule_event_handler.go 
b/server/service/event/rule_event_handler.go
index 0999dd08..474811e2 100644
--- a/server/service/event/rule_event_handler.go
+++ b/server/service/event/rule_event_handler.go
@@ -69,7 +69,7 @@ func (apt *RulesChangedTask) publish(ctx context.Context, 
domainProject, provide
        }
        providerKey := pb.MicroServiceToKey(domainProject, provider)
 
-       nf.PublishInstanceEvent(domainProject, pb.EVT_EXPIRE, providerKey, nil, 
rev, consumerIds)
+       PublishInstanceEvent(domainProject, pb.EVT_EXPIRE, providerKey, nil, 
rev, consumerIds)
        return nil
 }
 
diff --git a/server/service/event/tag_event_handler.go 
b/server/service/event/tag_event_handler.go
index f87f0372..a83d2c5b 100644
--- a/server/service/event/tag_event_handler.go
+++ b/server/service/event/tag_event_handler.go
@@ -73,7 +73,7 @@ func (apt *TagsChangedTask) publish(ctx context.Context, 
domainProject, consumer
                        util.Logger().Warnf(err, "get service %s file failed", 
providerId)
                        continue
                }
-               nf.PublishInstanceEvent(domainProject, pb.EVT_EXPIRE,
+               PublishInstanceEvent(domainProject, pb.EVT_EXPIRE,
                        &pb.MicroServiceKey{
                                Environment: provider.Environment,
                                AppId:       provider.AppId,
diff --git a/server/service/instance.go b/server/service/instance.go
index bca599a3..1618886f 100644
--- a/server/service/instance.go
+++ b/server/service/instance.go
@@ -524,6 +524,7 @@ func (s *InstanceService) Find(ctx context.Context, in 
*pb.FindInstancesRequest)
                AppId:       in.AppId,
                ServiceName: in.ServiceName,
                Alias:       in.ServiceName,
+               Version:     in.VersionRule,
        }
        if apt.IsShared(provider) {
                // it means the shared micro-services must be the same env with 
SC.
@@ -536,6 +537,23 @@ func (s *InstanceService) Find(ctx context.Context, in 
*pb.FindInstancesRequest)
                provider.Tenant = util.ParseTargetDomainProject(ctx)
        }
 
+       // cache
+       if item := serviceUtil.FindInstancesCache.Get(provider.Tenant, 
in.ConsumerServiceId, provider); item != nil {
+               noCache, cacheOnly := ctx.Value(serviceUtil.CTX_NOCACHE) == 
"1", ctx.Value(serviceUtil.CTX_CACHEONLY) == "1"
+               rev, _ := ctx.Value(serviceUtil.CTX_REQUEST_REVISION).(int64)
+               if !noCache && (cacheOnly || rev <= item.Rev) {
+                       instances := item.Instances
+                       if rev == item.Rev {
+                               instances = instances[:0]
+                       }
+                       util.SetContext(ctx, serviceUtil.CTX_RESPONSE_REVISION, 
item.Rev)
+                       return &pb.FindInstancesResponse{
+                               Response:  
pb.CreateResponse(pb.Response_SUCCESS, "Query service instances successfully."),
+                               Instances: instances,
+                       }, nil
+               }
+       }
+
        // 版本规则
        ids, err := serviceUtil.FindServiceIds(ctx, in.VersionRule, provider)
        if err != nil {
@@ -583,13 +601,19 @@ func (s *InstanceService) Find(ctx context.Context, in 
*pb.FindInstancesRequest)
                }
        }
 
-       instances, err := serviceUtil.GetAllInstancesOfServices(ctx, 
util.ParseTargetDomainProject(ctx), ids)
+       instances, rev, err := serviceUtil.GetAllInstancesOfServices(ctx, 
util.ParseTargetDomainProject(ctx), ids)
        if err != nil {
                util.Logger().Errorf(err, "find instance failed, %s: 
GetAllInstancesOfServices failed.", findFlag)
                return &pb.FindInstancesResponse{
                        Response: pb.CreateResponse(scerr.ErrInternal, 
err.Error()),
                }, err
        }
+
+       serviceUtil.FindInstancesCache.Set(provider.Tenant, 
in.ConsumerServiceId, provider, &serviceUtil.VersionRuleCacheItem{
+               Instances: instances,
+               Rev:       rev,
+       })
+       util.SetContext(ctx, serviceUtil.CTX_RESPONSE_REVISION, rev)
        return &pb.FindInstancesResponse{
                Response:  pb.CreateResponse(pb.Response_SUCCESS, "Query 
service instances successfully."),
                Instances: instances,
diff --git a/server/service/notification/common.go 
b/server/service/notification/common.go
new file mode 100644
index 00000000..7e2e16a3
--- /dev/null
+++ b/server/service/notification/common.go
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package notification
+
+import (
+       "fmt"
+       "strconv"
+       "time"
+)
+
+const (
+       DEFAULT_MAX_QUEUE          = 1000
+       DEFAULT_INIT_SUBSCRIBERS   = 1000
+       DEFAULT_ON_MESSAGE_TIMEOUT = 100 * time.Millisecond
+       DEFAULT_TIMEOUT            = 30 * time.Second
+
+       NOTIFTY NotifyType = iota
+       INSTANCE
+       typeEnd
+)
+
+type NotifyType int
+
+func (nt NotifyType) String() string {
+       if int(nt) < len(notifyTypeNames) {
+               return notifyTypeNames[nt]
+       }
+       return "NotifyType" + strconv.Itoa(int(nt))
+}
+
+type NotifyServiceConfig struct {
+       AddTimeout    time.Duration
+       NotifyTimeout time.Duration
+       MaxQueue      int64
+}
+
+func (nsc NotifyServiceConfig) String() string {
+       return fmt.Sprintf("{acceptQueue: %d, accept: %s, notify: %s}",
+               nsc.MaxQueue, nsc.AddTimeout, nsc.NotifyTimeout)
+}
diff --git a/server/service/notification/listwatcher.go 
b/server/service/notification/listwatcher.go
index 5d911a20..a6942c92 100644
--- a/server/service/notification/listwatcher.go
+++ b/server/service/notification/listwatcher.go
@@ -32,7 +32,7 @@ type WatchJob struct {
 
 type ListWatcher struct {
        BaseSubscriber
-       Job          chan NotifyJob
+       Job          chan *WatchJob
        ListRevision int64
        ListFunc     func() (results []*pb.WatchInstanceResponse, rev int64)
 
@@ -56,7 +56,7 @@ func (w *ListWatcher) listAndPublishJobs(_ context.Context) {
        results, rev := w.ListFunc()
        w.ListRevision = rev
        for _, response := range results {
-               w.sendMessage(NewWatchJob(w.Type(), w.Id(), w.Subject(), 
w.ListRevision, response))
+               w.sendMessage(NewWatchJob(w.Id(), w.Subject(), w.ListRevision, 
response))
        }
 }
 
@@ -66,6 +66,11 @@ func (w *ListWatcher) OnMessage(job NotifyJob) {
                return
        }
 
+       wJob, ok := job.(*WatchJob)
+       if !ok {
+               return
+       }
+
        timer := time.NewTimer(DEFAULT_ON_MESSAGE_TIMEOUT)
        select {
        case <-w.listCh:
@@ -77,16 +82,16 @@ func (w *ListWatcher) OnMessage(job NotifyJob) {
                return
        }
 
-       if job.(*WatchJob).Revision <= w.ListRevision {
+       if wJob.Revision <= w.ListRevision {
                util.Logger().Warnf(nil,
                        "unexpected notify %s job is coming in, watcher %s %s, 
job is %v, current revision is %v",
                        w.Type(), w.Id(), w.Subject(), job, w.ListRevision)
                return
        }
-       w.sendMessage(job)
+       w.sendMessage(wJob)
 }
 
-func (w *ListWatcher) sendMessage(job NotifyJob) {
+func (w *ListWatcher) sendMessage(job *WatchJob) {
        util.Logger().Debugf("start to notify %s watcher %s %s, job is %v, 
current revision is %v", w.Type(),
                w.Id(), w.Subject(), job, w.ListRevision)
        defer util.RecoverAndReport()
@@ -105,27 +110,27 @@ func (w *ListWatcher) Close() {
        close(w.Job)
 }
 
-func NewWatchJob(nType NotifyType, subscriberId, subject string, rev int64, 
response *pb.WatchInstanceResponse) *WatchJob {
+func NewWatchJob(subscriberId, subject string, rev int64, response 
*pb.WatchInstanceResponse) *WatchJob {
        return &WatchJob{
                BaseNotifyJob: BaseNotifyJob{
                        subscriberId: subscriberId,
                        subject:      subject,
-                       nType:        nType,
+                       nType:        INSTANCE,
                },
                Revision: rev,
                Response: response,
        }
 }
 
-func NewListWatcher(nType NotifyType, id string, subject string,
+func NewListWatcher(id string, subject string,
        listFunc func() (results []*pb.WatchInstanceResponse, rev int64)) 
*ListWatcher {
        watcher := &ListWatcher{
                BaseSubscriber: BaseSubscriber{
                        id:      id,
                        subject: subject,
-                       nType:   nType,
+                       nType:   INSTANCE,
                },
-               Job:      make(chan NotifyJob, DEFAULT_MAX_QUEUE),
+               Job:      make(chan *WatchJob, DEFAULT_MAX_QUEUE),
                ListFunc: listFunc,
                listCh:   make(chan struct{}),
        }
diff --git a/server/service/notification/notice.go 
b/server/service/notification/notice.go
new file mode 100644
index 00000000..498d435c
--- /dev/null
+++ b/server/service/notification/notice.go
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package notification
+
+type NotifyJob interface {
+       SubscriberId() string
+       Subject() string
+       Type() NotifyType
+}
+
+type BaseNotifyJob struct {
+       subscriberId string
+       subject      string
+       nType        NotifyType
+}
+
+func (s *BaseNotifyJob) SubscriberId() string {
+       return s.subscriberId
+}
+
+func (s *BaseNotifyJob) Subject() string {
+       return s.subject
+}
+
+func (s *BaseNotifyJob) Type() NotifyType {
+       return s.nType
+}
diff --git a/server/service/notification/stream.go 
b/server/service/notification/stream.go
new file mode 100644
index 00000000..101473de
--- /dev/null
+++ b/server/service/notification/stream.go
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package notification
+
+import (
+       "errors"
+       "github.com/apache/incubator-servicecomb-service-center/pkg/util"
+       pb 
"github.com/apache/incubator-servicecomb-service-center/server/core/proto"
+       "time"
+)
+
+func HandleWatchJob(watcher *ListWatcher, stream 
pb.ServiceInstanceCtrl_WatchServer, timeout time.Duration) (err error) {
+       for {
+               timer := time.NewTimer(timeout)
+               select {
+               case <-timer.C:
+                       // TODO grpc 长连接心跳?
+               case job := <-watcher.Job:
+                       timer.Stop()
+
+                       if job == nil {
+                               err = errors.New("channel is closed")
+                               util.Logger().Errorf(err, "watcher %s %s caught 
an exception",
+                                       watcher.Subject(), watcher.Id())
+                               return
+                       }
+                       resp := job.Response
+                       util.Logger().Infof("event is coming in, watcher %s %s",
+                               watcher.Subject(), watcher.Id())
+
+                       err = stream.Send(resp)
+                       if err != nil {
+                               util.Logger().Errorf(err, "send message error, 
watcher %s %s",
+                                       watcher.Subject(), watcher.Id())
+                               watcher.SetError(err)
+                               return
+                       }
+               }
+       }
+}
diff --git a/server/service/notification/struct.go 
b/server/service/notification/subscriber.go
similarity index 64%
rename from server/service/notification/struct.go
rename to server/service/notification/subscriber.go
index f6564ffa..9f0b4dff 100644
--- a/server/service/notification/struct.go
+++ b/server/service/notification/subscriber.go
@@ -18,42 +18,8 @@ package notification
 
 import (
        "errors"
-       "fmt"
-       "strconv"
-       "time"
 )
 
-const (
-       DEFAULT_MAX_QUEUE          = 1000
-       DEFAULT_INIT_SUBSCRIBERS   = 1000
-       DEFAULT_ON_MESSAGE_TIMEOUT = 100 * time.Millisecond
-       DEFAULT_TIMEOUT            = 30 * time.Second
-
-       NOTIFTY NotifyType = iota
-       INSTANCE
-       typeEnd
-)
-
-type NotifyType int
-
-func (nt NotifyType) String() string {
-       if int(nt) < len(notifyTypeNames) {
-               return notifyTypeNames[nt]
-       }
-       return "NotifyType" + strconv.Itoa(int(nt))
-}
-
-type NotifyServiceConfig struct {
-       AddTimeout    time.Duration
-       NotifyTimeout time.Duration
-       MaxQueue      int64
-}
-
-func (nsc NotifyServiceConfig) String() string {
-       return fmt.Sprintf("{acceptQueue: %d, accept: %s, notify: %s}",
-               nsc.MaxQueue, nsc.AddTimeout, nsc.NotifyTimeout)
-}
-
 type Subscriber interface {
        Err() error
        SetError(err error)
@@ -68,12 +34,6 @@ type Subscriber interface {
        Close()
 }
 
-type NotifyJob interface {
-       SubscriberId() string
-       Subject() string
-       Type() NotifyType
-}
-
 type BaseSubscriber struct {
        id      string
        subject string
@@ -122,21 +82,3 @@ func (s *BaseSubscriber) OnMessage(job NotifyJob) {
 func (s *BaseSubscriber) Close() {
 
 }
-
-type BaseNotifyJob struct {
-       subscriberId string
-       subject      string
-       nType        NotifyType
-}
-
-func (s *BaseNotifyJob) SubscriberId() string {
-       return s.subscriberId
-}
-
-func (s *BaseNotifyJob) Subject() string {
-       return s.subject
-}
-
-func (s *BaseNotifyJob) Type() NotifyType {
-       return s.nType
-}
diff --git a/server/service/notification/watch_util.go 
b/server/service/notification/websocket.go
similarity index 82%
rename from server/service/notification/watch_util.go
rename to server/service/notification/websocket.go
index 3a13500a..f53e641d 100644
--- a/server/service/notification/watch_util.go
+++ b/server/service/notification/websocket.go
@@ -18,7 +18,6 @@ package notification
 
 import (
        "encoding/json"
-       "errors"
        "fmt"
        "github.com/apache/incubator-servicecomb-service-center/pkg/util"
        apt "github.com/apache/incubator-servicecomb-service-center/server/core"
@@ -29,36 +28,6 @@ import (
        "time"
 )
 
-func HandleWatchJob(watcher *ListWatcher, stream 
pb.ServiceInstanceCtrl_WatchServer, timeout time.Duration) (err error) {
-       for {
-               timer := time.NewTimer(timeout)
-               select {
-               case <-timer.C:
-               // TODO grpc 长连接心跳?
-               case job := <-watcher.Job:
-                       timer.Stop()
-
-                       if job == nil {
-                               err = errors.New("channel is closed")
-                               util.Logger().Errorf(err, "watcher %s %s caught 
an exception",
-                                       watcher.Subject(), watcher.Id())
-                               return
-                       }
-                       resp := job.(*WatchJob).Response
-                       util.Logger().Infof("event is coming in, watcher %s %s",
-                               watcher.Subject(), watcher.Id())
-
-                       err = stream.Send(resp)
-                       if err != nil {
-                               util.Logger().Errorf(err, "send message error, 
watcher %s %s",
-                                       watcher.Subject(), watcher.Id())
-                               watcher.SetError(err)
-                               return
-                       }
-               }
-       }
-}
-
 type WebSocketHandler struct {
        ctx             context.Context
        conn            *websocket.Conn
@@ -71,7 +40,7 @@ type WebSocketHandler struct {
 func (wh *WebSocketHandler) Init() error {
        remoteAddr := wh.conn.RemoteAddr().String()
        if err := GetNotifyService().AddSubscriber(wh.watcher); err != nil {
-               err = fmt.Errorf("establish[%s] websocket watch failed: notify 
service error, %s.",
+               err = fmt.Errorf("establish[%s] websocket watch failed: notify 
service error, %s",
                        remoteAddr, err.Error())
                util.Logger().Errorf(nil, err.Error())
 
@@ -207,7 +176,7 @@ func (wh *WebSocketHandler) HandleWatchWebSocketJob() {
                                return
                        }
 
-                       resp := job.(*WatchJob).Response
+                       resp := job.Response
 
                        providerFlag := fmt.Sprintf("%s/%s/%s", resp.Key.AppId, 
resp.Key.ServiceName, resp.Key.Version)
                        if resp.Action != string(pb.EVT_EXPIRE) {
@@ -261,7 +230,7 @@ func DoWebSocketListAndWatch(ctx context.Context, serviceId 
string, f func() ([]
        handler := &WebSocketHandler{
                ctx:             ctx,
                conn:            conn,
-               watcher:         NewInstanceListWatcher(serviceId, 
apt.GetInstanceRootKey(domainProject)+"/", f),
+               watcher:         NewListWatcher(serviceId, 
apt.GetInstanceRootKey(domainProject)+"/", f),
                needPingWatcher: true,
                closed:          make(chan struct{}),
                goroutine:       util.NewGo(context.Background()),
@@ -283,23 +252,3 @@ func EstablishWebSocketError(conn *websocket.Conn, err 
error) {
                util.Logger().Errorf(err, "establish[%s] websocket watch 
failed: write message failed.", remoteAddr)
        }
 }
-
-func PublishInstanceEvent(domainProject string, action pb.EventType, 
serviceKey *pb.MicroServiceKey, instance *pb.MicroServiceInstance, rev int64, 
subscribers []string) {
-       response := &pb.WatchInstanceResponse{
-               Response: pb.CreateResponse(pb.Response_SUCCESS, "Watch 
instance successfully."),
-               Action:   string(action),
-               Key:      serviceKey,
-               Instance: instance,
-       }
-       for _, consumerId := range subscribers {
-               job := NewWatchJob(INSTANCE, consumerId, 
apt.GetInstanceRootKey(domainProject)+"/", rev, response)
-               util.Logger().Debugf("publish event to notify service, %v", job)
-
-               // TODO add超时怎么处理?
-               GetNotifyService().AddJob(job)
-       }
-}
-
-func NewInstanceListWatcher(selfServiceId, instanceRoot string, listFunc 
func() (results []*pb.WatchInstanceResponse, rev int64)) *ListWatcher {
-       return NewListWatcher(INSTANCE, selfServiceId, instanceRoot, listFunc)
-}
diff --git a/server/service/util/common.go b/server/service/util/common.go
index 6c25e513..9e31b3f7 100644
--- a/server/service/util/common.go
+++ b/server/service/util/common.go
@@ -16,10 +16,14 @@
  */
 package util
 
+import "time"
+
 const (
        HEADER_REV            = "X-Resource-Revision"
        CTX_NOCACHE           = "noCache"
        CTX_CACHEONLY         = "cacheOnly"
        CTX_REQUEST_REVISION  = "requestRev"
        CTX_RESPONSE_REVISION = "responseRev"
+
+       cacheTTL = 5 * time.Minute
 )
diff --git a/server/service/util/find_cache.go 
b/server/service/util/find_cache.go
new file mode 100644
index 00000000..d5c2ab73
--- /dev/null
+++ b/server/service/util/find_cache.go
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package util
+
+import (
+       "errors"
+       "github.com/apache/incubator-servicecomb-service-center/pkg/util"
+       pb 
"github.com/apache/incubator-servicecomb-service-center/server/core/proto"
+       "github.com/karlseguin/ccache"
+)
+
+var FindInstancesCache = &VersionRuleCache{
+       c: ccache.Layered(ccache.Configure()),
+}
+
+type VersionRuleCacheItem struct {
+       Instances []*pb.MicroServiceInstance
+       Rev       int64
+}
+
+type VersionRuleCache struct {
+       c *ccache.LayeredCache
+}
+
+func (c *VersionRuleCache) primaryKey(domainProject string, provider 
*pb.MicroServiceKey) string {
+       return util.StringJoin([]string{
+               domainProject,
+               provider.Environment,
+               provider.AppId,
+               provider.ServiceName}, "/")
+}
+
+func (c *VersionRuleCache) Get(domainProject, consumer string, provider 
*pb.MicroServiceKey) *VersionRuleCacheItem {
+       item, _ := c.c.Fetch(c.primaryKey(domainProject, provider), 
provider.Version, cacheTTL, func() (interface{}, error) {
+               return nil, errors.New("not exist")
+       })
+       if item == nil || item.Expired() {
+               return nil
+       }
+
+       if v, ok := item.Value().(*util.ConcurrentMap).Get(consumer); ok {
+               return v.(*VersionRuleCacheItem)
+       }
+       return nil
+}
+
+func (c *VersionRuleCache) Set(domainProject, consumer string, provider 
*pb.MicroServiceKey, item *VersionRuleCacheItem) {
+       c2, _ := c.c.Fetch(c.primaryKey(domainProject, provider), 
provider.Version, cacheTTL, func() (interface{}, error) {
+               // new one if not exist
+               return util.NewConcurrentMap(1), nil
+       })
+       c2.Value().(*util.ConcurrentMap).Put(consumer, item)
+}
+
+func (c *VersionRuleCache) Delete(domainProject, consumer string, provider 
*pb.MicroServiceKey) {
+       c.c.DeleteAll(c.primaryKey(domainProject, provider))
+}
diff --git a/server/service/util/find_cache_test.go 
b/server/service/util/find_cache_test.go
new file mode 100644
index 00000000..92751015
--- /dev/null
+++ b/server/service/util/find_cache_test.go
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package util
+
+import (
+       pb 
"github.com/apache/incubator-servicecomb-service-center/server/core/proto"
+       "testing"
+)
+
+func TestVersionRuleCache_Get(t *testing.T) {
+       p := &pb.MicroServiceKey{}
+       c := FindInstancesCache.Get("d", "c", p)
+       if c != nil {
+               t.Fatalf("TestVersionRuleCache_Get failed, %v", c)
+       }
+
+       r := &VersionRuleCacheItem{Rev: 1}
+       FindInstancesCache.Set("d", "c", p, r)
+       c = FindInstancesCache.Get("d", "c", p)
+       if c == nil {
+               t.Fatalf("TestVersionRuleCache_Get failed, %v", c)
+       }
+       if c.Rev != 1 {
+               t.Fatalf("TestVersionRuleCache_Get failed, rev %d != 1", c.Rev)
+       }
+       c = FindInstancesCache.Get("d", "c2", p)
+       if c != nil {
+               t.Fatalf("TestVersionRuleCache_Get failed, %v", c)
+       }
+
+       p2 := &pb.MicroServiceKey{ServiceName: "p2"}
+       FindInstancesCache.Set("d", "c2", p, r)
+       FindInstancesCache.Set("d", "c2", p2, r)
+       FindInstancesCache.Delete("d", "c", p)
+       c = FindInstancesCache.Get("d", "c2", p)
+       if c != nil {
+               t.Fatalf("TestVersionRuleCache_Get failed, %v", c)
+       }
+       c = FindInstancesCache.Get("d", "c2", p2)
+       if c == nil {
+               t.Fatalf("TestVersionRuleCache_Get failed, %v", c)
+       }
+       if c.Rev != 1 {
+               t.Fatalf("TestVersionRuleCache_Get failed, rev %d != 1", c.Rev)
+       }
+}
diff --git a/server/service/util/instance_util.go 
b/server/service/util/instance_util.go
index 6f3b3b3f..bf91107c 100644
--- a/server/service/util/instance_util.go
+++ b/server/service/util/instance_util.go
@@ -66,11 +66,12 @@ func GetInstance(ctx context.Context, domainProject string, 
serviceId string, in
        return instance, nil
 }
 
-func GetAllInstancesOfServices(ctx context.Context, domainProject string, ids 
[]string) (instances []*pb.MicroServiceInstance, err error) {
+func GetAllInstancesOfServices(ctx context.Context, domainProject string, ids 
[]string) (
+       instances []*pb.MicroServiceInstance, rev int64, err error) {
        cloneCtx := util.CloneContext(ctx)
        noCache, cacheOnly := ctx.Value(CTX_NOCACHE) == "1", 
ctx.Value(CTX_CACHEONLY) == "1"
 
-       rev, _ := cloneCtx.Value(CTX_REQUEST_REVISION).(int64)
+       rev, _ = cloneCtx.Value(CTX_REQUEST_REVISION).(int64)
        if !noCache && !cacheOnly && rev > 0 {
                // force to find in cache at first time when rev > 0
                util.SetContext(cloneCtx, CTX_CACHEONLY, "1")
@@ -86,7 +87,7 @@ func GetAllInstancesOfServices(ctx context.Context, 
domainProject string, ids []
                        opts := append(FromContext(cloneCtx), 
registry.WithStrKey(key), registry.WithPrefix())
                        resp, err := 
backend.Store().Instance().Search(cloneCtx, opts...)
                        if err != nil {
-                               return nil, err
+                               return nil, 0, err
                        }
 
                        if len(resp.Kvs) > 0 {
@@ -122,13 +123,13 @@ func GetAllInstancesOfServices(ctx context.Context, 
domainProject string, ids []
                instance := &pb.MicroServiceInstance{}
                err := json.Unmarshal(kv.Value, instance)
                if err != nil {
-                       return nil, fmt.Errorf("unmarshal %s faild, %s",
+                       return nil, 0, fmt.Errorf("unmarshal %s faild, %s",
                                util.BytesToStringWithNoCopy(kv.Key), 
err.Error())
                }
                instances = append(instances, instance)
        }
 
-       util.SetContext(ctx, CTX_RESPONSE_REVISION, max)
+       rev = max
        return
 }
 
diff --git a/server/service/util/instance_util_test.go 
b/server/service/util/instance_util_test.go
index 635be356..b99d810d 100644
--- a/server/service/util/instance_util_test.go
+++ b/server/service/util/instance_util_test.go
@@ -116,19 +116,19 @@ func TestGetInstanceCountOfOneService(t *testing.T) {
 }
 
 func TestGetInstanceCountOfServices(t *testing.T) {
-       _, err := 
GetAllInstancesOfServices(util.SetContext(context.Background(), CTX_CACHEONLY, 
"1"), "", []string{"1"})
+       _, _, err := 
GetAllInstancesOfServices(util.SetContext(context.Background(), CTX_CACHEONLY, 
"1"), "", []string{"1"})
        if err != nil {
                t.Fatalf(`GetAllInstancesOfServices CTX_CACHEONLY failed`)
        }
-       _, err = 
GetAllInstancesOfServices(util.SetContext(context.Background(), CTX_NOCACHE, 
"1"), "", []string{"1"})
+       _, _, err = 
GetAllInstancesOfServices(util.SetContext(context.Background(), CTX_NOCACHE, 
"1"), "", []string{"1"})
        if err == nil {
                t.Fatalf(`GetAllInstancesOfServices CTX_NOCACHE failed`)
        }
-       _, err = 
GetAllInstancesOfServices(util.SetContext(context.Background(), 
CTX_REQUEST_REVISION, 1), "", []string{"1"})
+       _, _, err = 
GetAllInstancesOfServices(util.SetContext(context.Background(), 
CTX_REQUEST_REVISION, 1), "", []string{"1"})
        if err == nil {
                t.Fatalf(`GetAllInstancesOfServices CTX_REQUEST_REVISION 
failed`)
        }
-       _, err = GetAllInstancesOfServices(context.Background(), "", 
[]string{"1"})
+       _, _, err = GetAllInstancesOfServices(context.Background(), "", 
[]string{"1"})
        if err == nil {
                t.Fatalf(`GetAllInstancesOfServices failed`)
        }
diff --git a/server/service/watch.go b/server/service/watch.go
index e0b57804..1fcefa03 100644
--- a/server/service/watch.go
+++ b/server/service/watch.go
@@ -45,7 +45,7 @@ func (s *InstanceService) Watch(in *pb.WatchInstanceRequest, 
stream pb.ServiceIn
                return err
        }
        domainProject := util.ParseDomainProject(stream.Context())
-       watcher := nf.NewInstanceListWatcher(in.SelfServiceId, 
apt.GetInstanceRootKey(domainProject)+"/", nil)
+       watcher := nf.NewListWatcher(in.SelfServiceId, 
apt.GetInstanceRootKey(domainProject)+"/", nil)
        err = nf.GetNotifyService().AddSubscriber(watcher)
        util.Logger().Infof("start watch instance status, watcher %s %s", 
watcher.Subject(), watcher.Id())
        return nf.HandleWatchJob(watcher, stream, 
nf.GetNotifyService().Config.NotifyTimeout)


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Reply via email to