nmxact - Stability when xport fails
Project: http://git-wip-us.apache.org/repos/asf/incubator-mynewt-newtmgr/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-mynewt-newtmgr/commit/db639f93 Tree: http://git-wip-us.apache.org/repos/asf/incubator-mynewt-newtmgr/tree/db639f93 Diff: http://git-wip-us.apache.org/repos/asf/incubator-mynewt-newtmgr/diff/db639f93 Branch: refs/heads/master Commit: db639f9313a27c051c0b6dd9e13840549f3f5acd Parents: 0384fcf Author: Christopher Collins <[email protected]> Authored: Fri Mar 31 18:26:35 2017 -0700 Committer: Christopher Collins <[email protected]> Committed: Mon Apr 3 16:49:25 2017 -0700 ---------------------------------------------------------------------- nmxact/nmble/ble_act.go | 16 +++---- nmxact/nmble/ble_fsm.go | 53 +++++++++++++---------- nmxact/nmble/ble_oic_sesn.go | 12 ++++++ nmxact/nmble/ble_plain_sesn.go | 6 +++ nmxact/nmble/ble_xport.go | 85 ++++++++++++++++++++----------------- nmxact/nmp/dispatch.go | 1 + 6 files changed, 103 insertions(+), 70 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-mynewt-newtmgr/blob/db639f93/nmxact/nmble/ble_act.go ---------------------------------------------------------------------- diff --git a/nmxact/nmble/ble_act.go b/nmxact/nmble/ble_act.go index 2aa6678..cb6cb00 100644 --- a/nmxact/nmble/ble_act.go +++ b/nmxact/nmble/ble_act.go @@ -56,7 +56,7 @@ func terminate(x *BleXport, bl *BleListener, r *BleTerminateReq) error { default: } - case <-bl.AfterTimeout(x.rspTimeout): + case <-bl.AfterTimeout(x.RspTimeout()): return BhdTimeoutError(MSG_TYPE_TERMINATE) } } @@ -92,7 +92,7 @@ func connCancel(x *BleXport, bl *BleListener, r *BleConnCancelReq) error { default: } - case <-bl.AfterTimeout(x.rspTimeout): + case <-bl.AfterTimeout(x.RspTimeout()): return BhdTimeoutError(MSG_TYPE_TERMINATE) } } @@ -148,7 +148,7 @@ func discSvcUuid(x *BleXport, bl *BleListener, r *BleDiscSvcUuidReq) ( default: } - case <-bl.AfterTimeout(x.rspTimeout): + case <-bl.AfterTimeout(x.RspTimeout()): return nil, BhdTimeoutError(MSG_TYPE_DISC_SVC_UUID) } } @@ -198,7 +198,7 @@ func discAllChrs(x *BleXport, bl *BleListener, r *BleDiscAllChrsReq) ( default: } - case <-bl.AfterTimeout(x.rspTimeout): + case <-bl.AfterTimeout(x.RspTimeout()): return nil, BhdTimeoutError(MSG_TYPE_DISC_ALL_CHRS) } } @@ -235,7 +235,7 @@ func writeCmd(x *BleXport, bl *BleListener, r *BleWriteCmdReq) error { default: } - case <-bl.AfterTimeout(x.rspTimeout): + case <-bl.AfterTimeout(x.RspTimeout()): return BhdTimeoutError(MSG_TYPE_WRITE_CMD) } } @@ -281,7 +281,7 @@ func exchangeMtu(x *BleXport, bl *BleListener, r *BleExchangeMtuReq) ( default: } - case <-bl.AfterTimeout(x.rspTimeout): + case <-bl.AfterTimeout(x.RspTimeout()): return 0, BhdTimeoutError(MSG_TYPE_EXCHANGE_MTU) } } @@ -320,7 +320,7 @@ func scan(x *BleXport, bl *BleListener, r *BleScanReq, default: } - case <-bl.AfterTimeout(x.rspTimeout): + case <-bl.AfterTimeout(x.RspTimeout()): return BhdTimeoutError(MSG_TYPE_EXCHANGE_MTU) case <-abortChan: @@ -356,7 +356,7 @@ func scanCancel(x *BleXport, bl *BleListener, r *BleScanCancelReq) error { default: } - case <-bl.AfterTimeout(x.rspTimeout): + case <-bl.AfterTimeout(x.RspTimeout()): return BhdTimeoutError(MSG_TYPE_EXCHANGE_MTU) } } http://git-wip-us.apache.org/repos/asf/incubator-mynewt-newtmgr/blob/db639f93/nmxact/nmble/ble_fsm.go ---------------------------------------------------------------------- diff --git a/nmxact/nmble/ble_fsm.go b/nmxact/nmble/ble_fsm.go index cf12bdc..39bfdfc 100644 --- a/nmxact/nmble/ble_fsm.go +++ b/nmxact/nmble/ble_fsm.go @@ -216,6 +216,30 @@ func (bf *BleFsm) calcDisconnectType() BleFsmDisconnectType { } } +func (bf *BleFsm) onDisconnect(err error) { + log.Debugf(err.Error()) + + bf.mtx.Lock() + bls := make([]*BleListener, 0, len(bf.bls)) + for bl, _ := range bf.bls { + bls = append(bls, bl) + } + bf.mtx.Unlock() + + // Remember some fields before we clear them. + dt := bf.calcDisconnectType() + peer := *bf.peerDev + + bf.setState(SESN_STATE_UNCONNECTED) + bf.peerDev = nil + + for _, bl := range bls { + bl.ErrChan <- err + } + + bf.params.DisconnectCb(dt, peer, err) +} + func (bf *BleFsm) connectListen(seq int) error { bf.connChan = make(chan error, 1) @@ -228,7 +252,10 @@ func (bf *BleFsm) connectListen(seq int) error { defer bf.removeBleSeqListener(seq) for { select { - case <-bl.ErrChan: + case err := <-bl.ErrChan: + // Transport reported error. Assume all connections have + // dropped. + bf.onDisconnect(err) return case bm := <-bl.BleChan: @@ -281,33 +308,13 @@ func (bf *BleFsm) connectListen(seq int) error { case *BleDisconnectEvt: err := bf.disconnectError(msg.Reason) - log.Debugf(err.Error()) - - bf.mtx.Lock() - bls := make([]*BleListener, 0, len(bf.bls)) - for bl, _ := range bf.bls { - bls = append(bls, bl) - } - bf.mtx.Unlock() - - // Remember some fields before we clear them. - dt := bf.calcDisconnectType() - peer := *bf.peerDev - - bf.setState(SESN_STATE_UNCONNECTED) - bf.peerDev = nil - - for _, bl := range bls { - bl.ErrChan <- err - } - - bf.params.DisconnectCb(dt, peer, err) + bf.onDisconnect(err) return default: } - case <-bl.AfterTimeout(bf.params.Bx.rspTimeout): + case <-bl.AfterTimeout(bf.params.Bx.RspTimeout()): bf.connChan <- BhdTimeoutError(MSG_TYPE_CONNECT) } } http://git-wip-us.apache.org/repos/asf/incubator-mynewt-newtmgr/blob/db639f93/nmxact/nmble/ble_oic_sesn.go ---------------------------------------------------------------------- diff --git a/nmxact/nmble/ble_oic_sesn.go b/nmxact/nmble/ble_oic_sesn.go index 4001960..5fa9291 100644 --- a/nmxact/nmble/ble_oic_sesn.go +++ b/nmxact/nmble/ble_oic_sesn.go @@ -5,6 +5,8 @@ import ( "sync" "time" + log "github.com/Sirupsen/logrus" + "mynewt.apache.org/newt/util" . "mynewt.apache.org/newtmgr/nmxact/bledefs" "mynewt.apache.org/newtmgr/nmxact/nmp" @@ -135,11 +137,21 @@ func (bos *BleOicSesn) AbortRx(seq uint8) error { func (bos *BleOicSesn) Open() error { var err error for i := 0; i < bos.connTries; i++ { + log.Debugf("Opening BLE session; try %d/%d", i+1, bos.connTries) + var retry bool retry, err = bos.bf.Start() if !retry { break } + + if bos.blockUntilClosed(1*time.Second) != nil { + // Just close the session manually and report the original error. + bos.Close() + return err + } + + log.Debugf("Connection to BLE peer dropped immediately; retrying") } return err http://git-wip-us.apache.org/repos/asf/incubator-mynewt-newtmgr/blob/db639f93/nmxact/nmble/ble_plain_sesn.go ---------------------------------------------------------------------- diff --git a/nmxact/nmble/ble_plain_sesn.go b/nmxact/nmble/ble_plain_sesn.go index 012a301..15419a9 100644 --- a/nmxact/nmble/ble_plain_sesn.go +++ b/nmxact/nmble/ble_plain_sesn.go @@ -5,6 +5,8 @@ import ( "sync" "time" + log "github.com/Sirupsen/logrus" + "mynewt.apache.org/newt/util" . "mynewt.apache.org/newtmgr/nmxact/bledefs" "mynewt.apache.org/newtmgr/nmxact/nmp" @@ -128,6 +130,8 @@ func (bps *BlePlainSesn) AbortRx(seq uint8) error { func (bps *BlePlainSesn) Open() error { var err error for i := 0; i < bps.connTries; i++ { + log.Debugf("Opening BLE session; try %d/%d", i+1, bps.connTries) + var retry bool retry, err = bps.bf.Start() if !retry { @@ -139,6 +143,8 @@ func (bps *BlePlainSesn) Open() error { bps.Close() return err } + + log.Debugf("Connection to BLE peer dropped immediately; retrying") } return err http://git-wip-us.apache.org/repos/asf/incubator-mynewt-newtmgr/blob/db639f93/nmxact/nmble/ble_xport.go ---------------------------------------------------------------------- diff --git a/nmxact/nmble/ble_xport.go b/nmxact/nmble/ble_xport.go index ba38b9d..16a4128 100644 --- a/nmxact/nmble/ble_xport.go +++ b/nmxact/nmble/ble_xport.go @@ -9,9 +9,9 @@ import ( log "github.com/Sirupsen/logrus" + "mynewt.apache.org/newt/util/unixchild" "mynewt.apache.org/newtmgr/nmxact/nmxutil" "mynewt.apache.org/newtmgr/nmxact/sesn" - "mynewt.apache.org/newt/util/unixchild" ) type XportCfg struct { @@ -33,13 +33,17 @@ type XportCfg struct { // Path of the BLE controller device (e.g., /dev/ttyUSB0). DevPath string + + // How long to allow for the host and controller to sync at startup. + SyncTimeout time.Duration } func NewXportCfg() XportCfg { return XportCfg{ BlehostdAcceptTimeout: time.Second, - BlehostdRestart: true, BlehostdRspTimeout: time.Second, + BlehostdRestart: true, + SyncTimeout: 10 * time.Second, } } @@ -57,31 +61,29 @@ type BleXport struct { client *unixchild.Client state BleXportState - syncTimeout time.Duration - rspTimeout time.Duration + cfg XportCfg } func NewBleXport(cfg XportCfg) (*BleXport, error) { - config := unixchild.Config{ - SockPath: cfg.SockPath, - ChildPath: cfg.BlehostdPath, - ChildArgs: []string{cfg.DevPath, cfg.SockPath}, - Depth: 10, - MaxMsgSz: 10240, - AcceptTimeout: cfg.BlehostdAcceptTimeout, - Restart: cfg.BlehostdRestart, + bx := &BleXport{ + Bd: NewBleDispatcher(), + cfg: cfg, } - c := unixchild.New(config) + return bx, nil +} - bx := &BleXport{ - client: c, - Bd: NewBleDispatcher(), - syncTimeout: 10 * time.Second, - rspTimeout: cfg.BlehostdRspTimeout, +func (bx *BleXport) createUnixChild() { + config := unixchild.Config{ + SockPath: bx.cfg.SockPath, + ChildPath: bx.cfg.BlehostdPath, + ChildArgs: []string{bx.cfg.DevPath, bx.cfg.SockPath}, + Depth: 10, + MaxMsgSz: 10240, + AcceptTimeout: bx.cfg.BlehostdAcceptTimeout, } - return bx, nil + bx.client = unixchild.New(config) } func (bx *BleXport) BuildSesn(cfg sesn.SesnCfg) (sesn.Sesn, error) { @@ -182,11 +184,11 @@ func (bx *BleXport) onError(err error) { // Stop already in progress. return } - bx.Bd.ErrorAll(err) if bx.client != nil { bx.client.Stop() bx.client.FromChild <- nil } + bx.Bd.ErrorAll(err) } func (bx *BleXport) setStateFrom(from BleXportState, to BleXportState) bool { @@ -209,24 +211,29 @@ func (bx *BleXport) Start() error { return nmxutil.NewXportError("BLE xport started twice") } + bx.createUnixChild() if err := bx.client.Start(); err != nil { - return nmxutil.NewXportError( - "Failed to start child child process: " + err.Error()) + if unixchild.IsUcAcceptError(err) { + err = nmxutil.NewXportError("blehostd did not connect to socket; " + + "controller not attached?") + } else { + err = nmxutil.NewXportError( + "Failed to start child process: " + err.Error()) + } + bx.setStateFrom(BLE_XPORT_STATE_STARTING, BLE_XPORT_STATE_STOPPED) + return err } go func() { err := <-bx.client.ErrChild - if unixchild.IsUcAcceptError(err) { - err = fmt.Errorf("blehostd did not connect to socket; " + - "controller not attached?") - } + err = nmxutil.NewXportError("BLE transport error: " + err.Error()) + fmt.Printf("%s\n", err.Error()) bx.onError(err) - return }() go func() { for { - if _, err := bx.rx(); err != nil { + if b := bx.rx(); b == nil { // The error should have been reported to everyone interested. break } @@ -254,7 +261,7 @@ func (bx *BleXport) Start() error { break SyncLoop } } - case <-time.After(bx.syncTimeout): + case <-time.After(bx.cfg.SyncTimeout): bx.Stop() return nmxutil.NewXportError( "Timeout waiting for host <-> controller sync") @@ -305,15 +312,15 @@ func (bx *BleXport) Tx(data []byte) error { return nil } -func (bx *BleXport) rx() ([]byte, error) { - select { - case err := <-bx.client.ErrChild: - return nil, err - case buf := <-bx.client.FromChild: - if len(buf) != 0 { - log.Debugf("Receive from blehostd:\n%s", hex.Dump(buf)) - bx.Bd.Dispatch(buf) - } - return buf, nil +func (bx *BleXport) rx() []byte { + buf := <-bx.client.FromChild + if len(buf) != 0 { + log.Debugf("Receive from blehostd:\n%s", hex.Dump(buf)) + bx.Bd.Dispatch(buf) } + return buf +} + +func (bx *BleXport) RspTimeout() time.Duration { + return bx.cfg.BlehostdRspTimeout } http://git-wip-us.apache.org/repos/asf/incubator-mynewt-newtmgr/blob/db639f93/nmxact/nmp/dispatch.go ---------------------------------------------------------------------- diff --git a/nmxact/nmp/dispatch.go b/nmxact/nmp/dispatch.go index 7b1ad11..5b39a9b 100644 --- a/nmxact/nmp/dispatch.go +++ b/nmxact/nmp/dispatch.go @@ -39,6 +39,7 @@ func NewNmpListener() *NmpListener { return &NmpListener{ RspChan: make(chan NmpRsp, 1), ErrChan: make(chan error, 1), + tmoChan: make(chan time.Time, 1), } }
