This is an automated email from the ASF dual-hosted git repository.
tianxiaoliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/servicecomb-service-center.git
The following commit(s) were added to refs/heads/master by this push:
new 222c0d3 SC sync (#803)
222c0d3 is described below
commit 222c0d3cba96b8680a69dc9f2abde3dae74ffa78
Author: yeyiwei <[email protected]>
AuthorDate: Wed Jan 6 19:51:21 2021 +0800
SC sync (#803)
---
datasource/mongo/bootstrap/bootstrap.go | 3 +
.../{bootstrap/bootstrap.go => event/event.go} | 14 ++-
datasource/mongo/event/instance_event_handler.go | 117 +++++++++++++++++++++
.../mongo/event/instance_event_handler_test.go | 113 ++++++++++++++++++++
4 files changed, 242 insertions(+), 5 deletions(-)
diff --git a/datasource/mongo/bootstrap/bootstrap.go b/datasource/mongo/bootstrap/bootstrap.go
index 3118120..1dc6119 100644
--- a/datasource/mongo/bootstrap/bootstrap.go
+++ b/datasource/mongo/bootstrap/bootstrap.go
@@ -22,4 +22,7 @@ import (
// heartbeat
_ "github.com/apache/servicecomb-service-center/datasource/mongo/heartbeat/heartbeatchecker"
+
+ // events
+ _ "github.com/apache/servicecomb-service-center/datasource/mongo/event"
)
diff --git a/datasource/mongo/bootstrap/bootstrap.go b/datasource/mongo/event/event.go
similarity index 72%
copy from datasource/mongo/bootstrap/bootstrap.go
copy to datasource/mongo/event/event.go
index 3118120..528e8a2 100644
--- a/datasource/mongo/bootstrap/bootstrap.go
+++ b/datasource/mongo/event/event.go
@@ -15,11 +15,15 @@
* limitations under the License.
*/
-package bootstrap
+package event
import (
- _ "github.com/apache/servicecomb-service-center/datasource/mongo"
-
- // heartbeat
- _ "github.com/apache/servicecomb-service-center/datasource/mongo/heartbeat/heartbeatchecker"
+ "github.com/apache/servicecomb-service-center/datasource/mongo/sd"
+ "github.com/apache/servicecomb-service-center/pkg/log"
)
+
+// init logs that the event package is initializing and registers the
+// instance event handler's OnEvent callback on the sd event proxy selected
+// by the handler's Type().
+func init() {
+	log.Info("event init")
+	handler := NewInstanceEventHandler()
+	proxy := sd.EventProxy(handler.Type())
+	proxy.AddHandleFunc(handler.OnEvent)
+}
diff --git a/datasource/mongo/event/instance_event_handler.go b/datasource/mongo/event/instance_event_handler.go
new file mode 100644
index 0000000..e9ecf12
--- /dev/null
+++ b/datasource/mongo/event/instance_event_handler.go
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package event
+
+import (
+ "context"
+ "fmt"
+
+ "github.com/apache/servicecomb-service-center/datasource"
+ "github.com/apache/servicecomb-service-center/datasource/mongo"
+ "github.com/apache/servicecomb-service-center/datasource/mongo/sd"
+ "github.com/apache/servicecomb-service-center/pkg/dump"
+ "github.com/apache/servicecomb-service-center/pkg/log"
+ "github.com/apache/servicecomb-service-center/pkg/util"
+ "github.com/apache/servicecomb-service-center/server/syncernotify"
+ "github.com/go-chassis/cari/discovery"
+ "go.mongodb.org/mongo-driver/bson"
+)
+
+// InstanceEventHandler handles instance events, such as an instance being
+// registered or deleted, and notifies the syncer about them.
+// It has no fields.
+type InstanceEventHandler struct{}
+
+// Type returns mongo.CollectionInstance, the name that identifies the kind
+// of events this handler is meant for.
+func (InstanceEventHandler) Type() string {
+	return mongo.CollectionInstance
+}
+
+// OnEvent handles a single sd.MongoEvent for an instance and, when the
+// syncer notify center is not closed, forwards it to the syncer together
+// with the service that owns the instance.
+//
+// The owning service is looked up in the service cache first and, when it
+// is not found there, queried once from the database. The event is dropped
+// (with a log entry) when its value is not an sd.Instance carrying instance
+// info, when the database query fails, or when no service can be found.
+func (h InstanceEventHandler) OnEvent(evt sd.MongoEvent) {
+	action := evt.Type
+	// Use the comma-ok form: a bare type assertion would panic on an
+	// unexpected payload, and a nil InstanceInfo would panic on first use.
+	instance, ok := evt.Value.(sd.Instance)
+	if !ok || instance.InstanceInfo == nil {
+		log.Warn(fmt.Sprintf("caught [%s] event with unexpected value type %T or empty instance info, ignore it",
+			action, evt.Value))
+		return
+	}
+	providerID := instance.InstanceInfo.ServiceId
+	providerInstanceID := instance.InstanceInfo.InstanceId
+
+	var microService *discovery.MicroService
+	// A missing cache entry (nil) or an entry of another type both yield
+	// ok == false and fall through to the database lookup below.
+	if cached, ok := sd.Store().Service().Cache().Get(providerID).(sd.Service); ok {
+		microService = cached.ServiceInfo
+	}
+	if microService == nil {
+		// The service in the cache may not be ready, so query the database once.
+		log.Info("get cached service failed, then get from database")
+		service, err := mongo.GetService(context.Background(), bson.M{"serviceinfo.serviceid": providerID})
+		if err != nil {
+			log.Error("query database error", err)
+			return
+		}
+		if service == nil {
+			log.Warn(fmt.Sprintf("there is no service with id [%s] in the database", providerID))
+			return
+		}
+		microService = service.ServiceInfo
+		if microService == nil {
+			log.Warn(fmt.Sprintf("caught [%s] instance[%s/%s] event, endpoints %v, get provider's file failed from db\n",
+				action, providerID, providerInstanceID, instance.InstanceInfo.Endpoints))
+			return
+		}
+	}
+	if !syncernotify.GetSyncerNotifyCenter().Closed() {
+		NotifySyncerInstanceEvent(evt, microService)
+	}
+}
+
+// NewInstanceEventHandler returns a pointer to a new, zero-valued
+// InstanceEventHandler.
+func NewInstanceEventHandler() *InstanceEventHandler {
+	return new(InstanceEventHandler)
+}
+
+// NotifySyncerInstanceEvent wraps an instance event and the service that owns
+// the instance into a dump.WatchInstanceChangedEvent and adds it to the
+// syncer notify center.
+//
+// event.Value must be an sd.Instance with non-nil InstanceInfo; otherwise the
+// event is logged and dropped instead of panicking. microService is attached
+// to the resulting event unchanged.
+func NotifySyncerInstanceEvent(event sd.MongoEvent, microService *discovery.MicroService) {
+	// Assert the payload once, in comma-ok form, rather than repeating a
+	// panicking assertion for every field access.
+	mongoInstance, ok := event.Value.(sd.Instance)
+	if !ok || mongoInstance.InstanceInfo == nil {
+		log.Warn(fmt.Sprintf("unexpected value type %T or empty instance info in NotifySyncerInstanceEvent, drop event",
+			event.Value))
+		return
+	}
+	instance := mongoInstance.InstanceInfo
+	log.Info(fmt.Sprintf("instanceId : %s and serviceId : %s in NotifySyncerInstanceEvent", instance.InstanceId, instance.ServiceId))
+
+	// Keys are the key prefix, domain, project and ids joined with datasource.SPLIT.
+	instanceKey := util.StringJoin([]string{datasource.InstanceKeyPrefix, mongoInstance.Domain,
+		mongoInstance.Project, instance.ServiceId, instance.InstanceId}, datasource.SPLIT)
+	serviceKey := util.StringJoin([]string{datasource.ServiceKeyPrefix, mongoInstance.Domain,
+		mongoInstance.Project, instance.ServiceId}, datasource.SPLIT)
+
+	instEvent := &dump.WatchInstanceChangedEvent{
+		Action: string(event.Type),
+		Service: &dump.Microservice{
+			KV:    &dump.KV{Key: serviceKey, Value: microService},
+			Value: microService,
+		},
+		Instance: &dump.Instance{
+			KV:    &dump.KV{Key: instanceKey, Value: instance},
+			Value: instance,
+		},
+	}
+	syncernotify.GetSyncerNotifyCenter().AddEvent(instEvent)
+
+	log.Debug(fmt.Sprintf("success to add instance change event action [%s], instanceKey : %s to event queue", instEvent.Action, instanceKey))
+}
diff --git a/datasource/mongo/event/instance_event_handler_test.go b/datasource/mongo/event/instance_event_handler_test.go
new file mode 100644
index 0000000..5f0e168
--- /dev/null
+++ b/datasource/mongo/event/instance_event_handler_test.go
@@ -0,0 +1,113 @@
+package event
+
+import (
+ "bou.ke/monkey"
+ "reflect"
+ "testing"
+
+ "github.com/apache/servicecomb-service-center/datasource/mongo/client"
+ "github.com/apache/servicecomb-service-center/datasource/mongo/sd"
+ "github.com/apache/servicecomb-service-center/server/syncernotify"
+ "github.com/go-chassis/cari/discovery"
+ "github.com/go-chassis/go-chassis/v2/storage"
+ "github.com/stretchr/testify/assert"
+)
+
+// init creates the mongo client for this test package, pointing it at
+// mongodb://localhost:27017.
+func init() {
+	client.NewMongoClient(storage.Options{
+		URI: "mongodb://localhost:27017",
+	})
+}
+
+// TestInstanceEventHandler_OnEvent drives OnEvent with the two event
+// fixtures, both with the syncer notify center unpatched and with its
+// Closed method patched to return false.
+//
+// OnEvent returns nothing, so each case asserts the one thing observable
+// from here: the handler returns without panicking.
+func TestInstanceEventHandler_OnEvent(t *testing.T) {
+	t.Run("microservice not nil after query database", func(t *testing.T) {
+		h := InstanceEventHandler{}
+		evt := mongoAssign()
+		assert.NotPanics(t, func() { h.OnEvent(evt) })
+	})
+	t.Run("when there is no such a service in database", func(t *testing.T) {
+		h := InstanceEventHandler{}
+		evt := mongoEventWronServiceId()
+		assert.NotPanics(t, func() { h.OnEvent(evt) }, "get from db failed")
+	})
+	t.Run("OnEvent test when syncer notify center closed", func(t *testing.T) {
+		h := InstanceEventHandler{}
+		evt := mongoAssign()
+		assert.NotPanics(t, func() { h.OnEvent(evt) })
+	})
+	t.Run("OnEvent test when syncer notify center open", func(t *testing.T) {
+		defer monkey.UnpatchAll()
+		// Force Closed() to report false so OnEvent takes the notify path.
+		monkey.PatchInstanceMethod(reflect.TypeOf((*syncernotify.Service)(nil)), "Closed",
+			func(service *syncernotify.Service) bool {
+				return false
+			})
+		h := InstanceEventHandler{}
+		evt := mongoAssign()
+		assert.NotPanics(t, func() { h.OnEvent(evt) }, "add event succeed")
+	})
+}
+
+// TestNotifySyncerInstanceEvent calls NotifySyncerInstanceEvent with the
+// event and service fixtures and asserts that the call does not panic; the
+// function returns no value to inspect.
+func TestNotifySyncerInstanceEvent(t *testing.T) {
+	t.Run("test when data is ok", func(t *testing.T) {
+		mongoEvent := mongoAssign()
+		microService := getMicroService()
+		assert.NotPanics(t, func() { NotifySyncerInstanceEvent(mongoEvent, microService) })
+	})
+}
+
+// mongoAssign builds an EVT_CREATE sd.MongoEvent fixture whose value is an
+// sd.Instance in domain and project "default", with instance info carrying
+// service id 2a20507274fc71c925d138341517dce14b600744.
+func mongoAssign() sd.MongoEvent {
+	// The result is discarded; presumably the call is made so the service
+	// cache exists before the event is handled — TODO confirm.
+	sd.Store().Service().Cache()
+
+	var inst sd.Instance
+	inst.Domain = "default"
+	inst.Project = "default"
+	inst.InstanceInfo = &discovery.MicroServiceInstance{
+		InstanceId: "f73dceb440f711eba63ffa163e7cdcb8",
+		ServiceId:  "2a20507274fc71c925d138341517dce14b600744",
+		Endpoints:  []string{"127.0.0.1:27017"},
+	}
+
+	var evt sd.MongoEvent
+	evt.Type = discovery.EVT_CREATE
+	evt.DocumentID = "5fdc483b4a885f69317e3505"
+	evt.ResourceID = "f73dceb440f711eba63ffa163e7cdcb8"
+	evt.Value = inst
+	return evt
+}
+
+// mongoEventWronServiceId builds the same EVT_CREATE sd.MongoEvent fixture as
+// mongoAssign except for the service id, which is
+// 2a20507274fc71c925d138341517dce14b6007443333.
+func mongoEventWronServiceId() sd.MongoEvent {
+	// The result is discarded; presumably the call is made so the service
+	// cache exists before the event is handled — TODO confirm.
+	sd.Store().Service().Cache()
+
+	var inst sd.Instance
+	inst.Domain = "default"
+	inst.Project = "default"
+	inst.InstanceInfo = &discovery.MicroServiceInstance{
+		InstanceId: "f73dceb440f711eba63ffa163e7cdcb8",
+		ServiceId:  "2a20507274fc71c925d138341517dce14b6007443333",
+		Endpoints:  []string{"127.0.0.1:27017"},
+	}
+
+	var evt sd.MongoEvent
+	evt.Type = discovery.EVT_CREATE
+	evt.DocumentID = "5fdc483b4a885f69317e3505"
+	evt.ResourceID = "f73dceb440f711eba63ffa163e7cdcb8"
+	evt.Value = inst
+	return evt
+}
+
+// getMicroService returns a pointer to a fixed discovery.MicroService
+// fixture (service id 1efe8be8eacce1efbf67967978133572fb8b5667, name
+// "ProviderDemoService1-2", version "1.0.0").
+func getMicroService() *discovery.MicroService {
+	return &discovery.MicroService{
+		ServiceId:    "1efe8be8eacce1efbf67967978133572fb8b5667",
+		AppId:        "default",
+		ServiceName:  "ProviderDemoService1-2",
+		Version:      "1.0.0",
+		Level:        "BACK",
+		Status:       "UP",
+		Timestamp:    "1608260891",
+		ModTimestamp: "1608260891",
+	}
+}