This is an automated email from the ASF dual-hosted git repository.

tianxiaoliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/servicecomb-service-center.git


The following commit(s) were added to refs/heads/master by this push:
     new f1e6a57  [SCB-2094] support instance subscribe (#827)
f1e6a57 is described below

commit f1e6a575b73fc19b70e31d9829aa25d254f282fe
Author: panqian <[email protected]>
AuthorDate: Fri Jan 15 16:03:05 2021 +0800

    [SCB-2094] support instance subscribe (#827)
---
 datasource/mongo/dep_util.go                     | 58 ++++++++++++++++++
 datasource/mongo/event/instance_event_handler.go | 33 ++++++++++-
 datasource/mongo/ms.go                           |  4 +-
 datasource/mongo/rule_util.go                    | 75 ++++++++++++++++++++++++
 datasource/mongo/rule_util_test.go               | 53 +++++++++++++++++
 server/notify/instance_subscriber.go             |  3 +-
 6 files changed, 222 insertions(+), 4 deletions(-)

diff --git a/datasource/mongo/dep_util.go b/datasource/mongo/dep_util.go
new file mode 100644
index 0000000..8ae088f
--- /dev/null
+++ b/datasource/mongo/dep_util.go
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package mongo
+
+import (
+       "context"
+       "fmt"
+
+       "github.com/apache/servicecomb-service-center/pkg/log"
+       "github.com/apache/servicecomb-service-center/pkg/util"
+       pb "github.com/go-chassis/cari/discovery"
+)
+
+func GetAllConsumerIds(ctx context.Context, provider *pb.MicroService) (allow 
[]string, deny []string, _ error) {
+       if provider == nil || len(provider.ServiceId) == 0 {
+               return nil, nil, fmt.Errorf("invalid provider")
+       }
+
+       // TODO: after a service is deleted, the final instance push may be inaccurate
+       domain := util.ParseDomainProject(ctx)
+       project := util.ParseProject(ctx)
+       providerRules, err := getRulesUtil(ctx, domain, project, 
provider.ServiceId)
+       if err != nil {
+               return nil, nil, err
+       }
+
+       allow, deny, err = GetConsumerIDsWithFilter(ctx, provider, 
providerRules)
+       if err != nil {
+               return nil, nil, err
+       }
+       return allow, deny, nil
+}
+
+func GetConsumerIDsWithFilter(ctx context.Context, provider *pb.MicroService, 
rules []*Rule) (allow []string, deny []string, err error) {
+       domainProject := util.ParseDomainProject(ctx)
+       dr := NewProviderDependencyRelation(ctx, domainProject, provider)
+       consumerIDs, err := dr.GetDependencyConsumerIds()
+       if err != nil {
+               log.Error(fmt.Sprintf("get service[%s]'s consumerIds failed", 
provider.ServiceId), err)
+               return nil, nil, err
+       }
+       return FilterAll(ctx, consumerIDs, rules)
+}
diff --git a/datasource/mongo/event/instance_event_handler.go 
b/datasource/mongo/event/instance_event_handler.go
index c870b8a..c11328c 100644
--- a/datasource/mongo/event/instance_event_handler.go
+++ b/datasource/mongo/event/instance_event_handler.go
@@ -21,6 +21,10 @@ import (
        "context"
        "errors"
        "fmt"
+       "time"
+
+       simple "github.com/apache/servicecomb-service-center/pkg/time"
+       "github.com/apache/servicecomb-service-center/server/notify"
 
        "github.com/apache/servicecomb-service-center/datasource"
        "github.com/apache/servicecomb-service-center/datasource/mongo"
@@ -47,7 +51,7 @@ func (h InstanceEventHandler) OnEvent(evt sd.MongoEvent) {
        instance := evt.Value.(sd.Instance)
        providerID := instance.InstanceInfo.ServiceId
        providerInstanceID := instance.InstanceInfo.InstanceId
-
+       domainProject := instance.Domain + "/" + instance.Project
        cacheService := sd.Store().Service().Cache().Get(providerID)
        var microService *discovery.MicroService
        if cacheService != nil {
@@ -74,12 +78,39 @@ func (h InstanceEventHandler) OnEvent(evt sd.MongoEvent) {
        if !syncernotify.GetSyncerNotifyCenter().Closed() {
                NotifySyncerInstanceEvent(evt, microService)
        }
+       ctx := util.SetDomainProject(context.Background(), instance.Domain, 
instance.Project)
+       consumerIDS, _, err := mongo.GetAllConsumerIds(ctx, microService)
+       if err != nil {
+               log.Error(fmt.Sprintf("get service[%s][%s/%s/%s/%s]'s 
consumerIDs failed",
+                       providerID, microService.Environment, 
microService.AppId, microService.ServiceName, microService.Version), err)
+               return
+       }
+       PublishInstanceEvent(evt, domainProject, 
discovery.MicroServiceToKey(domainProject, microService), consumerIDS)
 }
 
 func NewInstanceEventHandler() *InstanceEventHandler {
        return &InstanceEventHandler{}
 }
 
+func PublishInstanceEvent(evt sd.MongoEvent, domainProject string, serviceKey 
*discovery.MicroServiceKey, subscribers []string) {
+       if len(subscribers) == 0 {
+               return
+       }
+       response := &discovery.WatchInstanceResponse{
+               Response: discovery.CreateResponse(discovery.ResponseSuccess, 
"Watch instance successfully."),
+               Action:   string(evt.Type),
+               Key:      serviceKey,
+               Instance: evt.Value.(sd.Instance).InstanceInfo,
+       }
+       for _, consumerID := range subscribers {
+               evt := notify.NewInstanceEventWithTime(consumerID, 
domainProject, -1, simple.FromTime(time.Now()), response)
+               err := notify.Center().Publish(evt)
+               if err != nil {
+                       log.Error(fmt.Sprintf("publish event[%v] into channel 
failed", evt), err)
+               }
+       }
+}
+
 func NotifySyncerInstanceEvent(event sd.MongoEvent, microService 
*discovery.MicroService) {
        instance := event.Value.(sd.Instance).InstanceInfo
        log.Info(fmt.Sprintf("instanceId : %s and serviceId : %s in 
NotifySyncerInstanceEvent", instance.InstanceId, instance.ServiceId))
diff --git a/datasource/mongo/ms.go b/datasource/mongo/ms.go
index 075c536..16eb4ab 100644
--- a/datasource/mongo/ms.go
+++ b/datasource/mongo/ms.go
@@ -2723,10 +2723,10 @@ func accessible(ctx context.Context, consumerID string, 
providerID string) *disc
        if err != nil {
                return discovery.NewError(discovery.ErrInternal, 
fmt.Sprintf("an error occurred in query consumer tags(%s)", err.Error()))
        }
-       return matchRules(rules, consumerService.ServiceInfo, validateTags)
+       return MatchRules(rules, consumerService.ServiceInfo, validateTags)
 }
 
-func matchRules(rulesOfProvider []*Rule, consumer *discovery.MicroService, 
tagsOfConsumer map[string]string) *discovery.Error {
+func MatchRules(rulesOfProvider []*Rule, consumer *discovery.MicroService, 
tagsOfConsumer map[string]string) *discovery.Error {
        if consumer == nil {
                return discovery.NewError(discovery.ErrInvalidParams, "consumer 
is nil")
        }
diff --git a/datasource/mongo/rule_util.go b/datasource/mongo/rule_util.go
new file mode 100644
index 0000000..d7df917
--- /dev/null
+++ b/datasource/mongo/rule_util.go
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package mongo
+
+import (
+       "context"
+
+       "github.com/apache/servicecomb-service-center/pkg/util"
+       "github.com/go-chassis/cari/discovery"
+)
+
+func Filter(ctx context.Context, rules []*Rule, consumerID string) (bool, 
error) {
+       consumer, err := GetServiceByID(ctx, consumerID)
+       if consumer == nil {
+               return false, err
+       }
+
+       if len(rules) == 0 {
+               return true, nil
+       }
+       domain := util.ParseDomainProject(ctx)
+       project := util.ParseProject(ctx)
+
+       tags, err := getTags(ctx, domain, project, consumerID)
+       if err != nil {
+               return false, err
+       }
+       matchErr := MatchRules(rules, consumer.ServiceInfo, tags)
+       if matchErr != nil {
+               if matchErr.Code == discovery.ErrPermissionDeny {
+                       return false, nil
+               }
+               return false, matchErr
+       }
+       return true, nil
+}
+
+func FilterAll(ctx context.Context, consumerIDs []string, rules []*Rule) 
(allow []string, deny []string, err error) {
+       l := len(consumerIDs)
+       if l == 0 || len(rules) == 0 {
+               return consumerIDs, nil, nil
+       }
+
+       allowIdx, denyIdx := 0, l
+       consumers := make([]string, l)
+       for _, consumerID := range consumerIDs {
+               ok, err := Filter(ctx, rules, consumerID)
+               if err != nil {
+                       return nil, nil, err
+               }
+               if ok {
+                       consumers[allowIdx] = consumerID
+                       allowIdx++
+               } else {
+                       denyIdx--
+                       consumers[denyIdx] = consumerID
+               }
+       }
+       return consumers[:allowIdx], consumers[denyIdx:], nil
+}
diff --git a/datasource/mongo/rule_util_test.go 
b/datasource/mongo/rule_util_test.go
new file mode 100644
index 0000000..a35ad69
--- /dev/null
+++ b/datasource/mongo/rule_util_test.go
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package mongo_test
+
+import (
+       "context"
+       "errors"
+       "testing"
+
+       "github.com/apache/servicecomb-service-center/datasource"
+       "github.com/apache/servicecomb-service-center/datasource/mongo"
+       "github.com/stretchr/testify/assert"
+)
+
+func TestRuleFilter_Filter(t *testing.T) {
+       var err error
+       t.Run("when there is no such a customer in db", func(t *testing.T) {
+               _, err = mongo.Filter(context.Background(), []*mongo.Rule{}, "")
+               if err != nil && !errors.Is(err, datasource.ErrNoData) {
+                       t.Fatalf("RuleFilter Filter failed")
+               }
+               assert.Equal(t, datasource.ErrNoData, err, "no data found")
+       })
+       t.Run("FilterAll when customer not exist", func(t *testing.T) {
+               _, _, err = mongo.FilterAll(context.Background(), []string{""}, 
[]*mongo.Rule{})
+               if err != nil && !errors.Is(err, datasource.ErrNoData) {
+                       t.Fatalf("RuleFilter FilterAll failed")
+               }
+               assert.Equal(t, nil, err, "no customer found err is nil")
+       })
+       t.Run("FilterAll when ProviderRules not nil and service not exist", 
func(t *testing.T) {
+               _, _, err = mongo.FilterAll(context.Background(), []string{""}, 
[]*mongo.Rule{{}})
+               if err != nil && !errors.Is(err, datasource.ErrNoData) {
+                       t.Fatalf("RuleFilter FilterAll failed")
+               }
+               assert.Equal(t, datasource.ErrNoData, err, "no customer found 
when FilterAll")
+       })
+}
diff --git a/server/notify/instance_subscriber.go 
b/server/notify/instance_subscriber.go
index ac9442e..ed295d8 100644
--- a/server/notify/instance_subscriber.go
+++ b/server/notify/instance_subscriber.go
@@ -104,7 +104,8 @@ func (w *InstanceEventListWatcher) OnMessage(job 
notify.Event) {
                }
        }
 
-       if wJob.Revision <= w.ListRevision {
+       // the negative revision is specially for the mongo scenario; it should be removed 
after mongo supports revision.
+       if wJob.Revision >= 0 && wJob.Revision <= w.ListRevision {
                log.Warnf("unexpected notify %s job is coming in, watcher %s 
%s, job is %v, current revision is %v",
                        w.Type(), w.Group(), w.Subject(), job, w.ListRevision)
                return

Reply via email to