nkaje commented on a change in pull request #162:
URL: https://github.com/apache/mynewt-newtmgr/pull/162#discussion_r433237045



##########
File path: nmxact/xact/image.go
##########
@@ -185,33 +209,189 @@ func nextImageUploadReq(s sesn.Sesn, upgrade bool, data 
[]byte, off int, imageNu
        return r, nil
 }
 
+func (t *ImageUploadIntTracker) UpdateTracker(off int, status int) {
+       if status == IMAGE_UPLOAD_STATUS_MISSED {
+               /* Upon error, set the value to missed for retransmission */
+               t.RspMap[off] = IMAGE_UPLOAD_CHUNK_MISSED_WM
+       } else if status == IMAGE_UPLOAD_STATUS_EXPECTED {
+               /* When the chunk at a certain offset is transmitted,
+                  a response requesting the next offset is expected. This
+                  indicates that the chunk is successfully trasmitted. Wait
+                  on the chunk in response e.g when offset 0, len 100 is sent,
+                  expected offset in the ack is 100 etc. */
+               t.RspMap[off] = 1
+       } else if status == IMAGE_UPLOAD_STATUS_RQ {
+               /* If the chunk at this offset was already transmitted, value
+                  goes to zero and that KV pair gets cleaned up subsequently.
+                  If there is a repeated request for a certain offset,
+                  that offset is not received by the remote side. Decrement
+                  the value. Missed chunk processing routine retransmits it */
+               t.RspMap[off] -= 1
+       }
+}
+
+func (t *ImageUploadIntTracker) CheckWindow() bool {
+       t.Mutex.Lock()
+       defer t.Mutex.Unlock()
+
+       return t.CheckWindowUL()
+}
+
+// Unlocked version, when the mutex is already held
+func (t *ImageUploadIntTracker) CheckWindowUL() bool {
+       return t.WCount < t.WCap
+}
+
+func (t *ImageUploadIntTracker) ProcessMissedChunks() {
+       for o, c := range t.RspMap {
+               if c < -(IMAGE_UPLOAD_MAX_WS * 2) {
+                       delete(t.RspMap, o)
+                       t.Off = o
+                       log.Debugf("missed? off %d count %d", o, c)
+               }
+               // clean up done chunks
+               if c == 0 {
+                       delete(t.RspMap, o)
+               }
+       }
+}
+
+func (t *ImageUploadIntTracker) HandleResponse(c *ImageUploadCmd, off int, rsp 
nmp.NmpRsp, res *ImageUploadResult) bool {
+       var cmp int64
+       cmp = 1
+       wFull := false
+
+       irsp := rsp.(*nmp.ImageUploadRsp)
+       res.Rsps = append(res.Rsps, irsp)
+       t.UpdateTracker(int(irsp.Off), IMAGE_UPLOAD_STATUS_RQ)
+
+       if t.WCap == t.WCount {
+               wFull = true
+       }
+       if t.MaxRxOff < int64(irsp.Off) {
+               t.MaxRxOff = int64(irsp.Off)
+       }
+       if c.ProgressCb != nil {
+               c.ProgressCb(c, irsp)
+       }
+       if t.TuneWS && t.WCap < IMAGE_UPLOAD_MAX_WS {
+               atomic.AddInt64(&t.WCap, 1)
+               cmp += 1
+       }
+       atomic.AddInt64(&t.WCount, -1)
+
+       // Indicate transition from window being full to with open slot(s)
+       if wFull && (t.WCap-t.WCount == cmp) {
+               return true
+       } else {
+               return false
+       }
+}
+
+func (t *ImageUploadIntTracker) HandleError(off int, err error) bool {
+       /*XXX: there could be an Unauthorize or EOF error  when the rate is too 
high
+         due to a large window, we retry. example:
+         "failed to decrypt message: coap_sec_tunnel: decode GCM fail EOF"
+         Since the error is sent with fmt.Errorf() API, with no code,
+         the string may have to be parsed to know the particular error */
+       var wFull = false
+       if t.WCap == t.WCount {
+               wFull = true
+       }
+
+       if t.WCount > IMAGE_UPLOAD_START_WS+1 {
+               atomic.AddInt64(&t.WCap, -1)
+       }
+       t.TuneWS = false
+       atomic.AddInt64(&t.WCount, -1)
+       t.UpdateTracker(off, IMAGE_UPLOAD_STATUS_MISSED)
+
+       // Indicate transition from window being full to with open slot(s)
+       if wFull && t.WCount == t.WCap-1 {
+               return true
+       } else {
+               return false
+       }
+}
+
 func (c *ImageUploadCmd) Run(s sesn.Sesn) (Result, error) {
        res := newImageUploadResult()
+       ch := make(chan int)
+       done := make(chan int)
+       rspc := make(chan nmp.NmpRsp, IMAGE_UPLOAD_MAX_WS)
+       errc := make(chan error, IMAGE_UPLOAD_MAX_WS)
+
+       t := ImageUploadIntTracker{
+               TuneWS:   true,
+               WCount:   0,
+               WCap:     IMAGE_UPLOAD_START_WS,
+               Off:      c.StartOff,
+               RspMap:   make(map[int]int),
+               MaxRxOff: 0,
+       }
+
+       for int(atomic.LoadInt64(&t.MaxRxOff)) < len(c.Data) {
+               // Block if window is full
+               if !t.CheckWindow() {
+                       ch <- 1
+               }
 
-       for off := c.StartOff; off < len(c.Data); {
-               r, err := nextImageUploadReq(s, c.Upgrade, c.Data, off, 
c.ImageNum)
-               if err != nil {
-                       return nil, err
+               t.Mutex.Lock()
+               t.ProcessMissedChunks()
+               t.Mutex.Unlock()
+
+               if t.Off == len(c.Data) {
+                       continue
                }
 
-               rsp, err := txReq(s, r.Msg(), &c.CmdBase)
+               t.Mutex.Lock()
+               r, err := nextImageUploadReq(s, c.Upgrade, c.Data, t.Off, 
c.ImageNum)
                if err != nil {
+                       t.Mutex.Unlock()
                        return nil, err
                }
-               irsp := rsp.(*nmp.ImageUploadRsp)
 
-               off = int(irsp.Off)
-
-               if c.ProgressCb != nil {
-                       c.ProgressCb(c, irsp)
-               }
+               t.Off = (int(r.Off) + len(r.Data))
 
-               res.Rsps = append(res.Rsps, irsp)
-               if irsp.Rc != 0 {
+               // Use up a chunk in window
+               atomic.AddInt64(&t.WCount, 1)
+               err = txReqAsync(s, r.Msg(), &c.CmdBase, rspc, errc)
+               if err != nil {
+                       log.Debugf("err txReqAsync %v", err)
+                       t.Mutex.Unlock()
                        break

Review comment:
       Removed `done`.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to