This is an automated email from the ASF dual-hosted git repository.
tianxiaoliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/servicecomb-service-center.git
The following commit(s) were added to refs/heads/master by this push:
new 42a17b7 SCB-2094 Implement of Mongo heartbeat interface (#748)
42a17b7 is described below
commit 42a17b7b72f0e2e79c3fa05e6be567896a486fce
Author: robotLJW <[email protected]>
AuthorDate: Thu Nov 19 18:40:30 2020 +0800
SCB-2094 Implement of Mongo heartbeat interface (#748)
---
datasource/mongo/account.go | 12 ++--
datasource/mongo/account_test.go | 2 +-
datasource/mongo/database.go | 6 +-
.../{heartbeatchecker.go => heartbeat.go} | 37 ++++++-----
.../heartbeat/heartbeatchecker/heartbeat_test.go | 75 ++++++++++++++++++++++
.../heartbeat/heartbeatchecker/heartbeatchecker.go | 18 +++++-
.../heartbeatchecker/heartbeatchecker_test.go | 65 +++++++++++++++++++
7 files changed, 191 insertions(+), 24 deletions(-)
diff --git a/datasource/mongo/account.go b/datasource/mongo/account.go
index 6f96d69..d5f2bd9 100644
--- a/datasource/mongo/account.go
+++ b/datasource/mongo/account.go
@@ -47,7 +47,7 @@ func (ds *DataSource) CreateAccount(ctx context.Context, a *rbacframe.Account) e
}
a.Password = stringutil.Bytes2str(hash)
a.ID = util.GenerateUUID()
- _, err = client.GetMongoClient().Insert(ctx, Account, a)
+ _, err = client.GetMongoClient().Insert(ctx, CollectionAccount, a)
if err != nil {
switch tt := err.(type) {
case mongo.WriteException:
@@ -71,7 +71,7 @@ func (ds *DataSource) AccountExist(ctx context.Context, key string) (bool, error
filter := bson.M{
AccountName: key,
}
- count, err := client.GetMongoClient().Count(ctx, Account, filter)
+ count, err := client.GetMongoClient().Count(ctx, CollectionAccount, filter)
if err != nil {
return false, err
}
@@ -85,7 +85,7 @@ func (ds *DataSource) GetAccount(ctx context.Context, key string) (*rbacframe.Ac
filter := bson.M{
AccountName: key,
}
- result, err := client.GetMongoClient().FindOne(ctx, Account, filter)
+ result, err := client.GetMongoClient().FindOne(ctx, CollectionAccount, filter)
if err != nil {
return nil, err
}
@@ -105,7 +105,7 @@ func (ds *DataSource) ListAccount(ctx context.Context, key string) ([]*rbacframe
filter := bson.M{
AccountName: bson.M{"$regex": key},
}
- cursor, err := client.GetMongoClient().Find(ctx, Account, filter)
+ cursor, err := client.GetMongoClient().Find(ctx, CollectionAccount, filter)
if err != nil {
return nil, 0, err
}
@@ -127,7 +127,7 @@ func (ds *DataSource) DeleteAccount(ctx context.Context, key string) (bool, erro
filter := bson.M{
AccountName: key,
}
- result, err := client.GetMongoClient().Delete(ctx, Account, filter)
+ result, err := client.GetMongoClient().Delete(ctx, CollectionAccount, filter)
if err != nil {
return false, err
}
@@ -146,7 +146,7 @@ func (ds *DataSource) UpdateAccount(ctx context.Context, key string, account *rb
AccountCurrentPassword: account.CurrentPassword,
AccountStatus: account.Status,
},
}
- result, err := client.GetMongoClient().Update(ctx, Account, filter, update)
+ result, err := client.GetMongoClient().Update(ctx, CollectionAccount, filter, update)
if err != nil {
return err
}
diff --git a/datasource/mongo/account_test.go b/datasource/mongo/account_test.go
index 7ba1edd..5f7d8a0 100644
--- a/datasource/mongo/account_test.go
+++ b/datasource/mongo/account_test.go
@@ -34,7 +34,7 @@ func init() {
config := storage.DB{
URI: "mongodb://localhost:27017",
}
- client.NewMongoClient(config, []string{mongo.Account})
+ client.NewMongoClient(config, []string{mongo.CollectionAccount})
instance, _ = mongo.NewDataSource(datasource.Options{})
}
diff --git a/datasource/mongo/database.go b/datasource/mongo/database.go
index 0635fa4..67f9e4a 100644
--- a/datasource/mongo/database.go
+++ b/datasource/mongo/database.go
@@ -24,7 +24,7 @@ import (
const (
DuplicateKey = 11000
- Account = "account"
+ CollectionAccount = "account"
AccountName = "name"
AccountID = "id"
AccountPassword = "password"
@@ -32,6 +32,10 @@ const (
AccountTokenExpirationTime = "tokenexpirationtime"
AccountCurrentPassword = "currentpassword"
AccountStatus = "status"
+ CollectionInstance = "instance"
+ InstanceID = "instanceinfo.instanceid"
+ ServiceID = "instanceinfo.serviceid"
+ RefreshTime = "refreshtime"
)
type Instance struct {
diff --git a/datasource/mongo/heartbeat/heartbeatchecker/heartbeatchecker.go b/datasource/mongo/heartbeat/heartbeatchecker/heartbeat.go
similarity index 57%
copy from datasource/mongo/heartbeat/heartbeatchecker/heartbeatchecker.go
copy to datasource/mongo/heartbeat/heartbeatchecker/heartbeat.go
index cefbf9b..e7f389d 100644
--- a/datasource/mongo/heartbeat/heartbeatchecker/heartbeatchecker.go
+++ b/datasource/mongo/heartbeat/heartbeatchecker/heartbeat.go
@@ -19,21 +19,28 @@ package heartbeatchecker
import (
"context"
-
"github.com/apache/servicecomb-service-center/datasource/mongo/heartbeat"
- pb "github.com/apache/servicecomb-service-center/pkg/registry"
+ "github.com/apache/servicecomb-service-center/datasource/mongo"
+ "github.com/apache/servicecomb-service-center/datasource/mongo/client"
+ "github.com/apache/servicecomb-service-center/pkg/log"
+ "go.mongodb.org/mongo-driver/bson"
+ "time"
)
-func init() {
- heartbeat.Install("heartbeatchecker", NewHeartBeatChecker)
-}
-
-type HeartBeatChecker struct {
-}
-
-func NewHeartBeatChecker(opts heartbeat.Options) (heartbeat.HealthCheck, error) {
- return &HeartBeatChecker{}, nil
-}
-
-func (h *HeartBeatChecker) Heartbeat(ctx context.Context, request *pb.HeartbeatRequest) (*pb.HeartbeatResponse, error) {
- return nil, nil
+func updateInstanceRefreshTime(ctx context.Context, serviceID string, instanceID string) error {
+ filter := bson.M{
+ mongo.InstanceID: instanceID,
+ mongo.ServiceID: serviceID,
+ }
+ update := bson.M{
+ "$set": bson.M{mongo.RefreshTime: time.Now()},
+ }
+ result, err := client.GetMongoClient().FindOneAndUpdate(ctx, mongo.CollectionInstance, filter, update)
+ if err != nil {
+ log.Errorf(err, "failed to update refresh time of instance: ")
+ return err
+ }
+ if result.Err() != nil {
+ return result.Err()
+ }
+ return nil
}
diff --git a/datasource/mongo/heartbeat/heartbeatchecker/heartbeat_test.go b/datasource/mongo/heartbeat/heartbeatchecker/heartbeat_test.go
new file mode 100644
index 0000000..1bc3a95
--- /dev/null
+++ b/datasource/mongo/heartbeat/heartbeatchecker/heartbeat_test.go
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package heartbeatchecker
+
+import (
+ "context"
+ "fmt"
+ "github.com/apache/servicecomb-service-center/datasource/mongo"
+ "github.com/apache/servicecomb-service-center/datasource/mongo/client"
+ pb "github.com/apache/servicecomb-service-center/pkg/registry"
+ "github.com/go-chassis/go-chassis/v2/storage"
+ "github.com/stretchr/testify/assert"
+ "go.mongodb.org/mongo-driver/bson"
+ "testing"
+ "time"
+)
+
+func init() {
+ config := storage.DB{
+ URI: "mongodb://localhost:27017",
+ }
+ client.NewMongoClient(config, []string{mongo.CollectionInstance})
+}
+
+func TestUpdateInstanceRefreshTime(t *testing.T) {
+ t.Run("update instance refresh time: if the instance does not exist,the update should fail", func(t *testing.T) {
+ err := updateInstanceRefreshTime(context.Background(), "not-exist", "not-exist")
+ fmt.Println(err)
+ assert.NotNil(t, err)
+ })
+
+ t.Run("update instance refresh time: if the instance does exist,the update should succeed", func(t *testing.T) {
+ instance1 := mongo.Instance{
+ RefreshTime: time.Now(),
+ InstanceInfo: &pb.MicroServiceInstance{
+ InstanceId: "instanceId1",
+ ServiceId: "serviceId1",
+ },
+ }
+ _, err := client.GetMongoClient().Insert(context.Background(), mongo.CollectionInstance, instance1)
+ assert.Equal(t, nil, err)
+ err = updateInstanceRefreshTime(context.Background(), instance1.InstanceInfo.ServiceId, instance1.InstanceInfo.InstanceId)
+ assert.Equal(t, nil, err)
+ filter := bson.M{
+ mongo.InstanceID: instance1.InstanceInfo.InstanceId,
+ mongo.ServiceID: instance1.InstanceInfo.ServiceId,
+ }
+ result, err := client.GetMongoClient().FindOne(context.Background(), mongo.CollectionInstance, filter)
+ assert.Nil(t, err)
+ var ins mongo.Instance
+ err = result.Decode(&ins)
+ assert.Nil(t, err)
+ assert.NotEqual(t, instance1.RefreshTime, ins.RefreshTime)
+ filter = bson.M{
+ mongo.InstanceID: instance1.InstanceInfo.InstanceId,
+ }
+ _, err = client.GetMongoClient().Delete(context.Background(), mongo.CollectionInstance, filter)
+ assert.Nil(t, err)
+ })
+}
diff --git a/datasource/mongo/heartbeat/heartbeatchecker/heartbeatchecker.go b/datasource/mongo/heartbeat/heartbeatchecker/heartbeatchecker.go
index cefbf9b..96c96e6 100644
--- a/datasource/mongo/heartbeat/heartbeatchecker/heartbeatchecker.go
+++ b/datasource/mongo/heartbeat/heartbeatchecker/heartbeatchecker.go
@@ -20,7 +20,10 @@ package heartbeatchecker
import (
"context"
"github.com/apache/servicecomb-service-center/datasource/mongo/heartbeat"
+ "github.com/apache/servicecomb-service-center/pkg/log"
pb "github.com/apache/servicecomb-service-center/pkg/registry"
+ "github.com/apache/servicecomb-service-center/pkg/util"
+ scerr "github.com/apache/servicecomb-service-center/server/scerror"
)
func init() {
@@ -35,5 +38,18 @@ func NewHeartBeatChecker(opts heartbeat.Options) (heartbeat.HealthCheck, error)
}
func (h *HeartBeatChecker) Heartbeat(ctx context.Context, request *pb.HeartbeatRequest) (*pb.HeartbeatResponse, error) {
- return nil, nil
+ remoteIP := util.GetIPFromContext(ctx)
+ err := updateInstanceRefreshTime(ctx, request.ServiceId, request.InstanceId)
+ if err != nil {
+ log.Errorf(err, "heartbeat failed, instance[%s]. operator %s",
+ request.InstanceId, remoteIP)
+ resp := &pb.HeartbeatResponse{
+ Response: pb.CreateResponseWithSCErr(scerr.NewError(scerr.ErrInstanceNotExists, err.Error())),
+ }
+ return resp, err
+ }
+ return &pb.HeartbeatResponse{
+ Response: pb.CreateResponse(pb.ResponseSuccess,
+ "Update service instance heartbeat successfully."),
+ }, nil
}
diff --git a/datasource/mongo/heartbeat/heartbeatchecker/heartbeatchecker_test.go b/datasource/mongo/heartbeat/heartbeatchecker/heartbeatchecker_test.go
new file mode 100644
index 0000000..18a9556
--- /dev/null
+++ b/datasource/mongo/heartbeat/heartbeatchecker/heartbeatchecker_test.go
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package heartbeatchecker
+
+import (
+ "context"
+ "github.com/apache/servicecomb-service-center/datasource/mongo"
+ "github.com/apache/servicecomb-service-center/datasource/mongo/client"
+ pb "github.com/apache/servicecomb-service-center/pkg/registry"
+ "github.com/stretchr/testify/assert"
+ "go.mongodb.org/mongo-driver/bson"
+ "testing"
+ "time"
+)
+
+func TestHeartbeat(t *testing.T) {
+ t.Run("heartbeat: if the instance does not exist,the heartbeat should fail", func(t *testing.T) {
+ heartBeatChecker := &HeartBeatChecker{}
+ resp, err := heartBeatChecker.Heartbeat(context.Background(), &pb.HeartbeatRequest{
+ ServiceId: "not-exist-ins",
+ InstanceId: "not-exist-ins",
+ })
+ assert.NotNil(t, err)
+ assert.NotEqual(t, pb.ResponseSuccess, resp.Response.GetCode())
+ })
+
+ t.Run("heartbeat: if the instance does exist,the heartbeat should succeed", func(t *testing.T) {
+ instance1 := mongo.Instance{
+ RefreshTime: time.Now(),
+ InstanceInfo: &pb.MicroServiceInstance{
+ InstanceId: "instanceId1",
+ ServiceId: "serviceId1",
+ },
+ }
+ _, err := client.GetMongoClient().Insert(context.Background(), mongo.CollectionInstance, instance1)
+ assert.Equal(t, nil, err)
+ heartBeatChecker := &HeartBeatChecker{}
+ resp, err := heartBeatChecker.Heartbeat(context.Background(), &pb.HeartbeatRequest{
+ ServiceId: instance1.InstanceInfo.ServiceId,
+ InstanceId: instance1.InstanceInfo.InstanceId,
+ })
+ assert.Nil(t, err)
+ assert.Equal(t, pb.ResponseSuccess, resp.Response.GetCode())
+ filter := bson.M{
+ mongo.InstanceID: instance1.InstanceInfo.InstanceId,
+ }
+ _, err = client.GetMongoClient().Delete(context.Background(), mongo.CollectionInstance, filter)
+ assert.Nil(t, err)
+ })
+}