This is an automated email from the ASF dual-hosted git repository.
tianxiaoliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/servicecomb-service-center.git
The following commit(s) were added to refs/heads/master by this push:
new f1e6a57 [SCB-2094] support instance subscribe (#827)
f1e6a57 is described below
commit f1e6a575b73fc19b70e31d9829aa25d254f282fe
Author: panqian <[email protected]>
AuthorDate: Fri Jan 15 16:03:05 2021 +0800
[SCB-2094] support instance subscribe (#827)
---
datasource/mongo/dep_util.go | 58 ++++++++++++++++++
datasource/mongo/event/instance_event_handler.go | 33 ++++++++++-
datasource/mongo/ms.go | 4 +-
datasource/mongo/rule_util.go | 75 ++++++++++++++++++++++++
datasource/mongo/rule_util_test.go | 53 +++++++++++++++++
server/notify/instance_subscriber.go | 3 +-
6 files changed, 222 insertions(+), 4 deletions(-)
diff --git a/datasource/mongo/dep_util.go b/datasource/mongo/dep_util.go
new file mode 100644
index 0000000..8ae088f
--- /dev/null
+++ b/datasource/mongo/dep_util.go
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package mongo
+
+import (
+ "context"
+ "fmt"
+
+ "github.com/apache/servicecomb-service-center/pkg/log"
+ "github.com/apache/servicecomb-service-center/pkg/util"
+ pb "github.com/go-chassis/cari/discovery"
+)
+
+// GetAllConsumerIds loads the rules registered for provider and uses them to
+// split the provider's consumer IDs into an allowed and a denied list.
+//
+// It fails with "invalid provider" when provider is nil or has an empty
+// ServiceId, and otherwise forwards any error from getRulesUtil or
+// GetConsumerIDsWithFilter with both lists set to nil.
+func GetAllConsumerIds(ctx context.Context, provider *pb.MicroService) (allow []string, deny []string, _ error) {
+	if provider == nil || len(provider.ServiceId) == 0 {
+		return nil, nil, fmt.Errorf("invalid provider")
+	}
+
+	// TODO: when a service is deleted, the push for its last instance is inaccurate.
+	// NOTE(review): the first scope argument is the value of ParseDomainProject,
+	// not a bare domain, although a separate project is passed right after it.
+	// Confirm against getRulesUtil that this is the intended value.
+	rules, err := getRulesUtil(ctx, util.ParseDomainProject(ctx), util.ParseProject(ctx), provider.ServiceId)
+	if err != nil {
+		return nil, nil, err
+	}
+
+	allowed, denied, err := GetConsumerIDsWithFilter(ctx, provider, rules)
+	if err != nil {
+		return nil, nil, err
+	}
+	return allowed, denied, nil
+}
+
+// GetConsumerIDsWithFilter asks the provider's dependency relation for its
+// consumer IDs and hands them to FilterAll together with rules.
+//
+// If the consumer IDs cannot be fetched, the failure is logged and returned
+// with both lists set to nil.
+func GetConsumerIDsWithFilter(ctx context.Context, provider *pb.MicroService, rules []*Rule) (allow []string, deny []string, err error) {
+	relation := NewProviderDependencyRelation(ctx, util.ParseDomainProject(ctx), provider)
+	ids, err := relation.GetDependencyConsumerIds()
+	if err != nil {
+		msg := fmt.Sprintf("get service[%s]'s consumerIds failed", provider.ServiceId)
+		log.Error(msg, err)
+		return nil, nil, err
+	}
+	return FilterAll(ctx, ids, rules)
+}
diff --git a/datasource/mongo/event/instance_event_handler.go
b/datasource/mongo/event/instance_event_handler.go
index c870b8a..c11328c 100644
--- a/datasource/mongo/event/instance_event_handler.go
+++ b/datasource/mongo/event/instance_event_handler.go
@@ -21,6 +21,10 @@ import (
"context"
"errors"
"fmt"
+ "time"
+
+ simple "github.com/apache/servicecomb-service-center/pkg/time"
+ "github.com/apache/servicecomb-service-center/server/notify"
"github.com/apache/servicecomb-service-center/datasource"
"github.com/apache/servicecomb-service-center/datasource/mongo"
@@ -47,7 +51,7 @@ func (h InstanceEventHandler) OnEvent(evt sd.MongoEvent) {
instance := evt.Value.(sd.Instance)
providerID := instance.InstanceInfo.ServiceId
providerInstanceID := instance.InstanceInfo.InstanceId
-
+ domainProject := instance.Domain + "/" + instance.Project
cacheService := sd.Store().Service().Cache().Get(providerID)
var microService *discovery.MicroService
if cacheService != nil {
@@ -74,12 +78,39 @@ func (h InstanceEventHandler) OnEvent(evt sd.MongoEvent) {
if !syncernotify.GetSyncerNotifyCenter().Closed() {
NotifySyncerInstanceEvent(evt, microService)
}
+ ctx := util.SetDomainProject(context.Background(), instance.Domain,
instance.Project)
+ consumerIDS, _, err := mongo.GetAllConsumerIds(ctx, microService)
+ if err != nil {
+ log.Error(fmt.Sprintf("get service[%s][%s/%s/%s/%s]'s
consumerIDs failed",
+ providerID, microService.Environment,
microService.AppId, microService.ServiceName, microService.Version), err)
+ return
+ }
+ PublishInstanceEvent(evt, domainProject,
discovery.MicroServiceToKey(domainProject, microService), consumerIDS)
}
func NewInstanceEventHandler() *InstanceEventHandler {
return &InstanceEventHandler{}
}
+func PublishInstanceEvent(evt sd.MongoEvent, domainProject string, serviceKey
*discovery.MicroServiceKey, subscribers []string) {
+ if len(subscribers) == 0 {
+ return
+ }
+ response := &discovery.WatchInstanceResponse{
+ Response: discovery.CreateResponse(discovery.ResponseSuccess,
"Watch instance successfully."),
+ Action: string(evt.Type),
+ Key: serviceKey,
+ Instance: evt.Value.(sd.Instance).InstanceInfo,
+ }
+ for _, consumerID := range subscribers {
+ evt := notify.NewInstanceEventWithTime(consumerID,
domainProject, -1, simple.FromTime(time.Now()), response)
+ err := notify.Center().Publish(evt)
+ if err != nil {
+ log.Error(fmt.Sprintf("publish event[%v] into channel
failed", evt), err)
+ }
+ }
+}
+
func NotifySyncerInstanceEvent(event sd.MongoEvent, microService
*discovery.MicroService) {
instance := event.Value.(sd.Instance).InstanceInfo
log.Info(fmt.Sprintf("instanceId : %s and serviceId : %s in
NotifySyncerInstanceEvent", instance.InstanceId, instance.ServiceId))
diff --git a/datasource/mongo/ms.go b/datasource/mongo/ms.go
index 075c536..16eb4ab 100644
--- a/datasource/mongo/ms.go
+++ b/datasource/mongo/ms.go
@@ -2723,10 +2723,10 @@ func accessible(ctx context.Context, consumerID string,
providerID string) *disc
if err != nil {
return discovery.NewError(discovery.ErrInternal,
fmt.Sprintf("an error occurred in query consumer tags(%s)", err.Error()))
}
- return matchRules(rules, consumerService.ServiceInfo, validateTags)
+ return MatchRules(rules, consumerService.ServiceInfo, validateTags)
}
-func matchRules(rulesOfProvider []*Rule, consumer *discovery.MicroService,
tagsOfConsumer map[string]string) *discovery.Error {
+func MatchRules(rulesOfProvider []*Rule, consumer *discovery.MicroService,
tagsOfConsumer map[string]string) *discovery.Error {
if consumer == nil {
return discovery.NewError(discovery.ErrInvalidParams, "consumer
is nil")
}
diff --git a/datasource/mongo/rule_util.go b/datasource/mongo/rule_util.go
new file mode 100644
index 0000000..d7df917
--- /dev/null
+++ b/datasource/mongo/rule_util.go
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package mongo
+
+import (
+ "context"
+
+ "github.com/apache/servicecomb-service-center/pkg/util"
+ "github.com/go-chassis/cari/discovery"
+)
+
+// Filter reports whether the consumer identified by consumerID is accepted by
+// rules.
+//
+// The consumer is looked up first: when GetServiceByID yields no service the
+// result is false together with whatever error that lookup returned. With an
+// empty rule list every existing consumer is accepted. Otherwise the consumer
+// and its tags are checked with MatchRules; a permission-deny result is
+// reported as (false, nil), any other match error is returned as the error.
+func Filter(ctx context.Context, rules []*Rule, consumerID string) (bool, error) {
+	svc, err := GetServiceByID(ctx, consumerID)
+	if svc == nil {
+		return false, err
+	}
+	if len(rules) == 0 {
+		return true, nil
+	}
+
+	// NOTE(review): the first scope argument is the value of ParseDomainProject,
+	// not a bare domain, although a separate project is passed right after it.
+	// Confirm against getTags that this is the intended value.
+	tags, err := getTags(ctx, util.ParseDomainProject(ctx), util.ParseProject(ctx), consumerID)
+	if err != nil {
+		return false, err
+	}
+
+	rejection := MatchRules(rules, svc.ServiceInfo, tags)
+	switch {
+	case rejection == nil:
+		return true, nil
+	case rejection.Code == discovery.ErrPermissionDeny:
+		// A rule-based denial is an expected outcome, not a failure.
+		return false, nil
+	default:
+		return false, rejection
+	}
+}
+
+// FilterAll splits consumerIDs into the consumers accepted and the consumers
+// rejected by rules, checking each ID with Filter.
+//
+// When consumerIDs or rules is empty nothing is checked: consumerIDs itself is
+// returned as allow and deny is nil. Otherwise allow keeps the input order and
+// deny holds the rejected IDs in reverse input order. The first error returned
+// by Filter aborts the scan and is returned with both lists set to nil.
+func FilterAll(ctx context.Context, consumerIDs []string, rules []*Rule) (allow []string, deny []string, err error) {
+	total := len(consumerIDs)
+	if total == 0 || len(rules) == 0 {
+		return consumerIDs, nil, nil
+	}
+
+	// A single buffer is filled from both ends: accepted IDs grow from the
+	// front, rejected IDs grow from the back, and the two regions meet once
+	// every ID has been placed.
+	buf := make([]string, total)
+	allowEnd, denyStart := 0, total
+	for _, id := range consumerIDs {
+		ok, filterErr := Filter(ctx, rules, id)
+		if filterErr != nil {
+			return nil, nil, filterErr
+		}
+		if ok {
+			buf[allowEnd] = id
+			allowEnd++
+			continue
+		}
+		denyStart--
+		buf[denyStart] = id
+	}
+
+	// Cap allow at its own length so that a caller appending to it gets a new
+	// backing array instead of overwriting the first rejected ID, which is
+	// stored directly behind the accepted ones.
+	return buf[:allowEnd:allowEnd], buf[denyStart:], nil
+}
diff --git a/datasource/mongo/rule_util_test.go
b/datasource/mongo/rule_util_test.go
new file mode 100644
index 0000000..a35ad69
--- /dev/null
+++ b/datasource/mongo/rule_util_test.go
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package mongo_test
+
+import (
+ "context"
+ "errors"
+ "testing"
+
+ "github.com/apache/servicecomb-service-center/datasource"
+ "github.com/apache/servicecomb-service-center/datasource/mongo"
+ "github.com/stretchr/testify/assert"
+)
+
+func TestRuleFilter_Filter(t *testing.T) {
+ var err error
+ t.Run("when there is no such a customer in db", func(t *testing.T) {
+ _, err = mongo.Filter(context.Background(), []*mongo.Rule{}, "")
+ if err != nil && !errors.Is(err, datasource.ErrNoData) {
+ t.Fatalf("RuleFilter Filter failed")
+ }
+ assert.Equal(t, datasource.ErrNoData, err, "no data found")
+ })
+ t.Run("FilterAll when customer not exist", func(t *testing.T) {
+ _, _, err = mongo.FilterAll(context.Background(), []string{""},
[]*mongo.Rule{})
+ if err != nil && !errors.Is(err, datasource.ErrNoData) {
+ t.Fatalf("RuleFilter FilterAll failed")
+ }
+ assert.Equal(t, nil, err, "no customer found err is nil")
+ })
+ t.Run("FilterAll when ProviderRules not nil and service not exist",
func(t *testing.T) {
+ _, _, err = mongo.FilterAll(context.Background(), []string{""},
[]*mongo.Rule{{}})
+ if err != nil && !errors.Is(err, datasource.ErrNoData) {
+ t.Fatalf("RuleFilter FilterAll failed")
+ }
+ assert.Equal(t, datasource.ErrNoData, err, "no customer found
when FilterAll")
+ })
+}
diff --git a/server/notify/instance_subscriber.go
b/server/notify/instance_subscriber.go
index ac9442e..ed295d8 100644
--- a/server/notify/instance_subscriber.go
+++ b/server/notify/instance_subscriber.go
@@ -104,7 +104,8 @@ func (w *InstanceEventListWatcher) OnMessage(job
notify.Event) {
}
}
- if wJob.Revision <= w.ListRevision {
+	// A negative revision is used only in the mongo scenario; remove this special case once mongo supports revisions.
+ if wJob.Revision >= 0 && wJob.Revision <= w.ListRevision {
log.Warnf("unexpected notify %s job is coming in, watcher %s
%s, job is %v, current revision is %v",
w.Type(), w.Group(), w.Subject(), job, w.ListRevision)
return