This is an automated email from the ASF dual-hosted git repository.

asifdxtreme pushed a commit to branch master
in repository 
https://gitbox.apache.org/repos/asf/incubator-servicecomb-service-center.git


The following commit(s) were added to refs/heads/master by this push:
     new 9fc039b  SCB-890 Lost changed event when bootstrap with embedded etcd 
(#435)
9fc039b is described below

commit 9fc039bda4e01adb71ae3f78ef3c9c0683228e2f
Author: little-cui <[email protected]>
AuthorDate: Tue Sep 4 19:26:22 2018 +0800

    SCB-890 Lost changed event when bootstrap with embedded etcd (#435)
---
 .../infra/registry/embededetcd/embededetcd.go      | 83 ++++++++++++----------
 1 file changed, 44 insertions(+), 39 deletions(-)

diff --git a/server/plugin/infra/registry/embededetcd/embededetcd.go 
b/server/plugin/infra/registry/embededetcd/embededetcd.go
index 20e7525..4f92853 100644
--- a/server/plugin/infra/registry/embededetcd/embededetcd.go
+++ b/server/plugin/infra/registry/embededetcd/embededetcd.go
@@ -418,35 +418,9 @@ func (s *EtcdEmbed) Watch(ctx context.Context, opts 
...registry.PluginOpOption)
                                        return
                                }
 
-                               l := len(resp.Events)
-                               kvs := make([]*mvccpb.KeyValue, l)
-                               pIdx, prevAction := 0, mvccpb.PUT
-                               pResp := &registry.PluginResponse{Action: 
registry.Put, Succeeded: true}
-
-                               for _, evt := range resp.Events {
-                                       if prevAction != evt.Type {
-                                               prevAction = evt.Type
-
-                                               if pIdx > 0 {
-                                                       err = 
setResponseAndCallback(pResp, kvs[:pIdx], op.WatchCallback)
-                                                       if err != nil {
-                                                               return
-                                                       }
-                                                       pIdx = 0
-                                               }
-                                       }
-
-                                       pResp.Revision = evt.Kv.ModRevision
-                                       pResp.Action = 
setKvsAndConvertAction(kvs, pIdx, &evt)
-
-                                       pIdx++
-                               }
-
-                               if pIdx > 0 {
-                                       err = setResponseAndCallback(pResp, 
kvs[:pIdx], op.WatchCallback)
-                                       if err != nil {
-                                               return
-                                       }
+                               err = dispatch(resp.Events, op.WatchCallback)
+                               if err != nil {
+                                       return
                                }
                        }
                }
@@ -478,7 +452,39 @@ func (s *EtcdEmbed) readyNotify() {
        }
 }
 
-func setKvsAndConvertAction(kvs []*mvccpb.KeyValue, pIdx int, evt 
*mvccpb.Event) registry.ActionType {
+func dispatch(evts []mvccpb.Event, cb registry.WatchCallback) error {
+       l := len(evts)
+       kvs := make([]*mvccpb.KeyValue, l)
+       sIdx, eIdx, rev := 0, 0, int64(0)
+       action, prevEvtType := registry.Put, mvccpb.PUT
+
+       for _, evt := range evts {
+               if prevEvtType != evt.Type {
+                       if eIdx > 0 {
+                               err := callback(action, rev, kvs[sIdx:eIdx], cb)
+                               if err != nil {
+                                       return err
+                               }
+                               sIdx = eIdx
+                       }
+                       prevEvtType = evt.Type
+               }
+
+               if rev < evt.Kv.ModRevision {
+                       rev = evt.Kv.ModRevision
+               }
+               action = setKvsAndConvertAction(kvs, eIdx, evt)
+
+               eIdx++
+       }
+
+       if eIdx > 0 {
+               return callback(action, rev, kvs[sIdx:eIdx], cb)
+       }
+       return nil
+}
+
+func setKvsAndConvertAction(kvs []*mvccpb.KeyValue, pIdx int, evt 
mvccpb.Event) registry.ActionType {
        switch evt.Type {
        case mvccpb.DELETE:
                kv := evt.PrevKv
@@ -493,15 +499,14 @@ func setKvsAndConvertAction(kvs []*mvccpb.KeyValue, pIdx 
int, evt *mvccpb.Event)
        }
 }
 
-func setResponseAndCallback(pResp *registry.PluginResponse, kvs 
[]*mvccpb.KeyValue, cb registry.WatchCallback) error {
-       pResp.Count = int64(len(kvs))
-       pResp.Kvs = kvs
-
-       err := cb("key information changed", pResp)
-       if err != nil {
-               return err
-       }
-       return nil
+func callback(action registry.ActionType, rev int64, kvs []*mvccpb.KeyValue, 
cb registry.WatchCallback) error {
+       return cb("key information changed", &registry.PluginResponse{
+               Action:    action,
+               Kvs:       kvs,
+               Count:     int64(len(kvs)),
+               Revision:  rev,
+               Succeeded: true,
+       })
 }
 
 func getEmbedInstance() mgr.PluginInstance {

Reply via email to