This is an automated email from the ASF dual-hosted git repository.

mabin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/servicecomb-service-center.git


The following commit(s) were added to refs/heads/master by this push:
     new 78c431e  [SCB-1429] Abstract storage operation
     new 606c91c  Merge pull request #571 from ChinX/syncer
78c431e is described below

commit 78c431ee0b3c19338bf370fb426e26620ce0361d
Author: chinx <[email protected]>
AuthorDate: Fri Aug 9 15:18:09 2019 +0800

    [SCB-1429] Abstract storage operation
---
 syncer/servicecenter/storage/operation.go | 72 ++++++++++++++++++++++++++++
 syncer/servicecenter/storage/storage.go   | 78 +++++++++++++++----------------
 2 files changed, 110 insertions(+), 40 deletions(-)

diff --git a/syncer/servicecenter/storage/operation.go b/syncer/servicecenter/storage/operation.go
new file mode 100644
index 0000000..f0325a8
--- /dev/null
+++ b/syncer/servicecenter/storage/operation.go
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package storage
+
+import (
+       "github.com/apache/servicecomb-service-center/pkg/util"
+       "github.com/coreos/etcd/clientv3"
+)
+
+var (
+       // mappingsKey the key of instances mapping in etcd
+       mappingsKey = "/syncer/v1/mappings"
+       // servicesKey the key of service in etcd
+       servicesKey = "/syncer/v1/services"
+       // instancesKey the key of instance in etcd
+       instancesKey = "/syncer/v1/instances"
+)
+
+func putServiceOp(serviceId string, data []byte) clientv3.Op {
+       return clientv3.OpPut(servicesKey+"/"+serviceId, util.BytesToStringWithNoCopy(data))
+}
+
+func getServicesOp() clientv3.Op {
+       return clientv3.OpGet(servicesKey, clientv3.WithPrefix())
+}
+
+func deleteServiceOp(serviceId string) clientv3.Op {
+       return clientv3.OpDelete(servicesKey + "/" + serviceId)
+}
+
+func putInstanceOp(instanceID string, data []byte) clientv3.Op {
+       return clientv3.OpPut(instancesKey+"/"+instanceID, util.BytesToStringWithNoCopy(data))
+}
+
+func getInstancesOp() clientv3.Op {
+       return clientv3.OpGet(instancesKey, clientv3.WithPrefix())
+}
+
+func deleteInstanceOp(instanceID string) clientv3.Op {
+       return clientv3.OpDelete(instancesKey + "/" + instanceID)
+}
+
+func putMappingOp(cluster, mappingID string, data []byte) clientv3.Op {
+       return clientv3.OpPut(mappingsKey+"/"+cluster+"/"+mappingID, util.BytesToStringWithNoCopy(data))
+}
+
+func getClusterMappingsOp(cluster string) clientv3.Op {
+       return clientv3.OpGet(mappingsKey+"/"+cluster, clientv3.WithPrefix())
+}
+
+func getAllMappingsOp() clientv3.Op {
+       return clientv3.OpGet(mappingsKey, clientv3.WithPrefix())
+}
+
+func delMappingOp(cluster, mappingID string) clientv3.Op {
+       return clientv3.OpDelete(mappingsKey + "/" + cluster + "/" + mappingID)
+}
\ No newline at end of file
diff --git a/syncer/servicecenter/storage/storage.go b/syncer/servicecenter/storage/storage.go
index c6abf44..14bf91b 100644
--- a/syncer/servicecenter/storage/storage.go
+++ b/syncer/servicecenter/storage/storage.go
@@ -22,21 +22,11 @@ import (
        "sync"
 
        "github.com/apache/servicecomb-service-center/pkg/log"
-       "github.com/apache/servicecomb-service-center/pkg/util"
        pb "github.com/apache/servicecomb-service-center/syncer/proto"
        "github.com/coreos/etcd/clientv3"
        "github.com/gogo/protobuf/proto"
 )
 
-var (
-       // mappingsKey the key of instances mapping in etcd
-       mappingsKey = "/syncer/v1/mappings"
-       // servicesKey the key of service in etcd
-       servicesKey = "/syncer/v1/services"
-       // instancesKey the key of instance in etcd
-       instancesKey = "/syncer/v1/instances"
-)
-
 type Storage interface {
        GetData() (data *pb.SyncData)
        UpdateData(data *pb.SyncData)
@@ -60,15 +50,21 @@ func NewStorage(engine clientv3.KV) Storage {
        return storage
 }
 
-// getPrefixKey Get data from etcd based on the prefix key
-func (s *storage) getPrefixKey(prefix string, handler func(key, val []byte) (next bool)) {
-       resp, err := s.engine.Get(context.Background(), prefix, clientv3.WithPrefix())
+// getValue Get value from etcd by key
+func (s *storage) getValue(opt clientv3.Op, handler func(key, val []byte) (next bool)) {
+       resp, err := s.engine.Do(context.Background(), opt)
        if err != nil {
-               log.Errorf(err, "Get mapping from etcd failed: %s", err)
+               log.Errorf(err, "Do etcd operation failed: %s", err)
                return
        }
 
-       for _, kv := range resp.Kvs {
+       getResp := resp.Get()
+       if getResp == nil {
+               log.Error("Data from etcd is empty", nil)
+               return
+       }
+
+       for _, kv := range resp.Get().Kvs {
                if !handler(kv.Key, kv.Value) {
                        break
                }
@@ -132,8 +128,9 @@ next:
                                continue next
                        }
                }
-               key := mappingsKey + "/" + entry.ClusterName + "/" + entry.OrgInstanceID
-               if _, err := s.engine.Delete(context.Background(), key); err != nil {
+
+               delOp := delMappingOp(entry.ClusterName, entry.OrgInstanceID)
+               if _, err := s.engine.Do(context.Background(),delOp); err != nil {
                        log.Errorf(err, "Delete instance clusterName=%s instanceID=%s failed", entry.ClusterName, entry.OrgInstanceID)
                }
 
@@ -144,13 +141,14 @@ next:
 // UpdateServices Update services to storage
 func (s *storage) UpdateServices(services []*pb.SyncService) {
        for _, val := range services {
-               key := servicesKey + "/" + val.ServiceId
                data, err := proto.Marshal(val)
                if err != nil {
                        log.Errorf(err, "Proto marshal failed: %s", err)
                        continue
                }
-               _, err = s.engine.Put(context.Background(), key, util.BytesToStringWithNoCopy(data))
+
+               updateOp := putServiceOp(val.ServiceId, data)
+               _, err = s.engine.Do(context.Background(), updateOp)
                if err != nil {
                        log.Errorf(err, "Save service to etcd failed: %s", err)
                }
@@ -160,7 +158,7 @@ func (s *storage) UpdateServices(services []*pb.SyncService) {
 // GetServices Get services from storage
 func (s *storage) GetServices() (services []*pb.SyncService) {
        services = make([]*pb.SyncService, 0, 10)
-       s.getPrefixKey(servicesKey, func(key, val []byte) (next bool) {
+       s.getValue(getServicesOp(), func(key, val []byte) (next bool) {
                next = true
                item := &pb.SyncService{}
                if err := proto.Unmarshal(val, item); err != nil {
@@ -182,8 +180,8 @@ func (s *storage) DeleteServices(services []*pb.SyncService) {
 
 // DeleteServices Delete services from storage
 func (s *storage) deleteService(serviceId string) {
-       key := servicesKey + "/" + serviceId
-       _, err := s.engine.Delete(context.Background(), key)
+       delOp := deleteServiceOp(serviceId)
+       _, err := s.engine.Do(context.Background(), delOp)
        if err != nil {
                log.Errorf(err, "Delete service from etcd failed: %s", err)
        }
@@ -192,13 +190,14 @@ func (s *storage) deleteService(serviceId string) {
 // UpdateInstances Update instances to storage
 func (s *storage) UpdateInstances(instances []*pb.SyncInstance) {
        for _, val := range instances {
-               key := instancesKey + "/" + val.InstanceId
                data, err := proto.Marshal(val)
                if err != nil {
                        log.Errorf(err, "Proto marshal failed: %s", err)
                        continue
                }
-               _, err = s.engine.Put(context.Background(), key, util.BytesToStringWithNoCopy(data))
+
+               updateOp := putInstanceOp(val.InstanceId, data)
+               _, err = s.engine.Do(context.Background(), updateOp)
                if err != nil {
                        log.Errorf(err, "Save instance to etcd failed: %s", err)
                }
@@ -208,7 +207,7 @@ func (s *storage) UpdateInstances(instances []*pb.SyncInstance) {
 // GetInstances Get instances from storage
 func (s *storage) GetInstances() (instances []*pb.SyncInstance) {
        instances = make([]*pb.SyncInstance, 0, 10)
-       s.getPrefixKey(instancesKey, func(key, val []byte) (next bool) {
+       s.getValue(getInstancesOp(), func(key, val []byte) (next bool) {
                next = true
                item := &pb.SyncInstance{}
                if err := proto.Unmarshal(val, item); err != nil {
@@ -229,8 +228,8 @@ func (s *storage) DeleteInstances(instances []*pb.SyncInstance) {
 }
 
 func (s *storage) deleteInstance(instanceID string) {
-       key := instancesKey + "/" + instanceID
-       _, err := s.engine.Delete(context.Background(), key)
+       delOp := deleteInstanceOp(instanceID)
+       _, err := s.engine.Do(context.Background(), delOp)
        if err != nil {
                log.Errorf(err, "Delete instance from etcd failed: %s", err)
        }
@@ -240,13 +239,14 @@ func (s *storage) deleteInstance(instanceID string) {
 func (s *storage) UpdateMapByCluster(clusterName string, mapping pb.SyncMapping) {
        newMaps := make(pb.SyncMapping, 0, len(mapping))
        for _, val := range mapping {
-               key := mappingsKey + "/" + clusterName + "/" + val.OrgInstanceID
                data, err := proto.Marshal(val)
                if err != nil {
                        log.Errorf(err, "Proto marshal failed: %s", err)
                        continue
                }
-               _, err = s.engine.Put(context.Background(), key, util.BytesToStringWithNoCopy(data))
+
+               putOp := putMappingOp(clusterName, val.OrgInstanceID, data)
+               _, err = s.engine.Do(context.Background(), putOp)
                if err != nil {
                        log.Errorf(err, "Save mapping to etcd failed: %s", err)
                }
@@ -257,19 +257,17 @@ func (s *storage) UpdateMapByCluster(clusterName string, mapping pb.SyncMapping)
 
 // GetMapByCluster get map by clusterName of other cluster
 func (s *storage) GetMapByCluster(clusterName string) (mapping pb.SyncMapping) {
-       maps := make(pb.SyncMapping, 0, 10)
-       s.getPrefixKey(mappingsKey+"/"+clusterName, func(key, val []byte) (next bool) {
+       s.getValue(getClusterMappingsOp(clusterName), func(key, val []byte) (next bool) {
                next = true
                item := &pb.MappingEntry{}
                if err := proto.Unmarshal(val, item); err != nil {
                        log.Errorf(err, "Proto unmarshal '%s' failed: %s", val, err)
                        return
                }
-
-               maps = append(maps, item)
+               mapping = append(mapping, item)
                return
        })
-       return maps
+       return
 }
 
 // UpdateMaps update all maps to etcd
@@ -277,13 +275,14 @@ func (s *storage) UpdateMaps(maps pb.SyncMapping) {
        srcMaps := s.GetMaps()
        mappings := make(pb.SyncMapping, 0, len(maps))
        for _, val := range maps {
-               key := mappingsKey + "/" + val.ClusterName + "/" + val.OrgInstanceID
                data, err := proto.Marshal(val)
                if err != nil {
                        log.Errorf(err, "Proto marshal failed: %s", err)
                        continue
                }
-               _, err = s.engine.Put(context.Background(), key, util.BytesToStringWithNoCopy(data))
+
+               putOp := putMappingOp(val.ClusterName, val.OrgInstanceID, data)
+               _, err = s.engine.Do(context.Background(), putOp)
                if err != nil {
                        log.Errorf(err, "Save mapping to etcd failed: %s", err)
                        continue
@@ -295,16 +294,15 @@ func (s *storage) UpdateMaps(maps pb.SyncMapping) {
 
 // GetMaps Get maps from storage
 func (s *storage) GetMaps() (mapping pb.SyncMapping) {
-       maps := make(pb.SyncMapping, 0, 10)
-       s.getPrefixKey(mappingsKey, func(key, val []byte) (next bool) {
+       s.getValue(getAllMappingsOp(), func(key, val []byte) (next bool) {
                next = true
                item := &pb.MappingEntry{}
                if err := proto.Unmarshal(val, item); err != nil {
                        log.Errorf(err, "Proto unmarshal '%s' failed: %s", val, err)
                        return
                }
-               maps = append(maps, item)
+               mapping = append(mapping, item)
                return
        })
-       return maps
+       return
 }

Reply via email to