nmxact - Stability when xport fails

Project: http://git-wip-us.apache.org/repos/asf/incubator-mynewt-newtmgr/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-mynewt-newtmgr/commit/db639f93
Tree: http://git-wip-us.apache.org/repos/asf/incubator-mynewt-newtmgr/tree/db639f93
Diff: http://git-wip-us.apache.org/repos/asf/incubator-mynewt-newtmgr/diff/db639f93

Branch: refs/heads/master
Commit: db639f9313a27c051c0b6dd9e13840549f3f5acd
Parents: 0384fcf
Author: Christopher Collins <[email protected]>
Authored: Fri Mar 31 18:26:35 2017 -0700
Committer: Christopher Collins <[email protected]>
Committed: Mon Apr 3 16:49:25 2017 -0700

----------------------------------------------------------------------
 nmxact/nmble/ble_act.go        | 16 +++----
 nmxact/nmble/ble_fsm.go        | 53 +++++++++++++----------
 nmxact/nmble/ble_oic_sesn.go   | 12 ++++++
 nmxact/nmble/ble_plain_sesn.go |  6 +++
 nmxact/nmble/ble_xport.go      | 85 ++++++++++++++++++++-----------------
 nmxact/nmp/dispatch.go         |  1 +
 6 files changed, 103 insertions(+), 70 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-mynewt-newtmgr/blob/db639f93/nmxact/nmble/ble_act.go
----------------------------------------------------------------------
diff --git a/nmxact/nmble/ble_act.go b/nmxact/nmble/ble_act.go
index 2aa6678..cb6cb00 100644
--- a/nmxact/nmble/ble_act.go
+++ b/nmxact/nmble/ble_act.go
@@ -56,7 +56,7 @@ func terminate(x *BleXport, bl *BleListener, r *BleTerminateReq) error {
                        default:
                        }
 
-               case <-bl.AfterTimeout(x.rspTimeout):
+               case <-bl.AfterTimeout(x.RspTimeout()):
                        return BhdTimeoutError(MSG_TYPE_TERMINATE)
                }
        }
@@ -92,7 +92,7 @@ func connCancel(x *BleXport, bl *BleListener, r *BleConnCancelReq) error {
                        default:
                        }
 
-               case <-bl.AfterTimeout(x.rspTimeout):
+               case <-bl.AfterTimeout(x.RspTimeout()):
                        return BhdTimeoutError(MSG_TYPE_TERMINATE)
                }
        }
@@ -148,7 +148,7 @@ func discSvcUuid(x *BleXport, bl *BleListener, r *BleDiscSvcUuidReq) (
                        default:
                        }
 
-               case <-bl.AfterTimeout(x.rspTimeout):
+               case <-bl.AfterTimeout(x.RspTimeout()):
                        return nil, BhdTimeoutError(MSG_TYPE_DISC_SVC_UUID)
                }
        }
@@ -198,7 +198,7 @@ func discAllChrs(x *BleXport, bl *BleListener, r *BleDiscAllChrsReq) (
                        default:
                        }
 
-               case <-bl.AfterTimeout(x.rspTimeout):
+               case <-bl.AfterTimeout(x.RspTimeout()):
                        return nil, BhdTimeoutError(MSG_TYPE_DISC_ALL_CHRS)
                }
        }
@@ -235,7 +235,7 @@ func writeCmd(x *BleXport, bl *BleListener, r *BleWriteCmdReq) error {
                        default:
                        }
 
-               case <-bl.AfterTimeout(x.rspTimeout):
+               case <-bl.AfterTimeout(x.RspTimeout()):
                        return BhdTimeoutError(MSG_TYPE_WRITE_CMD)
                }
        }
@@ -281,7 +281,7 @@ func exchangeMtu(x *BleXport, bl *BleListener, r *BleExchangeMtuReq) (
                        default:
                        }
 
-               case <-bl.AfterTimeout(x.rspTimeout):
+               case <-bl.AfterTimeout(x.RspTimeout()):
                        return 0, BhdTimeoutError(MSG_TYPE_EXCHANGE_MTU)
                }
        }
@@ -320,7 +320,7 @@ func scan(x *BleXport, bl *BleListener, r *BleScanReq,
                        default:
                        }
 
-               case <-bl.AfterTimeout(x.rspTimeout):
+               case <-bl.AfterTimeout(x.RspTimeout()):
                        return BhdTimeoutError(MSG_TYPE_EXCHANGE_MTU)
 
                case <-abortChan:
@@ -356,7 +356,7 @@ func scanCancel(x *BleXport, bl *BleListener, r *BleScanCancelReq) error {
                        default:
                        }
 
-               case <-bl.AfterTimeout(x.rspTimeout):
+               case <-bl.AfterTimeout(x.RspTimeout()):
                        return BhdTimeoutError(MSG_TYPE_EXCHANGE_MTU)
                }
        }

http://git-wip-us.apache.org/repos/asf/incubator-mynewt-newtmgr/blob/db639f93/nmxact/nmble/ble_fsm.go
----------------------------------------------------------------------
diff --git a/nmxact/nmble/ble_fsm.go b/nmxact/nmble/ble_fsm.go
index cf12bdc..39bfdfc 100644
--- a/nmxact/nmble/ble_fsm.go
+++ b/nmxact/nmble/ble_fsm.go
@@ -216,6 +216,30 @@ func (bf *BleFsm) calcDisconnectType() BleFsmDisconnectType {
        }
 }
 
+func (bf *BleFsm) onDisconnect(err error) {
+       log.Debugf(err.Error())
+
+       bf.mtx.Lock()
+       bls := make([]*BleListener, 0, len(bf.bls))
+       for bl, _ := range bf.bls {
+               bls = append(bls, bl)
+       }
+       bf.mtx.Unlock()
+
+       // Remember some fields before we clear them.
+       dt := bf.calcDisconnectType()
+       peer := *bf.peerDev
+
+       bf.setState(SESN_STATE_UNCONNECTED)
+       bf.peerDev = nil
+
+       for _, bl := range bls {
+               bl.ErrChan <- err
+       }
+
+       bf.params.DisconnectCb(dt, peer, err)
+}
+
 func (bf *BleFsm) connectListen(seq int) error {
        bf.connChan = make(chan error, 1)
 
@@ -228,7 +252,10 @@ func (bf *BleFsm) connectListen(seq int) error {
                defer bf.removeBleSeqListener(seq)
                for {
                        select {
-                       case <-bl.ErrChan:
+                       case err := <-bl.ErrChan:
+                               // Transport reported error.  Assume all connections have
+                               // dropped.
+                               bf.onDisconnect(err)
                                return
 
                        case bm := <-bl.BleChan:
@@ -281,33 +308,13 @@ func (bf *BleFsm) connectListen(seq int) error {
 
                                case *BleDisconnectEvt:
                                        err := bf.disconnectError(msg.Reason)
-                                       log.Debugf(err.Error())
-
-                                       bf.mtx.Lock()
-                                       bls := make([]*BleListener, 0, len(bf.bls))
-                                       for bl, _ := range bf.bls {
-                                               bls = append(bls, bl)
-                                       }
-                                       bf.mtx.Unlock()
-
-                                       // Remember some fields before we clear them.
-                                       dt := bf.calcDisconnectType()
-                                       peer := *bf.peerDev
-
-                                       bf.setState(SESN_STATE_UNCONNECTED)
-                                       bf.peerDev = nil
-
-                                       for _, bl := range bls {
-                                               bl.ErrChan <- err
-                                       }
-
-                                       bf.params.DisconnectCb(dt, peer, err)
+                                       bf.onDisconnect(err)
                                        return
 
                                default:
                                }
 
-                       case <-bl.AfterTimeout(bf.params.Bx.rspTimeout):
+                       case <-bl.AfterTimeout(bf.params.Bx.RspTimeout()):
                                bf.connChan <- BhdTimeoutError(MSG_TYPE_CONNECT)
                        }
                }

http://git-wip-us.apache.org/repos/asf/incubator-mynewt-newtmgr/blob/db639f93/nmxact/nmble/ble_oic_sesn.go
----------------------------------------------------------------------
diff --git a/nmxact/nmble/ble_oic_sesn.go b/nmxact/nmble/ble_oic_sesn.go
index 4001960..5fa9291 100644
--- a/nmxact/nmble/ble_oic_sesn.go
+++ b/nmxact/nmble/ble_oic_sesn.go
@@ -5,6 +5,8 @@ import (
        "sync"
        "time"
 
+       log "github.com/Sirupsen/logrus"
+
        "mynewt.apache.org/newt/util"
        . "mynewt.apache.org/newtmgr/nmxact/bledefs"
        "mynewt.apache.org/newtmgr/nmxact/nmp"
@@ -135,11 +137,21 @@ func (bos *BleOicSesn) AbortRx(seq uint8) error {
 func (bos *BleOicSesn) Open() error {
        var err error
        for i := 0; i < bos.connTries; i++ {
+               log.Debugf("Opening BLE session; try %d/%d", i+1, bos.connTries)
+
                var retry bool
                retry, err = bos.bf.Start()
                if !retry {
                        break
                }
+
+               if bos.blockUntilClosed(1*time.Second) != nil {
+                       // Just close the session manually and report the original error.
+                       bos.Close()
+                       return err
+               }
+
+               log.Debugf("Connection to BLE peer dropped immediately; retrying")
        }
 
        return err

http://git-wip-us.apache.org/repos/asf/incubator-mynewt-newtmgr/blob/db639f93/nmxact/nmble/ble_plain_sesn.go
----------------------------------------------------------------------
diff --git a/nmxact/nmble/ble_plain_sesn.go b/nmxact/nmble/ble_plain_sesn.go
index 012a301..15419a9 100644
--- a/nmxact/nmble/ble_plain_sesn.go
+++ b/nmxact/nmble/ble_plain_sesn.go
@@ -5,6 +5,8 @@ import (
        "sync"
        "time"
 
+       log "github.com/Sirupsen/logrus"
+
        "mynewt.apache.org/newt/util"
        . "mynewt.apache.org/newtmgr/nmxact/bledefs"
        "mynewt.apache.org/newtmgr/nmxact/nmp"
@@ -128,6 +130,8 @@ func (bps *BlePlainSesn) AbortRx(seq uint8) error {
 func (bps *BlePlainSesn) Open() error {
        var err error
        for i := 0; i < bps.connTries; i++ {
+               log.Debugf("Opening BLE session; try %d/%d", i+1, bps.connTries)
+
                var retry bool
                retry, err = bps.bf.Start()
                if !retry {
@@ -139,6 +143,8 @@ func (bps *BlePlainSesn) Open() error {
                        bps.Close()
                        return err
                }
+
+               log.Debugf("Connection to BLE peer dropped immediately; retrying")
        }
 
        return err

http://git-wip-us.apache.org/repos/asf/incubator-mynewt-newtmgr/blob/db639f93/nmxact/nmble/ble_xport.go
----------------------------------------------------------------------
diff --git a/nmxact/nmble/ble_xport.go b/nmxact/nmble/ble_xport.go
index ba38b9d..16a4128 100644
--- a/nmxact/nmble/ble_xport.go
+++ b/nmxact/nmble/ble_xport.go
@@ -9,9 +9,9 @@ import (
 
        log "github.com/Sirupsen/logrus"
 
+       "mynewt.apache.org/newt/util/unixchild"
        "mynewt.apache.org/newtmgr/nmxact/nmxutil"
        "mynewt.apache.org/newtmgr/nmxact/sesn"
-       "mynewt.apache.org/newt/util/unixchild"
 )
 
 type XportCfg struct {
@@ -33,13 +33,17 @@ type XportCfg struct {
 
        // Path of the BLE controller device (e.g., /dev/ttyUSB0).
        DevPath string
+
+       // How long to allow for the host and controller to sync at startup.
+       SyncTimeout time.Duration
 }
 
 func NewXportCfg() XportCfg {
        return XportCfg{
                BlehostdAcceptTimeout: time.Second,
-               BlehostdRestart:       true,
                BlehostdRspTimeout:    time.Second,
+               BlehostdRestart:       true,
+               SyncTimeout:           10 * time.Second,
        }
 }
 
@@ -57,31 +61,29 @@ type BleXport struct {
        client *unixchild.Client
        state  BleXportState
 
-       syncTimeout time.Duration
-       rspTimeout  time.Duration
+       cfg XportCfg
 }
 
 func NewBleXport(cfg XportCfg) (*BleXport, error) {
-       config := unixchild.Config{
-               SockPath:      cfg.SockPath,
-               ChildPath:     cfg.BlehostdPath,
-               ChildArgs:     []string{cfg.DevPath, cfg.SockPath},
-               Depth:         10,
-               MaxMsgSz:      10240,
-               AcceptTimeout: cfg.BlehostdAcceptTimeout,
-               Restart:       cfg.BlehostdRestart,
+       bx := &BleXport{
+               Bd:  NewBleDispatcher(),
+               cfg: cfg,
        }
 
-       c := unixchild.New(config)
+       return bx, nil
+}
 
-       bx := &BleXport{
-               client:      c,
-               Bd:          NewBleDispatcher(),
-               syncTimeout: 10 * time.Second,
-               rspTimeout:  cfg.BlehostdRspTimeout,
+func (bx *BleXport) createUnixChild() {
+       config := unixchild.Config{
+               SockPath:      bx.cfg.SockPath,
+               ChildPath:     bx.cfg.BlehostdPath,
+               ChildArgs:     []string{bx.cfg.DevPath, bx.cfg.SockPath},
+               Depth:         10,
+               MaxMsgSz:      10240,
+               AcceptTimeout: bx.cfg.BlehostdAcceptTimeout,
        }
 
-       return bx, nil
+       bx.client = unixchild.New(config)
 }
 
 func (bx *BleXport) BuildSesn(cfg sesn.SesnCfg) (sesn.Sesn, error) {
@@ -182,11 +184,11 @@ func (bx *BleXport) onError(err error) {
                // Stop already in progress.
                return
        }
-       bx.Bd.ErrorAll(err)
        if bx.client != nil {
                bx.client.Stop()
                bx.client.FromChild <- nil
        }
+       bx.Bd.ErrorAll(err)
 }
 
 func (bx *BleXport) setStateFrom(from BleXportState, to BleXportState) bool {
@@ -209,24 +211,29 @@ func (bx *BleXport) Start() error {
                return nmxutil.NewXportError("BLE xport started twice")
        }
 
+       bx.createUnixChild()
        if err := bx.client.Start(); err != nil {
-               return nmxutil.NewXportError(
-                       "Failed to start child child process: " + err.Error())
+               if unixchild.IsUcAcceptError(err) {
+                       err = nmxutil.NewXportError("blehostd did not connect to socket; " +
+                               "controller not attached?")
+               } else {
+                       err = nmxutil.NewXportError(
+                               "Failed to start child process: " + err.Error())
+               }
+               bx.setStateFrom(BLE_XPORT_STATE_STARTING, BLE_XPORT_STATE_STOPPED)
+               return err
        }
 
        go func() {
                err := <-bx.client.ErrChild
-               if unixchild.IsUcAcceptError(err) {
-                       err = fmt.Errorf("blehostd did not connect to socket; " +
-                               "controller not attached?")
-               }
+               err = nmxutil.NewXportError("BLE transport error: " + err.Error())
+               fmt.Printf("%s\n", err.Error())
                bx.onError(err)
-               return
        }()
 
        go func() {
                for {
-                       if _, err := bx.rx(); err != nil {
+                       if b := bx.rx(); b == nil {
                                // The error should have been reported to everyone interested.
                                break
                        }
@@ -254,7 +261,7 @@ func (bx *BleXport) Start() error {
                                                break SyncLoop
                                        }
                                }
-                       case <-time.After(bx.syncTimeout):
+                       case <-time.After(bx.cfg.SyncTimeout):
                                bx.Stop()
                                return nmxutil.NewXportError(
                                        "Timeout waiting for host <-> controller sync")
@@ -305,15 +312,15 @@ func (bx *BleXport) Tx(data []byte) error {
        return nil
 }
 
-func (bx *BleXport) rx() ([]byte, error) {
-       select {
-       case err := <-bx.client.ErrChild:
-               return nil, err
-       case buf := <-bx.client.FromChild:
-               if len(buf) != 0 {
-                       log.Debugf("Receive from blehostd:\n%s", hex.Dump(buf))
-                       bx.Bd.Dispatch(buf)
-               }
-               return buf, nil
+func (bx *BleXport) rx() []byte {
+       buf := <-bx.client.FromChild
+       if len(buf) != 0 {
+               log.Debugf("Receive from blehostd:\n%s", hex.Dump(buf))
+               bx.Bd.Dispatch(buf)
        }
+       return buf
+}
+
+func (bx *BleXport) RspTimeout() time.Duration {
+       return bx.cfg.BlehostdRspTimeout
 }

http://git-wip-us.apache.org/repos/asf/incubator-mynewt-newtmgr/blob/db639f93/nmxact/nmp/dispatch.go
----------------------------------------------------------------------
diff --git a/nmxact/nmp/dispatch.go b/nmxact/nmp/dispatch.go
index 7b1ad11..5b39a9b 100644
--- a/nmxact/nmp/dispatch.go
+++ b/nmxact/nmp/dispatch.go
@@ -39,6 +39,7 @@ func NewNmpListener() *NmpListener {
        return &NmpListener{
                RspChan: make(chan NmpRsp, 1),
                ErrChan: make(chan error, 1),
+               tmoChan: make(chan time.Time, 1),
        }
 }
 

Reply via email to