nxmact - Protect access to master op state I.e., prevent two sessions from connecting at the same time.
Project: http://git-wip-us.apache.org/repos/asf/incubator-mynewt-newtmgr/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-mynewt-newtmgr/commit/14d4b457 Tree: http://git-wip-us.apache.org/repos/asf/incubator-mynewt-newtmgr/tree/14d4b457 Diff: http://git-wip-us.apache.org/repos/asf/incubator-mynewt-newtmgr/diff/14d4b457 Branch: refs/heads/master Commit: 14d4b457c0384fe7cdd6050b8fe551d55c465b85 Parents: aa00340 Author: Christopher Collins <[email protected]> Authored: Fri Apr 7 16:02:10 2017 -0700 Committer: Christopher Collins <[email protected]> Committed: Fri Apr 7 16:02:10 2017 -0700 ---------------------------------------------------------------------- nmxact/nmble/ble_fsm.go | 62 ++++++++++++++++++++++++++++++++---------- nmxact/nmble/ble_xport.go | 47 ++++++++++++++++++++++++++++++++ 2 files changed, 94 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-mynewt-newtmgr/blob/14d4b457/nmxact/nmble/ble_fsm.go ---------------------------------------------------------------------- diff --git a/nmxact/nmble/ble_fsm.go b/nmxact/nmble/ble_fsm.go index 7bf429e..8dacc4e 100644 --- a/nmxact/nmble/ble_fsm.go +++ b/nmxact/nmble/ble_fsm.go @@ -105,16 +105,30 @@ func (bf *BleFsm) getState() BleSesnState { return bf.state } -func (bf *BleFsm) setStateNoLock(toState BleSesnState) { +func stateRequiresMaster(s BleSesnState) bool { + return s == SESN_STATE_SCANNING || s == SESN_STATE_CONNECTING +} + +func (bf *BleFsm) setStateNoLock(toState BleSesnState) error { + if !stateRequiresMaster(bf.state) && stateRequiresMaster(toState) { + if err := bf.params.Bx.AcquireMaster(); err != nil { + return err + } + } else if stateRequiresMaster(bf.state) && !stateRequiresMaster(toState) { + bf.params.Bx.ReleaseMaster() + } + bf.state = toState bf.lastStateChange = time.Now() + + return nil } -func (bf *BleFsm) setState(toState BleSesnState) { +func (bf *BleFsm) setState(toState BleSesnState) error { bf.mtx.Lock() defer bf.mtx.Unlock() - bf.setStateNoLock(toState) + return bf.setStateNoLock(toState) } func (bf *BleFsm) transitionState(fromState BleSesnState, @@ -130,10 +144,20 @@ func (bf *BleFsm) transitionState(fromState BleSesnState, toState, fromState) } - bf.setStateNoLock(toState) + if err := bf.setStateNoLock(toState); err != nil { + return err + } + return nil } +func (bf *BleFsm) resetState() { + if err := bf.setState(SESN_STATE_UNCONNECTED); err != nil { + log.Debugf("BleFsm state change resulted in unexpected error: %s", + err) + } +} + func (bf *BleFsm) addBleListener(base BleMsgBase) (*BleListener, error) { bl := NewBleListener() @@ -195,11 +219,15 @@ func (bf *BleFsm) action( } if err := cb(); err != nil { - bf.setState(preState) + if err := bf.setState(preState); err != nil { + return err + } return err } - bf.setState(postState) + if err := bf.setState(postState); err != nil { + return err + } return nil } @@ -229,15 +257,16 @@ func calcDisconnectType(state BleSesnState) BleFsmDisconnectType { } func (bf *BleFsm) onDisconnect(err error) { - log.Debugf(err.Error()) - bf.mtx.Lock() // Remember some fields before we clear them. dt := calcDisconnectType(bf.state) peer := *bf.peerDev - bf.setStateNoLock(SESN_STATE_UNCONNECTED) + if err := bf.setStateNoLock(SESN_STATE_UNCONNECTED); err != nil { + log.Debugf("BleFsm state change resulted in unexpected error: %s", + err) + } bf.peerDev = nil // Make a copy of all the listeners so we don't have to keep the mutex @@ -453,7 +482,10 @@ func (bf *BleFsm) terminateSetState() error { return fmt.Errorf( "BLE terminate failed; session already being closed") default: - bf.setStateNoLock(SESN_STATE_TERMINATING) + if err := bf.setStateNoLock(SESN_STATE_TERMINATING); err != nil { + log.Debugf("BleFsm state change resulted in unexpected error: %s", + err) + } } return nil @@ -678,7 +710,7 @@ func (bf *BleFsm) Start() (bool, error) { } if err != nil { - bf.setState(SESN_STATE_UNCONNECTED) + bf.resetState() return false, err } @@ -692,7 +724,7 @@ func (bf *BleFsm) Start() (bool, error) { if err != nil { bhe := nmxutil.ToBleHost(err) retry := bhe != nil && bhe.Status == ERR_CODE_ENOTCONN - bf.setState(SESN_STATE_UNCONNECTED) + bf.resetState() return retry, err } @@ -704,7 +736,7 @@ func (bf *BleFsm) Start() (bool, error) { SESN_STATE_DISCOVERED_SVC, cb) if err != nil { - bf.setState(SESN_STATE_UNCONNECTED) + bf.resetState() return false, err } @@ -719,12 +751,12 @@ func (bf *BleFsm) Start() (bool, error) { SESN_STATE_DISCOVERED_CHR, cb) if err != nil { - bf.setState(SESN_STATE_UNCONNECTED) + bf.resetState() return false, err } if err := bf.subscribe(); err != nil { - bf.setState(SESN_STATE_UNCONNECTED) + bf.resetState() return false, err } http://git-wip-us.apache.org/repos/asf/incubator-mynewt-newtmgr/blob/14d4b457/nmxact/nmble/ble_xport.go ---------------------------------------------------------------------- diff --git a/nmxact/nmble/ble_xport.go b/nmxact/nmble/ble_xport.go index 93fece1..f5a9272 100644 --- a/nmxact/nmble/ble_xport.go +++ b/nmxact/nmble/ble_xport.go @@ -91,6 +91,8 @@ type BleXport struct { shutdownChan chan bool readyChan chan error numReadyListeners int + masterQueue [](chan error) + masterActive bool randAddr *BleAddr mtx sync.Mutex @@ -102,6 +104,7 @@ func NewBleXport(cfg XportCfg) (*BleXport, error) { Bd: NewBleDispatcher(), shutdownChan: make(chan bool), readyChan: make(chan error), + masterQueue: [](chan error){}, cfg: cfg, } @@ -247,6 +250,11 @@ func (bx *BleXport) shutdown(restart bool, err error) { // them from blocking endlessly while awaiting a BLE message. bx.Bd.ErrorAll(err) + for _, listener := range bx.masterQueue { + listener <- err + } + bx.masterQueue = [](chan error){} + // Stop all of this transport's go routines. for i := 0; i < bx.numStopListeners; i++ { bx.stopChan <- struct{}{} @@ -514,3 +522,42 @@ func (bx *BleXport) Tx(data []byte) error { func (bx *BleXport) RspTimeout() time.Duration { return bx.cfg.BlehostdRspTimeout } + +func (bx *BleXport) AcquireMaster() error { + bx.mtx.Lock() + + if !bx.masterActive { + bx.masterActive = true + bx.mtx.Unlock() + return nil + } + + listener := make(chan error) + bx.masterQueue = append(bx.masterQueue, listener) + + bx.mtx.Unlock() + + return <-listener +} + +func (bx *BleXport) ReleaseMaster() { + bx.mtx.Lock() + + if !bx.masterActive { + bx.mtx.Unlock() + return + } + + if len(bx.masterQueue) == 0 { + bx.masterActive = false + bx.mtx.Unlock() + return + } + + listener := bx.masterQueue[0] + bx.masterQueue = bx.masterQueue[1:] + + bx.mtx.Unlock() + + listener <- nil +}
