[ 
https://issues.apache.org/jira/browse/SCB-707?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16530915#comment-16530915
 ] 

ASF GitHub Bot commented on SCB-707:
------------------------------------

little-cui closed pull request #385: SCB-707 Etcd connection leak, lose events and cache mismatch in SC
URL: https://github.com/apache/incubator-servicecomb-service-center/pull/385
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/server/core/backend/cache_kv.go b/server/core/backend/cache_kv.go
index 1c2c1bcc..cedb9dc6 100644
--- a/server/core/backend/cache_kv.go
+++ b/server/core/backend/cache_kv.go
@@ -457,6 +457,8 @@ func (c *KvCacher) onKvEvents(evts []KvEvent) {
        if c.Cfg.OnEvent == nil {
                return
        }
+       defer util.RecoverAndReport()
+
        for _, evt := range evts {
                if evt.Object == nil {
                        continue
diff --git a/server/plugin/infra/registry/etcd/etcd.go b/server/plugin/infra/registry/etcd/etcd.go
index e2bcec80..81b8636a 100644
--- a/server/plugin/infra/registry/etcd/etcd.go
+++ b/server/plugin/infra/registry/etcd/etcd.go
@@ -252,7 +252,6 @@ func (c *EtcdClient) PutNoOverride(ctx context.Context, opts ...registry.PluginO
                util.Logger().Errorf(err, "PutNoOverride %s failed", op.Key)
                return false, err
        }
-       util.Logger().Debugf("response %s %v %v", op.Key, resp.Succeeded, resp.Revision)
        return resp.Succeeded, nil
 }
 
@@ -441,7 +440,14 @@ func (c *EtcdClient) TxnWithCmp(ctx context.Context, success []registry.PluginOp
        etcdSuccessOps := c.toTxnRequest(success)
        etcdFailOps := c.toTxnRequest(fail)
 
-       span := TracingBegin(ctx, "etcd:txn", success[0])
+       var traceOps []registry.PluginOp
+       traceOps = append(traceOps, success...)
+       traceOps = append(traceOps, fail...)
+       if len(traceOps) == 0 {
+               return nil, fmt.Errorf("requested success or fail PluginOp list")
+       }
+
+       span := TracingBegin(ctx, "etcd:txn", traceOps[0])
        defer TracingEnd(span, err)
 
        kvc := clientv3.NewKV(c.Client)
@@ -648,32 +654,31 @@ func (c *EtcdClient) SyncMembers() error {
 func dispatch(evts []*clientv3.Event, cb registry.WatchCallback) error {
        l := len(evts)
        kvs := make([]*mvccpb.KeyValue, l)
-       sIdx, eIdx, prevAction := 0, 0, mvccpb.PUT
-       pResp := &registry.PluginResponse{Action: registry.Put, Succeeded: true}
+       sIdx, eIdx, rev := 0, 0, int64(0)
+       action, prevEvtType := registry.Put, mvccpb.PUT
 
        for _, evt := range evts {
-               if prevAction != evt.Type {
-                       prevAction = evt.Type
-
+               if prevEvtType != evt.Type {
                        if eIdx > 0 {
-                               err := setResponseAndCallback(pResp, kvs[sIdx:eIdx], cb)
+                               err := callback(action, rev, kvs[sIdx:eIdx], cb)
                                if err != nil {
                                        return err
                                }
                                sIdx = eIdx
                        }
+                       prevEvtType = evt.Type
                }
 
-               if pResp.Revision < evt.Kv.ModRevision {
-                       pResp.Revision = evt.Kv.ModRevision
+               if rev < evt.Kv.ModRevision {
+                       rev = evt.Kv.ModRevision
                }
-               pResp.Action = setKvsAndConvertAction(kvs, eIdx, evt)
+               action = setKvsAndConvertAction(kvs, eIdx, evt)
 
                eIdx++
        }
 
        if eIdx > 0 {
-               return setResponseAndCallback(pResp, kvs[sIdx:eIdx], cb)
+               return callback(action, rev, kvs[sIdx:eIdx], cb)
        }
        return nil
 }
@@ -693,10 +698,14 @@ func setKvsAndConvertAction(kvs []*mvccpb.KeyValue, pIdx int, evt *clientv3.Even
        }
 }
 
-func setResponseAndCallback(pResp *registry.PluginResponse, kvs []*mvccpb.KeyValue, cb registry.WatchCallback) error {
-       pResp.Count = int64(len(kvs))
-       pResp.Kvs = kvs
-       return cb("key information changed", pResp)
+func callback(action registry.ActionType, rev int64, kvs []*mvccpb.KeyValue, cb registry.WatchCallback) error {
+       return cb("key information changed", &registry.PluginResponse{
+               Action:    action,
+               Kvs:       kvs,
+               Count:     int64(len(kvs)),
+               Revision:  rev,
+               Succeeded: true,
+       })
 }
 
 func sslEnabled() bool {
@@ -761,6 +770,12 @@ func newClient(endpoints []string) (*clientv3.Client, error) {
                return client, nil
        }
 
+       defer func() {
+               if err != nil {
+                       client.Close()
+               }
+       }()
+
        ctx, _ := context.WithTimeout(client.Ctx(), healthCheckTimeout)
        resp, err := client.MemberList(ctx)
        if err != nil {
@@ -782,7 +797,8 @@ epLoop:
                        }
                }
                // maybe endpoints = [domain A, domain B] or there are more than one cluster
-               return nil, fmt.Errorf("the etcd cluster endpoint list%v does not contain %s", cluster, ep)
+               err = fmt.Errorf("the etcd cluster endpoint list%v does not contain %s", cluster, ep)
+               return nil, err
        }
        return client, nil
 }
diff --git a/server/service/util/dependency.go b/server/service/util/dependency.go
index 3229500f..4a5ed1b3 100644
--- a/server/service/util/dependency.go
+++ b/server/service/util/dependency.go
@@ -168,7 +168,7 @@ func DependencyRuleExist(ctx context.Context, provider *pb.MicroServiceKey, cons
                targetDomainProject = consumer.Tenant
        }
 
-       consumerKey := apt.GenerateConsumerDependencyRuleKey(targetDomainProject, consumer)
+       consumerKey := apt.GenerateConsumerDependencyRuleKey(consumer.Tenant, consumer)
        existed, err := dependencyRuleExistUtil(ctx, consumerKey, provider)
        if err != nil || existed {
                return existed, err
@@ -216,12 +216,16 @@ func AddServiceVersionRule(ctx context.Context, domainProject string, consumer *
 
        id := util.StringJoin([]string{provider.AppId, provider.ServiceName}, "_")
        key := apt.GenerateConsumerDependencyQueueKey(domainProject, consumer.ServiceId, id)
-       _, err = backend.Registry().Do(ctx, registry.PUT, registry.WithStrKey(key), registry.WithValue(data))
+       resp, err := backend.Registry().TxnWithCmp(ctx,
+               nil,
+               []registry.CompareOp{registry.OpCmp(registry.CmpStrVal(key), registry.CMP_EQUAL, util.BytesToStringWithNoCopy(data))},
+               []registry.PluginOp{registry.OpPut(registry.WithStrKey(key), registry.WithValue(data))})
        if err != nil {
                return err
        }
-
-       util.Logger().Infof("find request into dependency queue successfully, %s: %v", key, r)
+       if !resp.Succeeded {
+               util.Logger().Infof("find request into dependency queue successfully, %s: %v", key, r)
+       }
        return nil
 }
 
diff --git a/server/service/util/microservice_util.go b/server/service/util/microservice_util.go
index 375b1835..6f5d0394 100644
--- a/server/service/util/microservice_util.go
+++ b/server/service/util/microservice_util.go
@@ -141,18 +141,24 @@ func searchServiceIdFromAlias(ctx context.Context, key *pb.MicroServiceKey) (str
 }
 
 func GetServiceAllVersions(ctx context.Context, key *pb.MicroServiceKey, alias bool) (*registry.PluginResponse, error) {
-       key.Version = ""
-       var prefix string
+       copy := *key
+       copy.Version = ""
+       var (
+               prefix  string
+               indexer *backend.Indexer
+       )
        if alias {
-               prefix = apt.GenerateServiceAliasKey(key)
+               prefix = apt.GenerateServiceAliasKey(&copy)
+               indexer = backend.Store().ServiceAlias()
        } else {
-               prefix = apt.GenerateServiceIndexKey(key)
+               prefix = apt.GenerateServiceIndexKey(&copy)
+               indexer = backend.Store().ServiceIndex()
        }
        opts := append(FromContext(ctx),
                registry.WithStrKey(prefix),
                registry.WithPrefix(),
                registry.WithDescendOrder())
-       resp, err := backend.Store().ServiceIndex().Search(ctx, opts...)
+       resp, err := indexer.Search(ctx, opts...)
        return resp, err
 }
 
@@ -161,8 +167,9 @@ func FindServiceIds(ctx context.Context, versionRule string, key *pb.MicroServic
        ids := []string{}
        match := ParseVersionRule(versionRule)
        if match == nil {
-               key.Version = versionRule
-               serviceId, err := GetServiceId(ctx, key)
+               copy := *key
+               copy.Version = versionRule
+               serviceId, err := GetServiceId(ctx, &copy)
                if err != nil {
                        return nil, err
                }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> Etcd connection leak, lose events and cache mismatch in SC
> ----------------------------------------------------------
>
>                 Key: SCB-707
>                 URL: https://issues.apache.org/jira/browse/SCB-707
>             Project: Apache ServiceComb
>          Issue Type: Bug
>          Components: Service-Center
>            Reporter: little-cui
>            Assignee: little-cui
>            Priority: Major
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to