This is an automated email from the ASF dual-hosted git repository.

asifdxtreme pushed a commit to branch master
in repository 
https://gitbox.apache.org/repos/asf/incubator-servicecomb-service-center.git


The following commit(s) were added to refs/heads/master by this push:
     new 67e0f0b  SCB-680 Optimize find instance api (#376)
67e0f0b is described below

commit 67e0f0bd5b71e5d8034dc3ca84a13f88ba8692df
Author: little-cui <sure_0...@qq.com>
AuthorDate: Mon Jun 25 12:44:12 2018 +0800

    SCB-680 Optimize find instance api (#376)
    
    * SCB-680 Optimize find instance api
---
 server/service/dependency.go                       |  2 -
 server/service/event/instance_event_handler.go     | 19 +++++-
 server/service/event/rule_event_handler.go         |  2 +-
 server/service/event/tag_event_handler.go          |  2 +-
 server/service/instance.go                         | 26 +++++++-
 server/service/{util => notification}/common.go    | 41 +++++++++++--
 server/service/notification/listwatcher.go         | 25 +++++---
 .../{util/common.go => notification/notice.go}     | 32 +++++++---
 server/service/notification/stream.go              | 54 ++++++++++++++++
 .../notification/{struct.go => subscriber.go}      | 58 ------------------
 .../notification/{watch_util.go => websocket.go}   | 57 +----------------
 server/service/util/common.go                      |  4 ++
 server/service/util/find_cache.go                  | 71 ++++++++++++++++++++++
 server/service/util/find_cache_test.go             | 60 ++++++++++++++++++
 server/service/util/instance_util.go               | 11 ++--
 server/service/util/instance_util_test.go          |  8 +--
 server/service/watch.go                            |  2 +-
 17 files changed, 322 insertions(+), 152 deletions(-)

diff --git a/server/service/dependency.go b/server/service/dependency.go
index eac1aa2..4c13dd0 100644
--- a/server/service/dependency.go
+++ b/server/service/dependency.go
@@ -140,7 +140,6 @@ func (s *MicroServiceService) GetProviderDependencies(ctx 
context.Context, in *p
                        Response: pb.CreateResponse(scerr.ErrInternal, 
err.Error()),
                }, err
        }
-       util.Logger().Debugf("GetProviderDependencies successfully, providerId 
is %s.", in.ServiceId)
        return &pb.GetProDependenciesResponse{
                Response:  pb.CreateResponse(pb.Response_SUCCESS, "Get all 
consumers successful."),
                Consumers: services,
@@ -181,7 +180,6 @@ func (s *MicroServiceService) GetConsumerDependencies(ctx 
context.Context, in *p
                }, err
        }
 
-       util.Logger().Debugf("GetConsumerDependencies successfully, consumerId 
is %s.", consumerId)
        return &pb.GetConDependenciesResponse{
                Response:  pb.CreateResponse(pb.Response_SUCCESS, "Get all 
providers successfully."),
                Providers: services,
diff --git a/server/service/event/instance_event_handler.go 
b/server/service/event/instance_event_handler.go
index 8144649..a80376d 100644
--- a/server/service/event/instance_event_handler.go
+++ b/server/service/event/instance_event_handler.go
@@ -93,7 +93,7 @@ func (h *InstanceEventHandler) OnEvent(evt backend.KvEvent) {
                return
        }
 
-       nf.PublishInstanceEvent(domainProject, action, &pb.MicroServiceKey{
+       PublishInstanceEvent(domainProject, action, &pb.MicroServiceKey{
                Environment: ms.Environment,
                AppId:       ms.AppId,
                ServiceName: ms.ServiceName,
@@ -104,3 +104,20 @@ func (h *InstanceEventHandler) OnEvent(evt 
backend.KvEvent) {
 func NewInstanceEventHandler() *InstanceEventHandler {
        return &InstanceEventHandler{}
 }
+
+func PublishInstanceEvent(domainProject string, action pb.EventType, 
serviceKey *pb.MicroServiceKey, instance *pb.MicroServiceInstance, rev int64, 
subscribers []string) {
+       response := &pb.WatchInstanceResponse{
+               Response: pb.CreateResponse(pb.Response_SUCCESS, "Watch 
instance successfully."),
+               Action:   string(action),
+               Key:      serviceKey,
+               Instance: instance,
+       }
+       for _, consumerId := range subscribers {
+               // expires cache
+               serviceUtil.FindInstancesCache.Delete(domainProject, 
consumerId, serviceKey)
+
+               // TODO add超时怎么处理?
+               job := nf.NewWatchJob(consumerId, 
apt.GetInstanceRootKey(domainProject)+"/", rev, response)
+               nf.GetNotifyService().AddJob(job)
+       }
+}
diff --git a/server/service/event/rule_event_handler.go 
b/server/service/event/rule_event_handler.go
index 0999dd0..474811e 100644
--- a/server/service/event/rule_event_handler.go
+++ b/server/service/event/rule_event_handler.go
@@ -69,7 +69,7 @@ func (apt *RulesChangedTask) publish(ctx context.Context, 
domainProject, provide
        }
        providerKey := pb.MicroServiceToKey(domainProject, provider)
 
-       nf.PublishInstanceEvent(domainProject, pb.EVT_EXPIRE, providerKey, nil, 
rev, consumerIds)
+       PublishInstanceEvent(domainProject, pb.EVT_EXPIRE, providerKey, nil, 
rev, consumerIds)
        return nil
 }
 
diff --git a/server/service/event/tag_event_handler.go 
b/server/service/event/tag_event_handler.go
index f87f037..a83d2c5 100644
--- a/server/service/event/tag_event_handler.go
+++ b/server/service/event/tag_event_handler.go
@@ -73,7 +73,7 @@ func (apt *TagsChangedTask) publish(ctx context.Context, 
domainProject, consumer
                        util.Logger().Warnf(err, "get service %s file failed", 
providerId)
                        continue
                }
-               nf.PublishInstanceEvent(domainProject, pb.EVT_EXPIRE,
+               PublishInstanceEvent(domainProject, pb.EVT_EXPIRE,
                        &pb.MicroServiceKey{
                                Environment: provider.Environment,
                                AppId:       provider.AppId,
diff --git a/server/service/instance.go b/server/service/instance.go
index bca599a..1618886 100644
--- a/server/service/instance.go
+++ b/server/service/instance.go
@@ -524,6 +524,7 @@ func (s *InstanceService) Find(ctx context.Context, in 
*pb.FindInstancesRequest)
                AppId:       in.AppId,
                ServiceName: in.ServiceName,
                Alias:       in.ServiceName,
+               Version:     in.VersionRule,
        }
        if apt.IsShared(provider) {
                // it means the shared micro-services must be the same env with 
SC.
@@ -536,6 +537,23 @@ func (s *InstanceService) Find(ctx context.Context, in 
*pb.FindInstancesRequest)
                provider.Tenant = util.ParseTargetDomainProject(ctx)
        }
 
+       // cache
+       if item := serviceUtil.FindInstancesCache.Get(provider.Tenant, 
in.ConsumerServiceId, provider); item != nil {
+               noCache, cacheOnly := ctx.Value(serviceUtil.CTX_NOCACHE) == 
"1", ctx.Value(serviceUtil.CTX_CACHEONLY) == "1"
+               rev, _ := ctx.Value(serviceUtil.CTX_REQUEST_REVISION).(int64)
+               if !noCache && (cacheOnly || rev <= item.Rev) {
+                       instances := item.Instances
+                       if rev == item.Rev {
+                               instances = instances[:0]
+                       }
+                       util.SetContext(ctx, serviceUtil.CTX_RESPONSE_REVISION, 
item.Rev)
+                       return &pb.FindInstancesResponse{
+                               Response:  
pb.CreateResponse(pb.Response_SUCCESS, "Query service instances successfully."),
+                               Instances: instances,
+                       }, nil
+               }
+       }
+
        // 版本规则
        ids, err := serviceUtil.FindServiceIds(ctx, in.VersionRule, provider)
        if err != nil {
@@ -583,13 +601,19 @@ func (s *InstanceService) Find(ctx context.Context, in 
*pb.FindInstancesRequest)
                }
        }
 
-       instances, err := serviceUtil.GetAllInstancesOfServices(ctx, 
util.ParseTargetDomainProject(ctx), ids)
+       instances, rev, err := serviceUtil.GetAllInstancesOfServices(ctx, 
util.ParseTargetDomainProject(ctx), ids)
        if err != nil {
                util.Logger().Errorf(err, "find instance failed, %s: 
GetAllInstancesOfServices failed.", findFlag)
                return &pb.FindInstancesResponse{
                        Response: pb.CreateResponse(scerr.ErrInternal, 
err.Error()),
                }, err
        }
+
+       serviceUtil.FindInstancesCache.Set(provider.Tenant, 
in.ConsumerServiceId, provider, &serviceUtil.VersionRuleCacheItem{
+               Instances: instances,
+               Rev:       rev,
+       })
+       util.SetContext(ctx, serviceUtil.CTX_RESPONSE_REVISION, rev)
        return &pb.FindInstancesResponse{
                Response:  pb.CreateResponse(pb.Response_SUCCESS, "Query 
service instances successfully."),
                Instances: instances,
diff --git a/server/service/util/common.go 
b/server/service/notification/common.go
similarity index 52%
copy from server/service/util/common.go
copy to server/service/notification/common.go
index 6c25e51..7e2e16a 100644
--- a/server/service/util/common.go
+++ b/server/service/notification/common.go
@@ -14,12 +14,41 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package util
+package notification
+
+import (
+       "fmt"
+       "strconv"
+       "time"
+)
 
 const (
-       HEADER_REV            = "X-Resource-Revision"
-       CTX_NOCACHE           = "noCache"
-       CTX_CACHEONLY         = "cacheOnly"
-       CTX_REQUEST_REVISION  = "requestRev"
-       CTX_RESPONSE_REVISION = "responseRev"
+       DEFAULT_MAX_QUEUE          = 1000
+       DEFAULT_INIT_SUBSCRIBERS   = 1000
+       DEFAULT_ON_MESSAGE_TIMEOUT = 100 * time.Millisecond
+       DEFAULT_TIMEOUT            = 30 * time.Second
+
+       NOTIFTY NotifyType = iota
+       INSTANCE
+       typeEnd
 )
+
+type NotifyType int
+
+func (nt NotifyType) String() string {
+       if int(nt) < len(notifyTypeNames) {
+               return notifyTypeNames[nt]
+       }
+       return "NotifyType" + strconv.Itoa(int(nt))
+}
+
+type NotifyServiceConfig struct {
+       AddTimeout    time.Duration
+       NotifyTimeout time.Duration
+       MaxQueue      int64
+}
+
+func (nsc NotifyServiceConfig) String() string {
+       return fmt.Sprintf("{acceptQueue: %d, accept: %s, notify: %s}",
+               nsc.MaxQueue, nsc.AddTimeout, nsc.NotifyTimeout)
+}
diff --git a/server/service/notification/listwatcher.go 
b/server/service/notification/listwatcher.go
index 5d911a2..a6942c9 100644
--- a/server/service/notification/listwatcher.go
+++ b/server/service/notification/listwatcher.go
@@ -32,7 +32,7 @@ type WatchJob struct {
 
 type ListWatcher struct {
        BaseSubscriber
-       Job          chan NotifyJob
+       Job          chan *WatchJob
        ListRevision int64
        ListFunc     func() (results []*pb.WatchInstanceResponse, rev int64)
 
@@ -56,7 +56,7 @@ func (w *ListWatcher) listAndPublishJobs(_ context.Context) {
        results, rev := w.ListFunc()
        w.ListRevision = rev
        for _, response := range results {
-               w.sendMessage(NewWatchJob(w.Type(), w.Id(), w.Subject(), 
w.ListRevision, response))
+               w.sendMessage(NewWatchJob(w.Id(), w.Subject(), w.ListRevision, 
response))
        }
 }
 
@@ -66,6 +66,11 @@ func (w *ListWatcher) OnMessage(job NotifyJob) {
                return
        }
 
+       wJob, ok := job.(*WatchJob)
+       if !ok {
+               return
+       }
+
        timer := time.NewTimer(DEFAULT_ON_MESSAGE_TIMEOUT)
        select {
        case <-w.listCh:
@@ -77,16 +82,16 @@ func (w *ListWatcher) OnMessage(job NotifyJob) {
                return
        }
 
-       if job.(*WatchJob).Revision <= w.ListRevision {
+       if wJob.Revision <= w.ListRevision {
                util.Logger().Warnf(nil,
                        "unexpected notify %s job is coming in, watcher %s %s, 
job is %v, current revision is %v",
                        w.Type(), w.Id(), w.Subject(), job, w.ListRevision)
                return
        }
-       w.sendMessage(job)
+       w.sendMessage(wJob)
 }
 
-func (w *ListWatcher) sendMessage(job NotifyJob) {
+func (w *ListWatcher) sendMessage(job *WatchJob) {
        util.Logger().Debugf("start to notify %s watcher %s %s, job is %v, 
current revision is %v", w.Type(),
                w.Id(), w.Subject(), job, w.ListRevision)
        defer util.RecoverAndReport()
@@ -105,27 +110,27 @@ func (w *ListWatcher) Close() {
        close(w.Job)
 }
 
-func NewWatchJob(nType NotifyType, subscriberId, subject string, rev int64, 
response *pb.WatchInstanceResponse) *WatchJob {
+func NewWatchJob(subscriberId, subject string, rev int64, response 
*pb.WatchInstanceResponse) *WatchJob {
        return &WatchJob{
                BaseNotifyJob: BaseNotifyJob{
                        subscriberId: subscriberId,
                        subject:      subject,
-                       nType:        nType,
+                       nType:        INSTANCE,
                },
                Revision: rev,
                Response: response,
        }
 }
 
-func NewListWatcher(nType NotifyType, id string, subject string,
+func NewListWatcher(id string, subject string,
        listFunc func() (results []*pb.WatchInstanceResponse, rev int64)) 
*ListWatcher {
        watcher := &ListWatcher{
                BaseSubscriber: BaseSubscriber{
                        id:      id,
                        subject: subject,
-                       nType:   nType,
+                       nType:   INSTANCE,
                },
-               Job:      make(chan NotifyJob, DEFAULT_MAX_QUEUE),
+               Job:      make(chan *WatchJob, DEFAULT_MAX_QUEUE),
                ListFunc: listFunc,
                listCh:   make(chan struct{}),
        }
diff --git a/server/service/util/common.go 
b/server/service/notification/notice.go
similarity index 66%
copy from server/service/util/common.go
copy to server/service/notification/notice.go
index 6c25e51..498d435 100644
--- a/server/service/util/common.go
+++ b/server/service/notification/notice.go
@@ -14,12 +14,28 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package util
+package notification
 
-const (
-       HEADER_REV            = "X-Resource-Revision"
-       CTX_NOCACHE           = "noCache"
-       CTX_CACHEONLY         = "cacheOnly"
-       CTX_REQUEST_REVISION  = "requestRev"
-       CTX_RESPONSE_REVISION = "responseRev"
-)
+type NotifyJob interface {
+       SubscriberId() string
+       Subject() string
+       Type() NotifyType
+}
+
+type BaseNotifyJob struct {
+       subscriberId string
+       subject      string
+       nType        NotifyType
+}
+
+func (s *BaseNotifyJob) SubscriberId() string {
+       return s.subscriberId
+}
+
+func (s *BaseNotifyJob) Subject() string {
+       return s.subject
+}
+
+func (s *BaseNotifyJob) Type() NotifyType {
+       return s.nType
+}
diff --git a/server/service/notification/stream.go 
b/server/service/notification/stream.go
new file mode 100644
index 0000000..101473d
--- /dev/null
+++ b/server/service/notification/stream.go
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package notification
+
+import (
+       "errors"
+       "github.com/apache/incubator-servicecomb-service-center/pkg/util"
+       pb 
"github.com/apache/incubator-servicecomb-service-center/server/core/proto"
+       "time"
+)
+
+func HandleWatchJob(watcher *ListWatcher, stream 
pb.ServiceInstanceCtrl_WatchServer, timeout time.Duration) (err error) {
+       for {
+               timer := time.NewTimer(timeout)
+               select {
+               case <-timer.C:
+                       // TODO grpc 长连接心跳?
+               case job := <-watcher.Job:
+                       timer.Stop()
+
+                       if job == nil {
+                               err = errors.New("channel is closed")
+                               util.Logger().Errorf(err, "watcher %s %s caught 
an exception",
+                                       watcher.Subject(), watcher.Id())
+                               return
+                       }
+                       resp := job.Response
+                       util.Logger().Infof("event is coming in, watcher %s %s",
+                               watcher.Subject(), watcher.Id())
+
+                       err = stream.Send(resp)
+                       if err != nil {
+                               util.Logger().Errorf(err, "send message error, 
watcher %s %s",
+                                       watcher.Subject(), watcher.Id())
+                               watcher.SetError(err)
+                               return
+                       }
+               }
+       }
+}
diff --git a/server/service/notification/struct.go 
b/server/service/notification/subscriber.go
similarity index 64%
rename from server/service/notification/struct.go
rename to server/service/notification/subscriber.go
index f6564ff..9f0b4df 100644
--- a/server/service/notification/struct.go
+++ b/server/service/notification/subscriber.go
@@ -18,42 +18,8 @@ package notification
 
 import (
        "errors"
-       "fmt"
-       "strconv"
-       "time"
 )
 
-const (
-       DEFAULT_MAX_QUEUE          = 1000
-       DEFAULT_INIT_SUBSCRIBERS   = 1000
-       DEFAULT_ON_MESSAGE_TIMEOUT = 100 * time.Millisecond
-       DEFAULT_TIMEOUT            = 30 * time.Second
-
-       NOTIFTY NotifyType = iota
-       INSTANCE
-       typeEnd
-)
-
-type NotifyType int
-
-func (nt NotifyType) String() string {
-       if int(nt) < len(notifyTypeNames) {
-               return notifyTypeNames[nt]
-       }
-       return "NotifyType" + strconv.Itoa(int(nt))
-}
-
-type NotifyServiceConfig struct {
-       AddTimeout    time.Duration
-       NotifyTimeout time.Duration
-       MaxQueue      int64
-}
-
-func (nsc NotifyServiceConfig) String() string {
-       return fmt.Sprintf("{acceptQueue: %d, accept: %s, notify: %s}",
-               nsc.MaxQueue, nsc.AddTimeout, nsc.NotifyTimeout)
-}
-
 type Subscriber interface {
        Err() error
        SetError(err error)
@@ -68,12 +34,6 @@ type Subscriber interface {
        Close()
 }
 
-type NotifyJob interface {
-       SubscriberId() string
-       Subject() string
-       Type() NotifyType
-}
-
 type BaseSubscriber struct {
        id      string
        subject string
@@ -122,21 +82,3 @@ func (s *BaseSubscriber) OnMessage(job NotifyJob) {
 func (s *BaseSubscriber) Close() {
 
 }
-
-type BaseNotifyJob struct {
-       subscriberId string
-       subject      string
-       nType        NotifyType
-}
-
-func (s *BaseNotifyJob) SubscriberId() string {
-       return s.subscriberId
-}
-
-func (s *BaseNotifyJob) Subject() string {
-       return s.subject
-}
-
-func (s *BaseNotifyJob) Type() NotifyType {
-       return s.nType
-}
diff --git a/server/service/notification/watch_util.go 
b/server/service/notification/websocket.go
similarity index 82%
rename from server/service/notification/watch_util.go
rename to server/service/notification/websocket.go
index 3a13500..f53e641 100644
--- a/server/service/notification/watch_util.go
+++ b/server/service/notification/websocket.go
@@ -18,7 +18,6 @@ package notification
 
 import (
        "encoding/json"
-       "errors"
        "fmt"
        "github.com/apache/incubator-servicecomb-service-center/pkg/util"
        apt "github.com/apache/incubator-servicecomb-service-center/server/core"
@@ -29,36 +28,6 @@ import (
        "time"
 )
 
-func HandleWatchJob(watcher *ListWatcher, stream 
pb.ServiceInstanceCtrl_WatchServer, timeout time.Duration) (err error) {
-       for {
-               timer := time.NewTimer(timeout)
-               select {
-               case <-timer.C:
-               // TODO grpc 长连接心跳?
-               case job := <-watcher.Job:
-                       timer.Stop()
-
-                       if job == nil {
-                               err = errors.New("channel is closed")
-                               util.Logger().Errorf(err, "watcher %s %s caught 
an exception",
-                                       watcher.Subject(), watcher.Id())
-                               return
-                       }
-                       resp := job.(*WatchJob).Response
-                       util.Logger().Infof("event is coming in, watcher %s %s",
-                               watcher.Subject(), watcher.Id())
-
-                       err = stream.Send(resp)
-                       if err != nil {
-                               util.Logger().Errorf(err, "send message error, 
watcher %s %s",
-                                       watcher.Subject(), watcher.Id())
-                               watcher.SetError(err)
-                               return
-                       }
-               }
-       }
-}
-
 type WebSocketHandler struct {
        ctx             context.Context
        conn            *websocket.Conn
@@ -71,7 +40,7 @@ type WebSocketHandler struct {
 func (wh *WebSocketHandler) Init() error {
        remoteAddr := wh.conn.RemoteAddr().String()
        if err := GetNotifyService().AddSubscriber(wh.watcher); err != nil {
-               err = fmt.Errorf("establish[%s] websocket watch failed: notify 
service error, %s.",
+               err = fmt.Errorf("establish[%s] websocket watch failed: notify 
service error, %s",
                        remoteAddr, err.Error())
                util.Logger().Errorf(nil, err.Error())
 
@@ -207,7 +176,7 @@ func (wh *WebSocketHandler) HandleWatchWebSocketJob() {
                                return
                        }
 
-                       resp := job.(*WatchJob).Response
+                       resp := job.Response
 
                        providerFlag := fmt.Sprintf("%s/%s/%s", resp.Key.AppId, 
resp.Key.ServiceName, resp.Key.Version)
                        if resp.Action != string(pb.EVT_EXPIRE) {
@@ -261,7 +230,7 @@ func DoWebSocketListAndWatch(ctx context.Context, serviceId 
string, f func() ([]
        handler := &WebSocketHandler{
                ctx:             ctx,
                conn:            conn,
-               watcher:         NewInstanceListWatcher(serviceId, 
apt.GetInstanceRootKey(domainProject)+"/", f),
+               watcher:         NewListWatcher(serviceId, 
apt.GetInstanceRootKey(domainProject)+"/", f),
                needPingWatcher: true,
                closed:          make(chan struct{}),
                goroutine:       util.NewGo(context.Background()),
@@ -283,23 +252,3 @@ func EstablishWebSocketError(conn *websocket.Conn, err 
error) {
                util.Logger().Errorf(err, "establish[%s] websocket watch 
failed: write message failed.", remoteAddr)
        }
 }
-
-func PublishInstanceEvent(domainProject string, action pb.EventType, 
serviceKey *pb.MicroServiceKey, instance *pb.MicroServiceInstance, rev int64, 
subscribers []string) {
-       response := &pb.WatchInstanceResponse{
-               Response: pb.CreateResponse(pb.Response_SUCCESS, "Watch 
instance successfully."),
-               Action:   string(action),
-               Key:      serviceKey,
-               Instance: instance,
-       }
-       for _, consumerId := range subscribers {
-               job := NewWatchJob(INSTANCE, consumerId, 
apt.GetInstanceRootKey(domainProject)+"/", rev, response)
-               util.Logger().Debugf("publish event to notify service, %v", job)
-
-               // TODO add超时怎么处理?
-               GetNotifyService().AddJob(job)
-       }
-}
-
-func NewInstanceListWatcher(selfServiceId, instanceRoot string, listFunc 
func() (results []*pb.WatchInstanceResponse, rev int64)) *ListWatcher {
-       return NewListWatcher(INSTANCE, selfServiceId, instanceRoot, listFunc)
-}
diff --git a/server/service/util/common.go b/server/service/util/common.go
index 6c25e51..9e31b3f 100644
--- a/server/service/util/common.go
+++ b/server/service/util/common.go
@@ -16,10 +16,14 @@
  */
 package util
 
+import "time"
+
 const (
        HEADER_REV            = "X-Resource-Revision"
        CTX_NOCACHE           = "noCache"
        CTX_CACHEONLY         = "cacheOnly"
        CTX_REQUEST_REVISION  = "requestRev"
        CTX_RESPONSE_REVISION = "responseRev"
+
+       cacheTTL = 5 * time.Minute
 )
diff --git a/server/service/util/find_cache.go 
b/server/service/util/find_cache.go
new file mode 100644
index 0000000..d5c2ab7
--- /dev/null
+++ b/server/service/util/find_cache.go
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package util
+
+import (
+       "errors"
+       "github.com/apache/incubator-servicecomb-service-center/pkg/util"
+       pb 
"github.com/apache/incubator-servicecomb-service-center/server/core/proto"
+       "github.com/karlseguin/ccache"
+)
+
+var FindInstancesCache = &VersionRuleCache{
+       c: ccache.Layered(ccache.Configure()),
+}
+
+type VersionRuleCacheItem struct {
+       Instances []*pb.MicroServiceInstance
+       Rev       int64
+}
+
+type VersionRuleCache struct {
+       c *ccache.LayeredCache
+}
+
+func (c *VersionRuleCache) primaryKey(domainProject string, provider 
*pb.MicroServiceKey) string {
+       return util.StringJoin([]string{
+               domainProject,
+               provider.Environment,
+               provider.AppId,
+               provider.ServiceName}, "/")
+}
+
+func (c *VersionRuleCache) Get(domainProject, consumer string, provider 
*pb.MicroServiceKey) *VersionRuleCacheItem {
+       item, _ := c.c.Fetch(c.primaryKey(domainProject, provider), 
provider.Version, cacheTTL, func() (interface{}, error) {
+               return nil, errors.New("not exist")
+       })
+       if item == nil || item.Expired() {
+               return nil
+       }
+
+       if v, ok := item.Value().(*util.ConcurrentMap).Get(consumer); ok {
+               return v.(*VersionRuleCacheItem)
+       }
+       return nil
+}
+
+func (c *VersionRuleCache) Set(domainProject, consumer string, provider 
*pb.MicroServiceKey, item *VersionRuleCacheItem) {
+       c2, _ := c.c.Fetch(c.primaryKey(domainProject, provider), 
provider.Version, cacheTTL, func() (interface{}, error) {
+               // new one if not exist
+               return util.NewConcurrentMap(1), nil
+       })
+       c2.Value().(*util.ConcurrentMap).Put(consumer, item)
+}
+
+func (c *VersionRuleCache) Delete(domainProject, consumer string, provider 
*pb.MicroServiceKey) {
+       c.c.DeleteAll(c.primaryKey(domainProject, provider))
+}
diff --git a/server/service/util/find_cache_test.go 
b/server/service/util/find_cache_test.go
new file mode 100644
index 0000000..9275101
--- /dev/null
+++ b/server/service/util/find_cache_test.go
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package util
+
+import (
+       pb 
"github.com/apache/incubator-servicecomb-service-center/server/core/proto"
+       "testing"
+)
+
+func TestVersionRuleCache_Get(t *testing.T) {
+       p := &pb.MicroServiceKey{}
+       c := FindInstancesCache.Get("d", "c", p)
+       if c != nil {
+               t.Fatalf("TestVersionRuleCache_Get failed, %v", c)
+       }
+
+       r := &VersionRuleCacheItem{Rev: 1}
+       FindInstancesCache.Set("d", "c", p, r)
+       c = FindInstancesCache.Get("d", "c", p)
+       if c == nil {
+               t.Fatalf("TestVersionRuleCache_Get failed, %v", c)
+       }
+       if c.Rev != 1 {
+               t.Fatalf("TestVersionRuleCache_Get failed, rev %d != 1", c.Rev)
+       }
+       c = FindInstancesCache.Get("d", "c2", p)
+       if c != nil {
+               t.Fatalf("TestVersionRuleCache_Get failed, %v", c)
+       }
+
+       p2 := &pb.MicroServiceKey{ServiceName: "p2"}
+       FindInstancesCache.Set("d", "c2", p, r)
+       FindInstancesCache.Set("d", "c2", p2, r)
+       FindInstancesCache.Delete("d", "c", p)
+       c = FindInstancesCache.Get("d", "c2", p)
+       if c != nil {
+               t.Fatalf("TestVersionRuleCache_Get failed, %v", c)
+       }
+       c = FindInstancesCache.Get("d", "c2", p2)
+       if c == nil {
+               t.Fatalf("TestVersionRuleCache_Get failed, %v", c)
+       }
+       if c.Rev != 1 {
+               t.Fatalf("TestVersionRuleCache_Get failed, rev %d != 1", c.Rev)
+       }
+}
diff --git a/server/service/util/instance_util.go 
b/server/service/util/instance_util.go
index 6f3b3b3..bf91107 100644
--- a/server/service/util/instance_util.go
+++ b/server/service/util/instance_util.go
@@ -66,11 +66,12 @@ func GetInstance(ctx context.Context, domainProject string, 
serviceId string, in
        return instance, nil
 }
 
-func GetAllInstancesOfServices(ctx context.Context, domainProject string, ids 
[]string) (instances []*pb.MicroServiceInstance, err error) {
+func GetAllInstancesOfServices(ctx context.Context, domainProject string, ids 
[]string) (
+       instances []*pb.MicroServiceInstance, rev int64, err error) {
        cloneCtx := util.CloneContext(ctx)
        noCache, cacheOnly := ctx.Value(CTX_NOCACHE) == "1", 
ctx.Value(CTX_CACHEONLY) == "1"
 
-       rev, _ := cloneCtx.Value(CTX_REQUEST_REVISION).(int64)
+       rev, _ = cloneCtx.Value(CTX_REQUEST_REVISION).(int64)
        if !noCache && !cacheOnly && rev > 0 {
                // force to find in cache at first time when rev > 0
                util.SetContext(cloneCtx, CTX_CACHEONLY, "1")
@@ -86,7 +87,7 @@ func GetAllInstancesOfServices(ctx context.Context, 
domainProject string, ids []
                        opts := append(FromContext(cloneCtx), 
registry.WithStrKey(key), registry.WithPrefix())
                        resp, err := 
backend.Store().Instance().Search(cloneCtx, opts...)
                        if err != nil {
-                               return nil, err
+                               return nil, 0, err
                        }
 
                        if len(resp.Kvs) > 0 {
@@ -122,13 +123,13 @@ func GetAllInstancesOfServices(ctx context.Context, 
domainProject string, ids []
                instance := &pb.MicroServiceInstance{}
                err := json.Unmarshal(kv.Value, instance)
                if err != nil {
-                       return nil, fmt.Errorf("unmarshal %s faild, %s",
+                       return nil, 0, fmt.Errorf("unmarshal %s faild, %s",
                                util.BytesToStringWithNoCopy(kv.Key), 
err.Error())
                }
                instances = append(instances, instance)
        }
 
-       util.SetContext(ctx, CTX_RESPONSE_REVISION, max)
+       rev = max
        return
 }
 
diff --git a/server/service/util/instance_util_test.go 
b/server/service/util/instance_util_test.go
index 635be35..b99d810 100644
--- a/server/service/util/instance_util_test.go
+++ b/server/service/util/instance_util_test.go
@@ -116,19 +116,19 @@ func TestGetInstanceCountOfOneService(t *testing.T) {
 }
 
 func TestGetInstanceCountOfServices(t *testing.T) {
-       _, err := 
GetAllInstancesOfServices(util.SetContext(context.Background(), CTX_CACHEONLY, 
"1"), "", []string{"1"})
+       _, _, err := 
GetAllInstancesOfServices(util.SetContext(context.Background(), CTX_CACHEONLY, 
"1"), "", []string{"1"})
        if err != nil {
                t.Fatalf(`GetAllInstancesOfServices CTX_CACHEONLY failed`)
        }
-       _, err = 
GetAllInstancesOfServices(util.SetContext(context.Background(), CTX_NOCACHE, 
"1"), "", []string{"1"})
+       _, _, err = 
GetAllInstancesOfServices(util.SetContext(context.Background(), CTX_NOCACHE, 
"1"), "", []string{"1"})
        if err == nil {
                t.Fatalf(`GetAllInstancesOfServices CTX_NOCACHE failed`)
        }
-       _, err = 
GetAllInstancesOfServices(util.SetContext(context.Background(), 
CTX_REQUEST_REVISION, 1), "", []string{"1"})
+       _, _, err = 
GetAllInstancesOfServices(util.SetContext(context.Background(), 
CTX_REQUEST_REVISION, 1), "", []string{"1"})
        if err == nil {
                t.Fatalf(`GetAllInstancesOfServices CTX_REQUEST_REVISION 
failed`)
        }
-       _, err = GetAllInstancesOfServices(context.Background(), "", 
[]string{"1"})
+       _, _, err = GetAllInstancesOfServices(context.Background(), "", 
[]string{"1"})
        if err == nil {
                t.Fatalf(`GetAllInstancesOfServices failed`)
        }
diff --git a/server/service/watch.go b/server/service/watch.go
index e0b5780..1fcefa0 100644
--- a/server/service/watch.go
+++ b/server/service/watch.go
@@ -45,7 +45,7 @@ func (s *InstanceService) Watch(in *pb.WatchInstanceRequest, 
stream pb.ServiceIn
                return err
        }
        domainProject := util.ParseDomainProject(stream.Context())
-       watcher := nf.NewInstanceListWatcher(in.SelfServiceId, 
apt.GetInstanceRootKey(domainProject)+"/", nil)
+       watcher := nf.NewListWatcher(in.SelfServiceId, 
apt.GetInstanceRootKey(domainProject)+"/", nil)
        err = nf.GetNotifyService().AddSubscriber(watcher)
        util.Logger().Infof("start watch instance status, watcher %s %s", 
watcher.Subject(), watcher.Id())
        return nf.HandleWatchJob(watcher, stream, 
nf.GetNotifyService().Config.NotifyTimeout)

Reply via email to