This is an automated email from the ASF dual-hosted git repository.

littlecui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/servicecomb-service-center.git


The following commit(s) were added to refs/heads/master by this push:
     new 5782435  SCB-1059 Optimize quota plugin (#529)
5782435 is described below

commit 5782435c1dd45a712d02abfaba5fdb2ec4100b97
Author: little-cui <[email protected]>
AuthorDate: Tue Jan 29 09:48:27 2019 +0800

    SCB-1059 Optimize quota plugin (#529)
---
 server/plugin/pkg/quota/buildin/buildin.go      |   2 +
 server/plugin/pkg/quota/buildin/common.go       |   6 +-
 server/plugin/pkg/quota/buildin/counter.go      |  57 ++++++
 server/plugin/pkg/quota/buildin/counter_test.go |  50 +++++
 server/plugin/pkg/quota/counter/counter.go      |  49 +++++
 server/plugin/pkg/quota/counter/event.go        | 105 ++++++++++
 server/plugin/pkg/quota/counter/event_test.go   | 260 ++++++++++++++++++++++++
 server/service/event/instance_event_handler.go  |  16 +-
 server/service/util/instance_util.go            |  11 +-
 9 files changed, 544 insertions(+), 12 deletions(-)

diff --git a/server/plugin/pkg/quota/buildin/buildin.go b/server/plugin/pkg/quota/buildin/buildin.go
index 06ef630..6d5f147 100644
--- a/server/plugin/pkg/quota/buildin/buildin.go
+++ b/server/plugin/pkg/quota/buildin/buildin.go
@@ -20,11 +20,13 @@ import (
        "github.com/apache/servicecomb-service-center/pkg/log"
        mgr "github.com/apache/servicecomb-service-center/server/plugin"
        "github.com/apache/servicecomb-service-center/server/plugin/pkg/quota"
+       
"github.com/apache/servicecomb-service-center/server/plugin/pkg/quota/counter"
        "golang.org/x/net/context"
 )
 
 func init() {
        mgr.RegisterPlugin(mgr.Plugin{mgr.QUOTA, "buildin", New})
+       counter.RegisterCounterListener("buildin")
 }
 
 func New() mgr.PluginInstance {
diff --git a/server/plugin/pkg/quota/buildin/common.go b/server/plugin/pkg/quota/buildin/common.go
index eba7eb5..6e47136 100644
--- a/server/plugin/pkg/quota/buildin/common.go
+++ b/server/plugin/pkg/quota/buildin/common.go
@@ -85,11 +85,9 @@ func resourceLimitHandler(ctx context.Context, res *quota.ApplyQuotaResource) (i
 
        switch res.QuotaType {
        case quota.MicroServiceInstanceQuotaType:
-               key = core.GetInstanceRootKey("")
-               indexer = backend.Store().Instance()
+               return globalCounter.InstanceCount, nil
        case quota.MicroServiceQuotaType:
-               key = core.GetServiceRootKey("")
-               indexer = backend.Store().Service()
+               return globalCounter.ServiceCount, nil
        case quota.RuleQuotaType:
                key = core.GenerateServiceRuleKey(domainProject, serviceId, "")
                indexer = backend.Store().Rule()
diff --git a/server/plugin/pkg/quota/buildin/counter.go b/server/plugin/pkg/quota/buildin/counter.go
new file mode 100644
index 0000000..86eb066
--- /dev/null
+++ b/server/plugin/pkg/quota/buildin/counter.go
@@ -0,0 +1,57 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package buildin
+
+import (
+       "github.com/apache/servicecomb-service-center/server/core/backend"
+       
"github.com/apache/servicecomb-service-center/server/plugin/pkg/discovery"
+       
"github.com/apache/servicecomb-service-center/server/plugin/pkg/quota/counter"
+)
+
+var globalCounter = &GlobalCounter{}
+
+func init() {
+       counter.RegisterCounter(globalCounter)
+}
+
+type GlobalCounter struct {
+       ServiceCount  int64
+       InstanceCount int64
+}
+
+func (c *GlobalCounter) OnCreate(t discovery.Type, domainProject string) {
+       switch t {
+       case backend.SERVICE_INDEX:
+               c.ServiceCount++
+       case backend.INSTANCE:
+               c.InstanceCount++
+       }
+}
+
+func (c *GlobalCounter) OnDelete(t discovery.Type, domainProject string) {
+       switch t {
+       case backend.SERVICE_INDEX:
+               if c.ServiceCount == 0 {
+                       return
+               }
+               c.ServiceCount--
+       case backend.INSTANCE:
+               if c.InstanceCount == 0 {
+                       return
+               }
+               c.InstanceCount--
+       }
+}
diff --git a/server/plugin/pkg/quota/buildin/counter_test.go b/server/plugin/pkg/quota/buildin/counter_test.go
new file mode 100644
index 0000000..a8591cf
--- /dev/null
+++ b/server/plugin/pkg/quota/buildin/counter_test.go
@@ -0,0 +1,50 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package buildin
+
+import (
+       "github.com/apache/servicecomb-service-center/server/core/backend"
+       "testing"
+)
+
+func TestGlobalCounter_OnCreate(t *testing.T) {
+       var counter GlobalCounter
+       counter.OnCreate(backend.SERVICE, "a/b")
+       counter.OnCreate(backend.SERVICE_INDEX, "a/b")
+       counter.OnCreate(backend.INSTANCE, "a/b")
+       counter.OnCreate(backend.SERVICE_INDEX, "a/b")
+       counter.OnCreate(backend.INSTANCE, "a/b")
+       if counter.ServiceCount != 2 || counter.InstanceCount != 2 {
+               t.Fatal("TestGlobalCounter_OnCreate failed", counter)
+       }
+}
+
+func TestGlobalCounter_OnDelete(t *testing.T) {
+       var counter GlobalCounter
+       counter.OnDelete(backend.SERVICE, "a/b")
+       counter.OnDelete(backend.SERVICE_INDEX, "a/b")
+       counter.OnDelete(backend.INSTANCE, "a/b")
+       if counter.ServiceCount != 0 || counter.InstanceCount != 0 {
+               t.Fatal("TestGlobalCounter_OnDelete failed", counter)
+       }
+       counter.OnCreate(backend.SERVICE_INDEX, "a/b")
+       counter.OnCreate(backend.INSTANCE, "a/b")
+       counter.OnDelete(backend.SERVICE_INDEX, "a/b")
+       counter.OnDelete(backend.INSTANCE, "a/b")
+       if counter.ServiceCount != 0 || counter.InstanceCount != 0 {
+               t.Fatal("TestGlobalCounter_OnDelete failed", counter)
+       }
+}
diff --git a/server/plugin/pkg/quota/counter/counter.go b/server/plugin/pkg/quota/counter/counter.go
new file mode 100644
index 0000000..6aa7a63
--- /dev/null
+++ b/server/plugin/pkg/quota/counter/counter.go
@@ -0,0 +1,49 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package counter
+
+import (
+       
"github.com/apache/servicecomb-service-center/server/plugin/pkg/discovery"
+)
+
+var counters = Counters{}
+
+type Counter interface {
+       OnCreate(t discovery.Type, domainProject string)
+       OnDelete(t discovery.Type, domainProject string)
+}
+
+type Counters []Counter
+
+func (cs Counters) OnCreate(t discovery.Type, domainProject string) {
+       for _, c := range cs {
+               c.OnCreate(t, domainProject)
+       }
+}
+
+func (cs Counters) OnDelete(t discovery.Type, domainProject string) {
+       for _, c := range cs {
+               c.OnDelete(t, domainProject)
+       }
+}
+
+func RegisterCounter(c Counter) {
+       counters = append(counters, c)
+}
+
+func GetCounters() Counters {
+       return counters
+}
diff --git a/server/plugin/pkg/quota/counter/event.go b/server/plugin/pkg/quota/counter/event.go
new file mode 100644
index 0000000..3dcf38a
--- /dev/null
+++ b/server/plugin/pkg/quota/counter/event.go
@@ -0,0 +1,105 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package counter
+
+import (
+       "github.com/apache/servicecomb-service-center/pkg/log"
+       "github.com/apache/servicecomb-service-center/pkg/util"
+       "github.com/apache/servicecomb-service-center/server/core"
+       "github.com/apache/servicecomb-service-center/server/core/backend"
+       pb "github.com/apache/servicecomb-service-center/server/core/proto"
+       
"github.com/apache/servicecomb-service-center/server/plugin/pkg/discovery"
+       serviceUtil 
"github.com/apache/servicecomb-service-center/server/service/util"
+       "github.com/astaxie/beego"
+       "golang.org/x/net/context"
+)
+
+var (
+       SharedServiceIds util.ConcurrentMap
+)
+
+type ServiceIndexEventHandler struct {
+}
+
+func (h *ServiceIndexEventHandler) Type() discovery.Type {
+       return backend.SERVICE_INDEX
+}
+
+func (h *ServiceIndexEventHandler) OnEvent(evt discovery.KvEvent) {
+       key := core.GetInfoFromSvcIndexKV(evt.KV.Key)
+       if core.IsShared(key) {
+               
SharedServiceIds.Put(key.Tenant+core.SPLIT+evt.KV.Value.(string), struct{}{})
+               return
+       }
+
+       switch evt.Type {
+       case pb.EVT_INIT, pb.EVT_CREATE:
+               GetCounters().OnCreate(h.Type(), key.Tenant)
+       case pb.EVT_DELETE:
+               GetCounters().OnDelete(h.Type(), key.Tenant)
+       default:
+       }
+}
+
+func NewServiceIndexEventHandler() *ServiceIndexEventHandler {
+       return &ServiceIndexEventHandler{}
+}
+
+type InstanceEventHandler struct {
+       SharedServiceIds map[string]struct{}
+}
+
+func (h *InstanceEventHandler) Type() discovery.Type {
+       return backend.INSTANCE
+}
+
+func (h *InstanceEventHandler) OnEvent(evt discovery.KvEvent) {
+       serviceId, _, domainProject := core.GetInfoFromInstKV(evt.KV.Key)
+       key := domainProject + core.SPLIT + serviceId
+       if _, ok := SharedServiceIds.Get(key); ok {
+               return
+       }
+
+       switch evt.Type {
+       case pb.EVT_INIT, pb.EVT_CREATE:
+               if domainProject == core.REGISTRY_DOMAIN_PROJECT {
+                       service, err := 
serviceUtil.GetService(context.Background(), domainProject, serviceId)
+                       if service == nil || err != nil {
+                               log.Errorf(err, "GetService[%s] failed", key)
+                               return
+                       }
+                       if core.IsShared(pb.MicroServiceToKey(domainProject, 
service)) {
+                               SharedServiceIds.Put(key, struct{}{})
+                               return
+                       }
+               }
+               GetCounters().OnCreate(h.Type(), domainProject)
+       case pb.EVT_DELETE:
+               GetCounters().OnDelete(h.Type(), domainProject)
+       }
+}
+
+func NewInstanceEventHandler() *InstanceEventHandler {
+       return &InstanceEventHandler{SharedServiceIds: 
make(map[string]struct{})}
+}
+
+func RegisterCounterListener(pluginName string) {
+       if pluginName != beego.AppConfig.DefaultString("quota_plugin", 
"buildin") {
+               return
+       }
+       discovery.AddEventHandler(NewServiceIndexEventHandler())
+       discovery.AddEventHandler(NewInstanceEventHandler())
+}
diff --git a/server/plugin/pkg/quota/counter/event_test.go b/server/plugin/pkg/quota/counter/event_test.go
new file mode 100644
index 0000000..9254b8e
--- /dev/null
+++ b/server/plugin/pkg/quota/counter/event_test.go
@@ -0,0 +1,260 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package counter
+
+import (
+       "github.com/apache/servicecomb-service-center/server/core"
+       "github.com/apache/servicecomb-service-center/server/core/backend"
+       "github.com/apache/servicecomb-service-center/server/core/proto"
+       
"github.com/apache/servicecomb-service-center/server/plugin/pkg/discovery"
+       "testing"
+)
+
+type mockCounter struct {
+       ServiceCount  int64
+       InstanceCount int64
+}
+
+func (c *mockCounter) OnCreate(t discovery.Type, domainProject string) {
+       switch t {
+       case backend.SERVICE_INDEX:
+               c.ServiceCount++
+       case backend.INSTANCE:
+               c.InstanceCount++
+       default:
+               panic("error")
+       }
+}
+
+func (c *mockCounter) OnDelete(t discovery.Type, domainProject string) {
+       switch t {
+       case backend.SERVICE_INDEX:
+               c.ServiceCount--
+       case backend.INSTANCE:
+               c.InstanceCount--
+       default:
+               panic("error")
+       }
+}
+
+func TestNewServiceIndexEventHandler(t *testing.T) {
+
+       var counter = mockCounter{}
+       RegisterCounter(&counter)
+       h := NewServiceIndexEventHandler()
+
+       cases := []discovery.KvEvent{
+               {
+                       Type: proto.EVT_INIT,
+                       KV: &discovery.KeyValue{
+                               Key: 
[]byte(core.GenerateServiceIndexKey(&proto.MicroServiceKey{
+                                       Tenant:      
core.REGISTRY_DOMAIN_PROJECT,
+                                       Project:     "",
+                                       AppId:       core.REGISTRY_APP_ID,
+                                       ServiceName: core.REGISTRY_SERVICE_NAME,
+                                       Version:     "e",
+                                       Environment: "f",
+                                       Alias:       "g",
+                               })),
+                               Value: "1",
+                       },
+               },
+               {
+                       Type: proto.EVT_UPDATE,
+                       KV: &discovery.KeyValue{
+                               Key: 
[]byte(core.GenerateServiceIndexKey(&proto.MicroServiceKey{
+                                       Tenant:      
core.REGISTRY_DOMAIN_PROJECT,
+                                       Project:     "",
+                                       AppId:       core.REGISTRY_APP_ID,
+                                       ServiceName: core.REGISTRY_SERVICE_NAME,
+                                       Version:     "e",
+                                       Environment: "f",
+                                       Alias:       "g",
+                               })),
+                               Value: "1",
+                       },
+               },
+               {
+                       Type: proto.EVT_DELETE,
+                       KV: &discovery.KeyValue{
+                               Key: 
[]byte(core.GenerateServiceIndexKey(&proto.MicroServiceKey{
+                                       Tenant:      
core.REGISTRY_DOMAIN_PROJECT,
+                                       Project:     "",
+                                       AppId:       core.REGISTRY_APP_ID,
+                                       ServiceName: core.REGISTRY_SERVICE_NAME,
+                                       Version:     "e",
+                                       Environment: "f",
+                                       Alias:       "g",
+                               })),
+                               Value: "1",
+                       },
+               },
+               {
+                       Type: proto.EVT_CREATE,
+                       KV: &discovery.KeyValue{
+                               Key: 
[]byte(core.GenerateServiceIndexKey(&proto.MicroServiceKey{
+                                       Tenant:      
core.REGISTRY_DOMAIN_PROJECT,
+                                       Project:     "",
+                                       AppId:       core.REGISTRY_APP_ID,
+                                       ServiceName: core.REGISTRY_SERVICE_NAME,
+                                       Version:     "e",
+                                       Environment: "f",
+                                       Alias:       "g",
+                               })),
+                               Value: "1",
+                       },
+               },
+               {
+                       Type: proto.EVT_INIT,
+                       KV: &discovery.KeyValue{
+                               Key: 
[]byte(core.GenerateServiceIndexKey(&proto.MicroServiceKey{
+                                       Tenant:      "a/b",
+                                       Project:     "",
+                                       AppId:       "c",
+                                       ServiceName: "d",
+                                       Version:     "e",
+                                       Environment: "f",
+                                       Alias:       "g",
+                               })),
+                               Value: "1",
+                       },
+               },
+               {
+                       Type: proto.EVT_DELETE,
+                       KV: &discovery.KeyValue{
+                               Key: 
[]byte(core.GenerateServiceIndexKey(&proto.MicroServiceKey{
+                                       Tenant:      "a/b",
+                                       Project:     "",
+                                       AppId:       "c",
+                                       ServiceName: "d",
+                                       Version:     "e",
+                                       Environment: "f",
+                                       Alias:       "g",
+                               })),
+                               Value: "1",
+                       },
+               },
+               {
+                       Type: proto.EVT_UPDATE,
+                       KV: &discovery.KeyValue{
+                               Key: 
[]byte(core.GenerateServiceIndexKey(&proto.MicroServiceKey{
+                                       Tenant:      "a/b",
+                                       Project:     "",
+                                       AppId:       "c",
+                                       ServiceName: "d",
+                                       Version:     "e",
+                                       Environment: "f",
+                                       Alias:       "g",
+                               })),
+                               Value: "1",
+                       },
+               },
+               {
+                       Type: proto.EVT_CREATE,
+                       KV: &discovery.KeyValue{
+                               Key: 
[]byte(core.GenerateServiceIndexKey(&proto.MicroServiceKey{
+                                       Tenant:      "a/b",
+                                       Project:     "",
+                                       AppId:       "c",
+                                       ServiceName: "d",
+                                       Version:     "e",
+                                       Environment: "f",
+                                       Alias:       "g",
+                               })),
+                               Value: "1",
+                       },
+               },
+       }
+
+       for _, evt := range cases {
+               h.OnEvent(evt)
+       }
+       if counter.ServiceCount != 1 || counter.InstanceCount != 0 {
+               t.Fatal("TestNewServiceIndexEventHandler failed", counter)
+       }
+}
+
+func TestNewInstanceEventHandler(t *testing.T) {
+       var counter = mockCounter{}
+       RegisterCounter(&counter)
+       h := NewInstanceEventHandler()
+       SharedServiceIds.Put(core.REGISTRY_DOMAIN_PROJECT+core.SPLIT+"2", 
struct{}{})
+       cases := []discovery.KvEvent{
+               {
+                       Type: proto.EVT_INIT,
+                       KV: &discovery.KeyValue{
+                               Key:   
[]byte(core.GenerateInstanceKey(core.REGISTRY_DOMAIN_PROJECT, "2", "1")),
+                               Value: nil,
+                       },
+               },
+               {
+                       Type: proto.EVT_UPDATE,
+                       KV: &discovery.KeyValue{
+                               Key:   
[]byte(core.GenerateInstanceKey(core.REGISTRY_DOMAIN_PROJECT, "2", "1")),
+                               Value: nil,
+                       },
+               },
+               {
+                       Type: proto.EVT_CREATE,
+                       KV: &discovery.KeyValue{
+                               Key:   
[]byte(core.GenerateInstanceKey(core.REGISTRY_DOMAIN_PROJECT, "2", "1")),
+                               Value: nil,
+                       },
+               },
+               {
+                       Type: proto.EVT_DELETE,
+                       KV: &discovery.KeyValue{
+                               Key:   
[]byte(core.GenerateInstanceKey(core.REGISTRY_DOMAIN_PROJECT, "2", "1")),
+                               Value: nil,
+                       },
+               },
+               {
+                       Type: proto.EVT_INIT,
+                       KV: &discovery.KeyValue{
+                               Key:   []byte(core.GenerateInstanceKey("a/b", 
"1", "1")),
+                               Value: nil,
+                       },
+               },
+               {
+                       Type: proto.EVT_DELETE,
+                       KV: &discovery.KeyValue{
+                               Key:   []byte(core.GenerateInstanceKey("a/b", 
"1", "1")),
+                               Value: nil,
+                       },
+               },
+               {
+                       Type: proto.EVT_UPDATE,
+                       KV: &discovery.KeyValue{
+                               Key:   []byte(core.GenerateInstanceKey("a/b", 
"1", "1")),
+                               Value: nil,
+                       },
+               },
+               {
+                       Type: proto.EVT_CREATE,
+                       KV: &discovery.KeyValue{
+                               Key:   []byte(core.GenerateInstanceKey("a/b", 
"1", "1")),
+                               Value: nil,
+                       },
+               },
+       }
+
+       for _, evt := range cases {
+               h.OnEvent(evt)
+       }
+       if counter.InstanceCount != 1 || counter.ServiceCount != 0 {
+               t.Fatal("TestNewServiceIndexEventHandler failed", counter)
+       }
+}
diff --git a/server/service/event/instance_event_handler.go b/server/service/event/instance_event_handler.go
index 1ea5310..e82ec04 100644
--- a/server/service/event/instance_event_handler.go
+++ b/server/service/event/instance_event_handler.go
@@ -40,6 +40,7 @@ func (h *InstanceEventHandler) Type() discovery.Type {
 
 func (h *InstanceEventHandler) OnEvent(evt discovery.KvEvent) {
        action := evt.Type
+       instance := evt.KV.Value.(*pb.MicroServiceInstance)
        providerId, providerInstanceId, domainProject := 
apt.GetInfoFromInstKV(evt.KV.Key)
        idx := strings.Index(domainProject, "/")
        domainName := domainProject[:idx]
@@ -59,8 +60,8 @@ func (h *InstanceEventHandler) OnEvent(evt discovery.KvEvent) {
        }
 
        if notify.NotifyCenter().Closed() {
-               log.Warnf("caught [%s] instance[%s/%s] event, but notify 
service is closed",
-                       action, providerId, providerInstanceId)
+               log.Warnf("caught [%s] instance[%s/%s] event, endpoints %v, but 
notify service is closed",
+                       action, providerId, providerInstanceId, 
instance.Endpoints)
                return
        }
 
@@ -70,13 +71,14 @@ func (h *InstanceEventHandler) OnEvent(evt discovery.KvEvent) {
                serviceUtil.CTX_GLOBAL, "1")
        ms, err := serviceUtil.GetService(ctx, domainProject, providerId)
        if ms == nil {
-               log.Errorf(err, "caught [%s] instance[%s/%s] event, get cached 
provider's file failed",
-                       action, providerId, providerInstanceId)
+               log.Errorf(err, "caught [%s] instance[%s/%s] event, endpoints 
%v, get cached provider's file failed",
+                       action, providerId, providerInstanceId, 
instance.Endpoints)
                return
        }
 
-       log.Infof("caught [%s] service[%s][%s/%s/%s/%s] instance[%s] event",
-               action, providerId, ms.Environment, ms.AppId, ms.ServiceName, 
ms.Version, providerInstanceId)
+       log.Infof("caught [%s] service[%s][%s/%s/%s/%s] instance[%s] event, 
endpoints %v",
+               action, providerId, ms.Environment, ms.AppId, ms.ServiceName, 
ms.Version,
+               providerInstanceId, instance.Endpoints)
 
        // 查询所有consumer
        consumerIds, _, err := serviceUtil.GetAllConsumerIds(ctx, 
domainProject, ms)
@@ -87,7 +89,7 @@ func (h *InstanceEventHandler) OnEvent(evt discovery.KvEvent) {
        }
 
        PublishInstanceEvent(domainProject, action, 
pb.MicroServiceToKey(domainProject, ms),
-               evt.KV.Value.(*pb.MicroServiceInstance), evt.Revision, 
consumerIds)
+               instance, evt.Revision, consumerIds)
 }
 
 func NewInstanceEventHandler() *InstanceEventHandler {
diff --git a/server/service/util/instance_util.go b/server/service/util/instance_util.go
index 188caf1..51cd5fe 100644
--- a/server/service/util/instance_util.go
+++ b/server/service/util/instance_util.go
@@ -210,6 +210,14 @@ func queryServiceInstancesKvs(ctx context.Context, serviceId string, rev int64)
 }
 
 func UpdateInstance(ctx context.Context, domainProject string, instance 
*pb.MicroServiceInstance) *scerr.Error {
+       leaseID, err := GetLeaseId(ctx, domainProject, instance.ServiceId, 
instance.InstanceId)
+       if err != nil {
+               return scerr.NewError(scerr.ErrInternal, err.Error())
+       }
+       if leaseID == -1 {
+               return scerr.NewError(scerr.ErrInstanceNotExists, "Instance's 
leaseId not exist.")
+       }
+
        instance.ModTimestamp = strconv.FormatInt(time.Now().Unix(), 10)
        data, err := json.Marshal(instance)
        if err != nil {
@@ -217,11 +225,12 @@ func UpdateInstance(ctx context.Context, domainProject string, instance *pb.Micr
        }
 
        key := apt.GenerateInstanceKey(domainProject, instance.ServiceId, 
instance.InstanceId)
+
        resp, err := backend.Registry().TxnWithCmp(ctx,
                []registry.PluginOp{registry.OpPut(
                        registry.WithStrKey(key),
                        registry.WithValue(data),
-                       registry.WithIgnoreLease())},
+                       registry.WithLease(leaseID))},
                []registry.CompareOp{registry.OpCmp(
                        
registry.CmpVer(util.StringToBytesWithNoCopy(apt.GenerateServiceKey(domainProject,
 instance.ServiceId))),
                        registry.CMP_NOT_EQUAL, 0)},

Reply via email to