[
https://issues.apache.org/jira/browse/SCB-707?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16530915#comment-16530915
]
ASF GitHub Bot commented on SCB-707:
------------------------------------
little-cui closed pull request #385: SCB-707 Etcd connection leak, lose events
and cache mismatch in SC
URL: https://github.com/apache/incubator-servicecomb-service-center/pull/385
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git a/server/core/backend/cache_kv.go b/server/core/backend/cache_kv.go
index 1c2c1bcc..cedb9dc6 100644
--- a/server/core/backend/cache_kv.go
+++ b/server/core/backend/cache_kv.go
@@ -457,6 +457,8 @@ func (c *KvCacher) onKvEvents(evts []KvEvent) {
if c.Cfg.OnEvent == nil {
return
}
+ defer util.RecoverAndReport()
+
for _, evt := range evts {
if evt.Object == nil {
continue
diff --git a/server/plugin/infra/registry/etcd/etcd.go
b/server/plugin/infra/registry/etcd/etcd.go
index e2bcec80..81b8636a 100644
--- a/server/plugin/infra/registry/etcd/etcd.go
+++ b/server/plugin/infra/registry/etcd/etcd.go
@@ -252,7 +252,6 @@ func (c *EtcdClient) PutNoOverride(ctx context.Context,
opts ...registry.PluginO
util.Logger().Errorf(err, "PutNoOverride %s failed", op.Key)
return false, err
}
- util.Logger().Debugf("response %s %v %v", op.Key, resp.Succeeded,
resp.Revision)
return resp.Succeeded, nil
}
@@ -441,7 +440,14 @@ func (c *EtcdClient) TxnWithCmp(ctx context.Context,
success []registry.PluginOp
etcdSuccessOps := c.toTxnRequest(success)
etcdFailOps := c.toTxnRequest(fail)
- span := TracingBegin(ctx, "etcd:txn", success[0])
+ var traceOps []registry.PluginOp
+ traceOps = append(traceOps, success...)
+ traceOps = append(traceOps, fail...)
+ if len(traceOps) == 0 {
+ return nil, fmt.Errorf("requested success or fail PluginOp
list")
+ }
+
+ span := TracingBegin(ctx, "etcd:txn", traceOps[0])
defer TracingEnd(span, err)
kvc := clientv3.NewKV(c.Client)
@@ -648,32 +654,31 @@ func (c *EtcdClient) SyncMembers() error {
func dispatch(evts []*clientv3.Event, cb registry.WatchCallback) error {
l := len(evts)
kvs := make([]*mvccpb.KeyValue, l)
- sIdx, eIdx, prevAction := 0, 0, mvccpb.PUT
- pResp := &registry.PluginResponse{Action: registry.Put, Succeeded: true}
+ sIdx, eIdx, rev := 0, 0, int64(0)
+ action, prevEvtType := registry.Put, mvccpb.PUT
for _, evt := range evts {
- if prevAction != evt.Type {
- prevAction = evt.Type
-
+ if prevEvtType != evt.Type {
if eIdx > 0 {
- err := setResponseAndCallback(pResp,
kvs[sIdx:eIdx], cb)
+ err := callback(action, rev, kvs[sIdx:eIdx], cb)
if err != nil {
return err
}
sIdx = eIdx
}
+ prevEvtType = evt.Type
}
- if pResp.Revision < evt.Kv.ModRevision {
- pResp.Revision = evt.Kv.ModRevision
+ if rev < evt.Kv.ModRevision {
+ rev = evt.Kv.ModRevision
}
- pResp.Action = setKvsAndConvertAction(kvs, eIdx, evt)
+ action = setKvsAndConvertAction(kvs, eIdx, evt)
eIdx++
}
if eIdx > 0 {
- return setResponseAndCallback(pResp, kvs[sIdx:eIdx], cb)
+ return callback(action, rev, kvs[sIdx:eIdx], cb)
}
return nil
}
@@ -693,10 +698,14 @@ func setKvsAndConvertAction(kvs []*mvccpb.KeyValue, pIdx
int, evt *clientv3.Even
}
}
-func setResponseAndCallback(pResp *registry.PluginResponse, kvs
[]*mvccpb.KeyValue, cb registry.WatchCallback) error {
- pResp.Count = int64(len(kvs))
- pResp.Kvs = kvs
- return cb("key information changed", pResp)
+func callback(action registry.ActionType, rev int64, kvs []*mvccpb.KeyValue,
cb registry.WatchCallback) error {
+ return cb("key information changed", &registry.PluginResponse{
+ Action: action,
+ Kvs: kvs,
+ Count: int64(len(kvs)),
+ Revision: rev,
+ Succeeded: true,
+ })
}
func sslEnabled() bool {
@@ -761,6 +770,12 @@ func newClient(endpoints []string) (*clientv3.Client,
error) {
return client, nil
}
+ defer func() {
+ if err != nil {
+ client.Close()
+ }
+ }()
+
ctx, _ := context.WithTimeout(client.Ctx(), healthCheckTimeout)
resp, err := client.MemberList(ctx)
if err != nil {
@@ -782,7 +797,8 @@ epLoop:
}
}
// maybe endpoints = [domain A, domain B] or there are more
than one cluster
- return nil, fmt.Errorf("the etcd cluster endpoint list%v does
not contain %s", cluster, ep)
+ err = fmt.Errorf("the etcd cluster endpoint list%v does not
contain %s", cluster, ep)
+ return nil, err
}
return client, nil
}
diff --git a/server/service/util/dependency.go
b/server/service/util/dependency.go
index 3229500f..4a5ed1b3 100644
--- a/server/service/util/dependency.go
+++ b/server/service/util/dependency.go
@@ -168,7 +168,7 @@ func DependencyRuleExist(ctx context.Context, provider
*pb.MicroServiceKey, cons
targetDomainProject = consumer.Tenant
}
- consumerKey :=
apt.GenerateConsumerDependencyRuleKey(targetDomainProject, consumer)
+ consumerKey := apt.GenerateConsumerDependencyRuleKey(consumer.Tenant,
consumer)
existed, err := dependencyRuleExistUtil(ctx, consumerKey, provider)
if err != nil || existed {
return existed, err
@@ -216,12 +216,16 @@ func AddServiceVersionRule(ctx context.Context,
domainProject string, consumer *
id := util.StringJoin([]string{provider.AppId, provider.ServiceName},
"_")
key := apt.GenerateConsumerDependencyQueueKey(domainProject,
consumer.ServiceId, id)
- _, err = backend.Registry().Do(ctx, registry.PUT,
registry.WithStrKey(key), registry.WithValue(data))
+ resp, err := backend.Registry().TxnWithCmp(ctx,
+ nil,
+ []registry.CompareOp{registry.OpCmp(registry.CmpStrVal(key),
registry.CMP_EQUAL, util.BytesToStringWithNoCopy(data))},
+ []registry.PluginOp{registry.OpPut(registry.WithStrKey(key),
registry.WithValue(data))})
if err != nil {
return err
}
-
- util.Logger().Infof("find request into dependency queue successfully,
%s: %v", key, r)
+ if !resp.Succeeded {
+ util.Logger().Infof("find request into dependency queue
successfully, %s: %v", key, r)
+ }
return nil
}
diff --git a/server/service/util/microservice_util.go
b/server/service/util/microservice_util.go
index 375b1835..6f5d0394 100644
--- a/server/service/util/microservice_util.go
+++ b/server/service/util/microservice_util.go
@@ -141,18 +141,24 @@ func searchServiceIdFromAlias(ctx context.Context, key
*pb.MicroServiceKey) (str
}
func GetServiceAllVersions(ctx context.Context, key *pb.MicroServiceKey, alias
bool) (*registry.PluginResponse, error) {
- key.Version = ""
- var prefix string
+ copy := *key
+ copy.Version = ""
+ var (
+ prefix string
+ indexer *backend.Indexer
+ )
if alias {
- prefix = apt.GenerateServiceAliasKey(key)
+ prefix = apt.GenerateServiceAliasKey(&copy)
+ indexer = backend.Store().ServiceAlias()
} else {
- prefix = apt.GenerateServiceIndexKey(key)
+ prefix = apt.GenerateServiceIndexKey(&copy)
+ indexer = backend.Store().ServiceIndex()
}
opts := append(FromContext(ctx),
registry.WithStrKey(prefix),
registry.WithPrefix(),
registry.WithDescendOrder())
- resp, err := backend.Store().ServiceIndex().Search(ctx, opts...)
+ resp, err := indexer.Search(ctx, opts...)
return resp, err
}
@@ -161,8 +167,9 @@ func FindServiceIds(ctx context.Context, versionRule
string, key *pb.MicroServic
ids := []string{}
match := ParseVersionRule(versionRule)
if match == nil {
- key.Version = versionRule
- serviceId, err := GetServiceId(ctx, key)
+ copy := *key
+ copy.Version = versionRule
+ serviceId, err := GetServiceId(ctx, &copy)
if err != nil {
return nil, err
}
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
> Etcd connection leak, lose events and cache mismatch in SC
> ----------------------------------------------------------
>
> Key: SCB-707
> URL: https://issues.apache.org/jira/browse/SCB-707
> Project: Apache ServiceComb
> Issue Type: Bug
> Components: Service-Center
> Reporter: little-cui
> Assignee: little-cui
> Priority: Major
>
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)