nkaje commented on a change in pull request #162:
URL: https://github.com/apache/mynewt-newtmgr/pull/162#discussion_r426317257



##########
File path: nmxact/xact/image.go
##########
@@ -185,33 +207,163 @@ func nextImageUploadReq(s sesn.Sesn, upgrade bool, data 
[]byte, off int, imageNu
        return r, nil
 }
 
+func (t *ImageUploadIntTracker) UpdateTracker(t_off int, r_off int, status 
int) {
+       if status == IMAGE_UPLOAD_STATUS_MISSED {
+               t.RspMap[t_off] = IMAGE_UPLOAD_CHUNK_MISSED_WM
+               return
+       }
+
+       if _, ok := t.RspMap[t_off]; ok {
+               // send chunk@offset, increase count
+               t.RspMap[t_off] += 1
+       } else {
+               // send chunk@offset for the first time
+               t.RspMap[t_off] = 1
+       }
+
+       if _, ok := t.RspMap[r_off]; ok {
+               // chunk@offset requested, in transit
+               t.RspMap[r_off] -= 1
+       } else {
+               // chunk@offset was not sent
+               t.RspMap[r_off] = -1
+       }
+
+}
+
+func (t *ImageUploadIntTracker) CheckWindow() bool {
+       t.Mutex.Lock()
+       defer t.Mutex.Unlock()
+
+       return t.CheckWindowUL()
+}
+
+// Unlocked version, when the mutex is already held
+func (t *ImageUploadIntTracker) CheckWindowUL() bool {
+       return t.WCount < t.WCap
+}
+
+func (t *ImageUploadIntTracker) process_missed_chunks() {
+       for o, c := range t.RspMap {
+               if c < -(IMAGE_UPLOAD_MAX_WS * 2) {
+                       delete(t.RspMap, o)
+                       t.Off = o
+                       log.Debugf("missed? off %d count %d", o, c)
+               }
+               // clean up done chunks
+               if c == 0 {
+                       delete(t.RspMap, o)
+               }
+       }
+}
+
+func (t *ImageUploadIntTracker) HandleResponse(c *ImageUploadCmd, off int, rsp 
nmp.NmpRsp, res *ImageUploadResult) {
+       irsp := rsp.(*nmp.ImageUploadRsp)
+       res.Rsps = append(res.Rsps, irsp)
+       t.UpdateTracker(off, int(irsp.Off), 0)
+       if t.MaxRxOff < int64(irsp.Off) {
+               t.MaxRxOff = int64(irsp.Off)
+       }
+       if c.ProgressCb != nil {
+               c.ProgressCb(c, irsp)
+       }
+       if t.TuneWS && t.WCap < IMAGE_UPLOAD_MAX_WS {
+               atomic.AddInt64(&t.WCap, 1)
+       }
+       atomic.AddInt64(&t.WCount, -1)
+}
+
+func (t *ImageUploadIntTracker) HandleError(off int, err error) {
+       /*XXX: there could be an Unauthorize or EOF error  when the rate is too 
high
+         due to a large window example:
+         "failed to decrypt message: coap_sec_tunnel: decode GCM fail EOF"
+         Since the error is sent with fmt.Errorf() API, with no code,
+         the string may have to be parsed to know the particular error */
+       if t.WCount > IMAGE_UPLOAD_START_WS+1 {
+               atomic.AddInt64(&t.WCap, -1)
+       }
+       t.TuneWS = false
+       atomic.AddInt64(&t.WCount, -1)
+       t.UpdateTracker(off, -1, IMAGE_UPLOAD_STATUS_MISSED)
+}
+
 func (c *ImageUploadCmd) Run(s sesn.Sesn) (Result, error) {
        res := newImageUploadResult()
+       done := make(chan int)
+       rsp_c := make(chan nmp.NmpRsp, IMAGE_UPLOAD_MAX_WS)
+       err_c := make(chan error, IMAGE_UPLOAD_MAX_WS)
 
-       for off := c.StartOff; off < len(c.Data); {
-               r, err := nextImageUploadReq(s, c.Upgrade, c.Data, off, 
c.ImageNum)
-               if err != nil {
-                       return nil, err
+       t := ImageUploadIntTracker{
+               TuneWS:   true,
+               WCount:   0,
+               WCap:     IMAGE_UPLOAD_START_WS,
+               Off:      c.StartOff,
+               RspMap:   make(map[int]int),
+               MaxRxOff: 0,
+       }
+
+       var wg sync.WaitGroup
+
+       for int(atomic.LoadInt64(&t.MaxRxOff)) < len(c.Data) {
+               // Block if window is full
+               for !t.CheckWindow() {
                }
 
-               rsp, err := txReq(s, r.Msg(), &c.CmdBase)
-               if err != nil {
-                       return nil, err
+               t.Mutex.Lock()
+
+               t.process_missed_chunks()
+
+               if t.Off == len(c.Data) {
+                       t.Mutex.Unlock()
+                       break
                }
-               irsp := rsp.(*nmp.ImageUploadRsp)
 
-               off = int(irsp.Off)
+               if int(t.MaxRxOff) == len(c.Data) {
+                       t.Mutex.Unlock()
+                       break
+               }
 
-               if c.ProgressCb != nil {
-                       c.ProgressCb(c, irsp)
+               r, err := nextImageUploadReq(s, c.Upgrade, c.Data, t.Off, 
c.ImageNum)
+               t.Mutex.Unlock()
+               if err != nil {
+                       return nil, err
                }
 
-               res.Rsps = append(res.Rsps, irsp)
-               if irsp.Rc != 0 {
+               t.Off = (int(r.Off) + len(r.Data))
+
+               // Use up a chunk in window
+               atomic.AddInt64(&t.WCount, 1)
+               err = txReq_async(s, r.Msg(), &c.CmdBase, rsp_c, err_c)
+               if err != nil {
+                       log.Debugf("err txReq_async %v", err)
                        break
                }
+
+               wg.Add(1)
+               go func(off int) {
+                       select {
+                       case err := <-err_c:
+                               t.Mutex.Lock()
+                               t.HandleError(off, err)
+                               t.Mutex.Unlock()
+                               wg.Done()

Review comment:
       `wg.Done()` can be moved to the beginning of the goroutine by using `defer wg.Done()`, so it does not have to be repeated in each `select` case.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to