Repository: incubator-mynewt-newtmgr Updated Branches: refs/heads/master 6c9269d72 -> e5dcf07e4
nmxact - misc cleanup. Project: http://git-wip-us.apache.org/repos/asf/incubator-mynewt-newtmgr/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-mynewt-newtmgr/commit/e5dcf07e Tree: http://git-wip-us.apache.org/repos/asf/incubator-mynewt-newtmgr/tree/e5dcf07e Diff: http://git-wip-us.apache.org/repos/asf/incubator-mynewt-newtmgr/diff/e5dcf07e Branch: refs/heads/master Commit: e5dcf07e47fc4198cadf9c77304dc41772095f68 Parents: 6c9269d Author: Christopher Collins <[email protected]> Authored: Mon Apr 10 22:25:22 2017 -0700 Committer: Christopher Collins <[email protected]> Committed: Mon Apr 10 22:30:19 2017 -0700 ---------------------------------------------------------------------- newtmgr/newtmgr.go | 7 +- nmxact/bledefs/bledefs.go | 6 +- nmxact/example/ble_dual/ble_dual.go | 8 +- nmxact/nmble/ble_act.go | 103 +++++++------- nmxact/nmble/ble_fsm.go | 225 ++++++++++++------------------- nmxact/nmble/ble_proto.go | 4 +- nmxact/nmble/ble_xport.go | 1 + nmxact/nmxutil/nmxutil.go | 77 +++++++++++ nmxact/sesn/sesn_cfg.go | 2 +- 9 files changed, 229 insertions(+), 204 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-mynewt-newtmgr/blob/e5dcf07e/newtmgr/newtmgr.go ---------------------------------------------------------------------- diff --git a/newtmgr/newtmgr.go b/newtmgr/newtmgr.go index 0307020..f1f10b5 100644 --- a/newtmgr/newtmgr.go +++ b/newtmgr/newtmgr.go @@ -25,10 +25,10 @@ import ( "os/signal" "syscall" + "mynewt.apache.org/newt/util" "mynewt.apache.org/newtmgr/newtmgr/cli" "mynewt.apache.org/newtmgr/newtmgr/config" "mynewt.apache.org/newtmgr/nmxact/nmserial" - "mynewt.apache.org/newt/util" ) func main() { @@ -38,11 +38,6 @@ func main() { } onExit := func() { - s, err := cli.GetSesnIfOpen() - if err == nil { - s.Close() - } - x, err := cli.GetXportIfOpen() if err == nil { // Don't attempt to close a serial transport. Attempting to close http://git-wip-us.apache.org/repos/asf/incubator-mynewt-newtmgr/blob/e5dcf07e/nmxact/bledefs/bledefs.go ---------------------------------------------------------------------- diff --git a/nmxact/bledefs/bledefs.go b/nmxact/bledefs/bledefs.go index a64da61..b811dc0 100644 --- a/nmxact/bledefs/bledefs.go +++ b/nmxact/bledefs/bledefs.go @@ -48,7 +48,7 @@ var BleAddrTypeStringMap = map[BleAddrType]string{ func BleAddrTypeToString(addrType BleAddrType) string { s := BleAddrTypeStringMap[addrType] if s == "" { - panic(fmt.Sprintf("Invalid BleAddrType: %d", int(addrType))) + return "???" } return s @@ -241,7 +241,7 @@ var BleScanFilterPolicyStringMap = map[BleScanFilterPolicy]string{ func BleScanFilterPolicyToString(filtPolicy BleScanFilterPolicy) string { s := BleScanFilterPolicyStringMap[filtPolicy] if s == "" { - panic(fmt.Sprintf("Invalid BleScanFilterPolicy: %d", int(filtPolicy))) + return "???" } return s @@ -295,7 +295,7 @@ var BleAdvEventTypeStringMap = map[BleAdvEventType]string{ func BleAdvEventTypeToString(advEventType BleAdvEventType) string { s := BleAdvEventTypeStringMap[advEventType] if s == "" { - panic(fmt.Sprintf("Invalid BleAdvEventType: %d", int(advEventType))) + return "???" } return s http://git-wip-us.apache.org/repos/asf/incubator-mynewt-newtmgr/blob/e5dcf07e/nmxact/example/ble_dual/ble_dual.go ---------------------------------------------------------------------- diff --git a/nmxact/example/ble_dual/ble_dual.go b/nmxact/example/ble_dual/ble_dual.go index bedff95..7a43b45 100644 --- a/nmxact/example/ble_dual/ble_dual.go +++ b/nmxact/example/ble_dual/ble_dual.go @@ -36,10 +36,6 @@ import ( func configExitHandler(x xport.Xport, s sesn.Sesn) { onExit := func() { - if s.IsOpen() { - s.Close() - } - x.Stop() } @@ -70,8 +66,8 @@ func sendOne(s sesn.Sesn) { if !s.IsOpen() { // Connect to the peer (open the session). if err := s.Open(); err != nil { - fmt.Fprintf(os.Stderr, "error starting BLE session: %s\n", - err.Error()) + fmt.Fprintf(os.Stderr, "error starting BLE session: %s (%+v)\n", + err.Error(), err) return } } http://git-wip-us.apache.org/repos/asf/incubator-mynewt-newtmgr/blob/e5dcf07e/nmxact/nmble/ble_act.go ---------------------------------------------------------------------- diff --git a/nmxact/nmble/ble_act.go b/nmxact/nmble/ble_act.go index e5e336c..e924d2a 100644 --- a/nmxact/nmble/ble_act.go +++ b/nmxact/nmble/ble_act.go @@ -28,6 +28,8 @@ func connect(x *BleXport, connChan chan error, r *BleConnectReq) error { // Blocking func terminate(x *BleXport, bl *BleListener, r *BleTerminateReq) error { + const rspType = MSG_TYPE_TERMINATE + j, err := json.Marshal(r) if err != nil { return err @@ -47,9 +49,7 @@ func terminate(x *BleXport, bl *BleListener, r *BleTerminateReq) error { case *BleTerminateRsp: bl.Acked = true if msg.Status != 0 { - return StatusError(MSG_OP_RSP, - MSG_TYPE_TERMINATE, - msg.Status) + return StatusError(MSG_OP_RSP, rspType, msg.Status) } else { return nil } @@ -58,12 +58,14 @@ func terminate(x *BleXport, bl *BleListener, r *BleTerminateReq) error { } case <-bl.AfterTimeout(x.RspTimeout()): - return BhdTimeoutError(MSG_TYPE_TERMINATE) + return BhdTimeoutError(rspType) } } } func connCancel(x *BleXport, bl *BleListener, r *BleConnCancelReq) error { + const rspType = MSG_TYPE_CONN_CANCEL + j, err := json.Marshal(r) if err != nil { return err @@ -83,9 +85,7 @@ func connCancel(x *BleXport, bl *BleListener, r *BleConnCancelReq) error { case *BleConnCancelRsp: bl.Acked = true if msg.Status != 0 { - return StatusError(MSG_OP_RSP, - MSG_TYPE_CONN_CANCEL, - msg.Status) + return StatusError(MSG_OP_RSP, rspType, msg.Status) } else { return nil } @@ -94,7 +94,7 @@ func connCancel(x *BleXport, bl *BleListener, r *BleConnCancelReq) error { } case <-bl.AfterTimeout(x.RspTimeout()): - return BhdTimeoutError(MSG_TYPE_TERMINATE) + return BhdTimeoutError(rspType) } } } @@ -103,6 +103,9 @@ func connCancel(x *BleXport, bl *BleListener, r *BleConnCancelReq) error { func discSvcUuid(x *BleXport, bl *BleListener, r *BleDiscSvcUuidReq) ( *BleSvc, error) { + const rspType = MSG_TYPE_DISC_SVC_UUID + const evtType = MSG_TYPE_DISC_SVC_EVT + j, err := json.Marshal(r) if err != nil { return nil, err @@ -123,9 +126,7 @@ func discSvcUuid(x *BleXport, bl *BleListener, r *BleDiscSvcUuidReq) ( case *BleDiscSvcUuidRsp: bl.Acked = true if msg.Status != 0 { - return nil, StatusError(MSG_OP_RSP, - MSG_TYPE_DISC_SVC_UUID, - msg.Status) + return nil, StatusError(MSG_OP_RSP, rspType, msg.Status) } case *BleDiscSvcEvt: @@ -141,16 +142,14 @@ func discSvcUuid(x *BleXport, bl *BleListener, r *BleDiscSvcUuidReq) ( } return svc, nil default: - return nil, StatusError(MSG_OP_EVT, - MSG_TYPE_DISC_SVC_EVT, - msg.Status) + return nil, StatusError(MSG_OP_EVT, evtType, msg.Status) } default: } case <-bl.AfterTimeout(x.RspTimeout()): - return nil, BhdTimeoutError(MSG_TYPE_DISC_SVC_UUID) + return nil, BhdTimeoutError(rspType) } } } @@ -159,6 +158,9 @@ func discSvcUuid(x *BleXport, bl *BleListener, r *BleDiscSvcUuidReq) ( func discAllChrs(x *BleXport, bl *BleListener, r *BleDiscAllChrsReq) ( []*BleChr, error) { + const rspType = MSG_TYPE_DISC_ALL_CHRS + const evtType = MSG_TYPE_DISC_CHR_EVT + j, err := json.Marshal(r) if err != nil { return nil, err @@ -179,9 +181,7 @@ func discAllChrs(x *BleXport, bl *BleListener, r *BleDiscAllChrsReq) ( case *BleDiscAllChrsRsp: bl.Acked = true if msg.Status != 0 { - return nil, StatusError(MSG_OP_RSP, - MSG_TYPE_DISC_ALL_CHRS, - msg.Status) + return nil, StatusError(MSG_OP_RSP, rspType, msg.Status) } case *BleDiscChrEvt: @@ -191,22 +191,22 @@ func discAllChrs(x *BleXport, bl *BleListener, r *BleDiscAllChrsReq) ( case ERR_CODE_EDONE: return chrs, nil default: - return nil, StatusError(MSG_OP_EVT, - MSG_TYPE_DISC_CHR_EVT, - msg.Status) + return nil, StatusError(MSG_OP_EVT, evtType, msg.Status) } default: } case <-bl.AfterTimeout(x.RspTimeout()): - return nil, BhdTimeoutError(MSG_TYPE_DISC_ALL_CHRS) + return nil, BhdTimeoutError(rspType) } } } // Blocking. func writeCmd(x *BleXport, bl *BleListener, r *BleWriteCmdReq) error { + const rspType = MSG_TYPE_WRITE_CMD + j, err := json.Marshal(r) if err != nil { return err @@ -226,9 +226,7 @@ func writeCmd(x *BleXport, bl *BleListener, r *BleWriteCmdReq) error { case *BleWriteCmdRsp: bl.Acked = true if msg.Status != 0 { - return StatusError(MSG_OP_RSP, - MSG_TYPE_WRITE_CMD, - msg.Status) + return StatusError(MSG_OP_RSP, rspType, msg.Status) } else { return nil } @@ -237,7 +235,7 @@ func writeCmd(x *BleXport, bl *BleListener, r *BleWriteCmdReq) error { } case <-bl.AfterTimeout(x.RspTimeout()): - return BhdTimeoutError(MSG_TYPE_WRITE_CMD) + return BhdTimeoutError(rspType) } } } @@ -246,6 +244,9 @@ func writeCmd(x *BleXport, bl *BleListener, r *BleWriteCmdReq) error { func exchangeMtu(x *BleXport, bl *BleListener, r *BleExchangeMtuReq) ( int, error) { + const rspType = MSG_TYPE_EXCHANGE_MTU + const evtType = MSG_TYPE_MTU_CHANGE_EVT + j, err := json.Marshal(r) if err != nil { return 0, err @@ -265,16 +266,12 @@ func exchangeMtu(x *BleXport, bl *BleListener, r *BleExchangeMtuReq) ( case *BleExchangeMtuRsp: bl.Acked = true if msg.Status != 0 { - return 0, StatusError(MSG_OP_RSP, - MSG_TYPE_EXCHANGE_MTU, - msg.Status) + return 0, StatusError(MSG_OP_RSP, rspType, msg.Status) } case *BleMtuChangeEvt: if msg.Status != 0 { - return 0, StatusError(MSG_OP_EVT, - MSG_TYPE_MTU_CHANGE_EVT, - msg.Status) + return 0, StatusError(MSG_OP_EVT, evtType, msg.Status) } else { return int(msg.Mtu), nil } @@ -283,7 +280,7 @@ func exchangeMtu(x *BleXport, bl *BleListener, r *BleExchangeMtuReq) ( } case <-bl.AfterTimeout(x.RspTimeout()): - return 0, BhdTimeoutError(MSG_TYPE_EXCHANGE_MTU) + return 0, BhdTimeoutError(rspType) } } } @@ -291,9 +288,12 @@ func exchangeMtu(x *BleXport, bl *BleListener, r *BleExchangeMtuReq) ( type scanSuccessFn func() type advRptFn func(r BleAdvReport) -func scan(x *BleXport, bl *BleListener, r *BleScanReq, abortChan chan struct{}, +func scan(x *BleXport, bl *BleListener, r *BleScanReq, + abortChan chan struct{}, scanSuccessCb scanSuccessFn, advRptCb advRptFn) error { + const rspType = MSG_TYPE_SCAN + j, err := json.Marshal(r) if err != nil { return err @@ -313,7 +313,7 @@ func scan(x *BleXport, bl *BleListener, r *BleScanReq, abortChan chan struct{}, case *BleScanRsp: bl.Acked = true if msg.Status != 0 { - return StatusError(MSG_OP_RSP, MSG_TYPE_SCAN, msg.Status) + return StatusError(MSG_OP_RSP, rspType, msg.Status) } else { scanSuccessCb() } @@ -329,7 +329,7 @@ func scan(x *BleXport, bl *BleListener, r *BleScanReq, abortChan chan struct{}, } case <-bl.AfterTimeout(x.RspTimeout()): - return BhdTimeoutError(MSG_TYPE_SCAN) + return BhdTimeoutError(rspType) case <-abortChan: return nil @@ -338,6 +338,8 @@ func scan(x *BleXport, bl *BleListener, r *BleScanReq, abortChan chan struct{}, } func scanCancel(x *BleXport, bl *BleListener, r *BleScanCancelReq) error { + const rspType = MSG_TYPE_SCAN_CANCEL + j, err := json.Marshal(r) if err != nil { return err @@ -357,7 +359,7 @@ func scanCancel(x *BleXport, bl *BleListener, r *BleScanCancelReq) error { case *BleScanCancelRsp: bl.Acked = true if msg.Status != 0 { - return StatusError(MSG_OP_RSP, MSG_TYPE_SCAN, msg.Status) + return StatusError(MSG_OP_RSP, rspType, msg.Status) } return nil @@ -365,7 +367,7 @@ func scanCancel(x *BleXport, bl *BleListener, r *BleScanCancelReq) error { } case <-bl.AfterTimeout(x.RspTimeout()): - return BhdTimeoutError(MSG_TYPE_EXCHANGE_MTU) + return BhdTimeoutError(rspType) } } } @@ -373,7 +375,7 @@ func scanCancel(x *BleXport, bl *BleListener, r *BleScanCancelReq) error { func connFind(x *BleXport, bl *BleListener, r *BleConnFindReq) ( BleConnDesc, error) { - const msgType = MSG_TYPE_CONN_FIND + const rspType = MSG_TYPE_CONN_FIND j, err := json.Marshal(r) if err != nil { @@ -395,7 +397,7 @@ func connFind(x *BleXport, bl *BleListener, r *BleConnFindReq) ( bl.Acked = true if msg.Status != 0 { return BleConnDesc{}, - StatusError(MSG_OP_RSP, msgType, msg.Status) + StatusError(MSG_OP_RSP, rspType, msg.Status) } return BleDescFromConnFindRsp(msg), nil @@ -404,7 +406,7 @@ func connFind(x *BleXport, bl *BleListener, r *BleConnFindReq) ( } case <-bl.AfterTimeout(x.RspTimeout()): - return BleConnDesc{}, BhdTimeoutError(msgType) + return BleConnDesc{}, BhdTimeoutError(rspType) } } } @@ -415,6 +417,8 @@ func connFind(x *BleXport, bl *BleListener, r *BleConnFindReq) ( func genRandAddr(x *BleXport, bl *BleListener, r *BleGenRandAddrReq) ( BleAddr, error) { + const rspType = MSG_TYPE_GEN_RAND_ADDR + j, err := json.Marshal(r) if err != nil { return BleAddr{}, err @@ -432,8 +436,7 @@ func genRandAddr(x *BleXport, bl *BleListener, r *BleGenRandAddrReq) ( bl.Acked = true if msg.Status != 0 { return BleAddr{}, - StatusError(MSG_OP_RSP, MSG_TYPE_GEN_RAND_ADDR, - msg.Status) + StatusError(MSG_OP_RSP, rspType, msg.Status) } return msg.Addr, nil @@ -441,7 +444,7 @@ func genRandAddr(x *BleXport, bl *BleListener, r *BleGenRandAddrReq) ( } case <-bl.AfterTimeout(x.RspTimeout()): - return BleAddr{}, BhdTimeoutError(MSG_TYPE_GEN_RAND_ADDR) + return BleAddr{}, BhdTimeoutError(rspType) } } } @@ -450,7 +453,7 @@ func genRandAddr(x *BleXport, bl *BleListener, r *BleGenRandAddrReq) ( // when the transport is starting up, and therefore does not require the // transport to be synced. Only the transport should call this function. func setRandAddr(x *BleXport, bl *BleListener, r *BleSetRandAddrReq) error { - const msgType = MSG_TYPE_SET_RAND_ADDR + const rspType = MSG_TYPE_SET_RAND_ADDR j, err := json.Marshal(r) if err != nil { @@ -468,7 +471,7 @@ func setRandAddr(x *BleXport, bl *BleListener, r *BleSetRandAddrReq) error { case *BleSetRandAddrRsp: bl.Acked = true if msg.Status != 0 { - return StatusError(MSG_OP_RSP, msgType, msg.Status) + return StatusError(MSG_OP_RSP, rspType, msg.Status) } return nil @@ -476,7 +479,7 @@ func setRandAddr(x *BleXport, bl *BleListener, r *BleSetRandAddrReq) error { } case <-bl.AfterTimeout(x.RspTimeout()): - return BhdTimeoutError(msgType) + return BhdTimeoutError(rspType) } } } @@ -487,7 +490,7 @@ func setRandAddr(x *BleXport, bl *BleListener, r *BleSetRandAddrReq) error { func setPreferredMtu(x *BleXport, bl *BleListener, r *BleSetPreferredMtuReq) error { - const msgType = MSG_TYPE_SET_PREFERRED_MTU + const rspType = MSG_TYPE_SET_PREFERRED_MTU j, err := json.Marshal(r) if err != nil { @@ -505,7 +508,7 @@ func setPreferredMtu(x *BleXport, bl *BleListener, case *BleSetPreferredMtuRsp: bl.Acked = true if msg.Status != 0 { - return StatusError(MSG_OP_RSP, msgType, msg.Status) + return StatusError(MSG_OP_RSP, rspType, msg.Status) } return nil @@ -513,7 +516,7 @@ func setPreferredMtu(x *BleXport, bl *BleListener, } case <-bl.AfterTimeout(x.RspTimeout()): - return BhdTimeoutError(msgType) + return BhdTimeoutError(rspType) } } } http://git-wip-us.apache.org/repos/asf/incubator-mynewt-newtmgr/blob/e5dcf07e/nmxact/nmble/ble_fsm.go ---------------------------------------------------------------------- diff --git a/nmxact/nmble/ble_fsm.go b/nmxact/nmble/ble_fsm.go index 03956dd..19f626d 100644 --- a/nmxact/nmble/ble_fsm.go +++ b/nmxact/nmble/ble_fsm.go @@ -7,6 +7,7 @@ import ( "path" "runtime" "sync" + "sync/atomic" "time" log "github.com/Sirupsen/logrus" @@ -17,7 +18,11 @@ import ( "mynewt.apache.org/newtmgr/nmxact/sesn" ) -var curId int +var nextId uint32 + +func getNextId() uint32 { + return atomic.AddUint32(&nextId, 1) - 1 +} var listenLog = &log.Logger{ Out: os.Stderr, @@ -69,31 +74,23 @@ type BleFsmParams struct { type BleFsm struct { params BleFsmParams - peerDev *BleDev - connHandle uint16 - nmpSvc *BleSvc - nmpReqChr *BleChr - nmpRspChr *BleChr - attMtu int - connChan chan error - lastStateChange time.Time - id int - curErr error - errTimer *time.Timer + peerDev *BleDev + connHandle uint16 + nmpSvc *BleSvc + nmpReqChr *BleChr + nmpRspChr *BleChr + attMtu int + connChan chan error + bls map[*BleListener]struct{} + state BleSesnState + errFunnel nmxutil.ErrFunnel + id uint32 // Protects all accesses to the FSM state variable. stateMtx sync.Mutex // Protects all accesses to the bls map. blsMtx sync.Mutex - - // Prevents the session from being opened while it is still being reset - // (cleaned up). - openMtx sync.Mutex - - // These variables must be protected by the mutex. - bls map[*BleListener]struct{} - state BleSesnState } func NewBleFsm(p BleFsmParams) *BleFsm { @@ -102,10 +99,12 @@ func NewBleFsm(p BleFsmParams) *BleFsm { bls: map[*BleListener]struct{}{}, attMtu: DFLT_ATT_MTU, - id: curId, + id: getNextId(), } - curId++ + bf.errFunnel.AccumDelay = time.Second + bf.errFunnel.LessCb = fsmErrorLess + bf.errFunnel.ProcCb = func(err error) { bf.processErr(err) } return bf } @@ -118,9 +117,8 @@ func (bf *BleFsm) disconnectError(reason int) error { } func (bf *BleFsm) closedError(msg string) error { - return nmxutil.NewSesnClosedError(fmt.Sprintf( - "%s; state=%d last-state-change=%s", - msg, bf.getState(), bf.lastStateChange)) + return nmxutil.NewSesnClosedError( + fmt.Sprintf("%s; state=%d", msg, bf.getState())) } func (bf *BleFsm) getState() BleSesnState { @@ -130,16 +128,11 @@ func (bf *BleFsm) getState() BleSesnState { return bf.state } -func (bf *BleFsm) setStateNoLock(toState BleSesnState) { - bf.state = toState - bf.lastStateChange = time.Now() -} - func (bf *BleFsm) setState(toState BleSesnState) { bf.stateMtx.Lock() defer bf.stateMtx.Unlock() - bf.setStateNoLock(toState) + bf.state = toState } func (bf *BleFsm) addBleListener(name string, base BleMsgBase) ( @@ -218,6 +211,23 @@ func (bf *BleFsm) logConnection() { log.Debugf("BLE connection attempt succeeded; %s", desc.String()) } +func fsmErrorLess(a error, b error) bool { + aIsXport := nmxutil.IsXport(a) + bIsXport := nmxutil.IsXport(b) + aIsDisc := nmxutil.IsBleSesnDisconnect(a) + bIsDisc := nmxutil.IsBleSesnDisconnect(b) + + if aIsXport { + return false + } + + if aIsDisc { + return !bIsXport && !bIsDisc + } + + return false +} + func calcDisconnectType(state BleSesnState) BleFsmDisconnectType { switch state { case SESN_STATE_EXCHANGE_MTU: @@ -238,14 +248,15 @@ func (bf *BleFsm) errorAll(err error) { bf.blsMtx.Lock() defer bf.blsMtx.Unlock() - for bl, _ := range bf.bls { + bls := bf.bls + bf.bls = map[*BleListener]struct{}{} + + for bl, _ := range bls { bl.ErrChan <- err } - - bf.bls = map[*BleListener]struct{}{} } -func (bf *BleFsm) processErr() { +func (bf *BleFsm) processErr(err error) { // Remember some fields before we clear them. dt := calcDisconnectType(bf.state) @@ -254,67 +265,15 @@ func (bf *BleFsm) processErr() { peer = *bf.peerDev } - err := bf.curErr - bf.reset(err) - - bf.openMtx.Unlock() - - bf.params.DisconnectCb(dt, peer, err) -} - -func (bf *BleFsm) onError(err error) { - if bf.curErr == nil { - // Subsequent start attempts will block until the reset is complete. - bf.openMtx.Lock() - - bf.curErr = err - bf.errTimer = time.AfterFunc(time.Second, func() { - bf.processErr() - }) - } else { - var replace bool - if nmxutil.IsXport(err) { - replace = true - } else if !nmxutil.IsXport(bf.curErr) && - nmxutil.IsBleSesnDisconnect(err) { - - replace = true - } else if !nmxutil.IsXport(bf.curErr) && - !nmxutil.IsBleSesnDisconnect(bf.curErr) { - - replace = true - } else { - replace = false - } - - if replace { - if !bf.errTimer.Stop() { - <-bf.errTimer.C - } - bf.curErr = err - bf.errTimer.Reset(time.Second) - } - } -} - -func (bf *BleFsm) reset(err error) { bf.errorAll(err) bf.stateMtx.Lock() - defer bf.stateMtx.Unlock() + bf.state = SESN_STATE_UNCONNECTED + bf.stateMtx.Unlock() - bf.setStateNoLock(SESN_STATE_UNCONNECTED) bf.peerDev = nil - bf.curErr = nil -} -// Blocks until the current reset is complete. If there is no reset in -// progress, this function returns immediately. The purpose of this function -// is to prevent the client from opening the session while it is still being -// closed. -func (bf *BleFsm) blockUntilReset() { - bf.openMtx.Lock() - bf.openMtx.Unlock() + go bf.params.DisconnectCb(dt, peer, err) } func (bf *BleFsm) connectListen(seq BleSeq) error { @@ -326,15 +285,12 @@ func (bf *BleFsm) connectListen(seq BleSeq) error { } go func() { - defer func() { - bf.removeBleSeqListener("connect", seq) - }() + defer bf.removeBleSeqListener("connect", seq) for { select { case err := <-bl.ErrChan: - // Transport reported error. Assume the connection has - // dropped. - bf.onError(err) + bf.connChan <- err + bf.errFunnel.Insert(err) return case bm := <-bl.BleChan: @@ -347,7 +303,9 @@ func (bf *BleFsm) connectListen(seq BleSeq) error { ErrCodeToString(msg.Status), msg.Status, bf.peerDev.String()) log.Debugf(str) - bf.connChan <- nmxutil.NewBleHostError(msg.Status, str) + err := nmxutil.NewBleHostError(msg.Status, str) + bf.connChan <- err + bf.errFunnel.Insert(err) return } else { bf.connChan <- nil @@ -360,6 +318,7 @@ func (bf *BleFsm) connectListen(seq BleSeq) error { bf.logConnection() if err := bf.nmpRspListen(); err != nil { bf.connChan <- err + bf.errFunnel.Insert(err) return } bf.connChan <- nil @@ -369,7 +328,9 @@ func (bf *BleFsm) connectListen(seq BleSeq) error { ErrCodeToString(msg.Status), msg.Status, bf.peerDev.String()) log.Debugf(str) - bf.connChan <- nmxutil.NewBleHostError(msg.Status, str) + err := nmxutil.NewBleHostError(msg.Status, str) + bf.connChan <- err + bf.errFunnel.Insert(err) return } @@ -387,14 +348,16 @@ func (bf *BleFsm) connectListen(seq BleSeq) error { case *BleDisconnectEvt: err := bf.disconnectError(msg.Reason) - bf.onError(err) + bf.errFunnel.Insert(err) return default: } case <-bl.AfterTimeout(bf.params.Bx.RspTimeout()): - bf.connChan <- BhdTimeoutError(MSG_TYPE_CONNECT) + err := BhdTimeoutError(MSG_TYPE_CONNECT) + bf.connChan <- err + bf.errFunnel.Insert(err) } } }() @@ -415,13 +378,13 @@ func (bf *BleFsm) nmpRspListen() error { } go func() { - defer func() { - bf.removeBleBaseListener("nmp-rsp", base) - }() + defer bf.removeBleBaseListener("nmp-rsp", base) for { select { case <-bl.ErrChan: - // The session encountered an error; stop listening. + if err != nil { + bf.errFunnel.Insert(err) + } return case bm := <-bl.BleChan: switch msg := bm.(type) { @@ -446,6 +409,7 @@ func (bf *BleFsm) connect() error { r.PeerAddrType = bf.peerDev.AddrType r.PeerAddr = bf.peerDev.Addr + // Initiating a connection requires dedicated master privileges. if err := bf.params.Bx.AcquireMaster(); err != nil { return err } @@ -455,11 +419,17 @@ func (bf *BleFsm) connect() error { return err } + // Tell blehostd to initiate connection. if err := connect(bf.params.Bx, bf.connChan, r); err != nil { - bf.params.Bx.ReleaseMaster() + bhe := nmxutil.ToBleHost(err) + if bhe != nil && bhe.Status == ERR_CODE_EDONE { + // Already connected. + return nil + } return err } + // Connection operation now in progress. bf.state = SESN_STATE_CONNECTING err := <-bf.connChan @@ -488,6 +458,7 @@ func (bf *BleFsm) scan() error { r.Passive = false r.FilterDuplicates = true + // Scanning requires dedicated master privileges. if err := bf.params.Bx.AcquireMaster(); err != nil { return err } @@ -497,9 +468,7 @@ func (bf *BleFsm) scan() error { if err != nil { return err } - defer func() { - bf.removeBleSeqListener("scan", r.Seq) - }() + defer bf.removeBleSeqListener("scan", r.Seq) abortChan := make(chan struct{}, 1) @@ -539,9 +508,7 @@ func (bf *BleFsm) scanCancel() error { if err != nil { return err } - defer func() { - bf.removeBleSeqListener("scan-cancel", r.Seq) - }() + defer bf.removeBleSeqListener("scan-cancel", r.Seq) if err := scanCancel(bf.params.Bx, bl, r); err != nil { return err @@ -563,7 +530,7 @@ func (bf *BleFsm) terminateSetState() error { return fmt.Errorf( "BLE terminate failed; session already being closed") default: - bf.setStateNoLock(SESN_STATE_TERMINATING) + bf.state = SESN_STATE_TERMINATING } return nil @@ -582,9 +549,7 @@ func (bf *BleFsm) terminate() error { if err != nil { return err } - defer func() { - bf.removeBleSeqListener("terminate", r.Seq) - }() + defer bf.removeBleSeqListener("terminate", r.Seq) if err := terminate(bf.params.Bx, bl, r); err != nil { return err @@ -600,9 +565,7 @@ func (bf *BleFsm) connCancel() error { if err != nil { return err } - defer func() { - bf.removeBleSeqListener("conn-cancel", r.Seq) - }() + defer bf.removeBleSeqListener("conn-cancel", r.Seq) if err := connCancel(bf.params.Bx, bl, r); err != nil { return err @@ -620,9 +583,7 @@ func (bf *BleFsm) discSvcUuid() error { if err != nil { return err } - defer func() { - bf.removeBleSeqListener("disc-svc-uuid", r.Seq) - }() + defer bf.removeBleSeqListener("disc-svc-uuid", r.Seq) bf.nmpSvc, err = discSvcUuid(bf.params.Bx, bl, r) if err != nil { @@ -642,9 +603,7 @@ func (bf *BleFsm) discAllChrs() error { if err != nil { return err } - defer func() { - bf.removeBleSeqListener("disc-all-chrs", r.Seq) - }() + defer bf.removeBleSeqListener("disc-all-chrs", r.Seq) chrs, err := discAllChrs(bf.params.Bx, bl, r) if err != nil { @@ -683,9 +642,7 @@ func (bf *BleFsm) exchangeMtu() error { if err != nil { return err } - defer func() { - bf.removeBleSeqListener("exchange-mtu", r.Seq) - }() + defer bf.removeBleSeqListener("exchange-mtu", r.Seq) mtu, err := exchangeMtu(bf.params.Bx, bl, r) if err != nil { @@ -706,9 +663,7 @@ func (bf *BleFsm) writeCmd(data []byte) error { if err != nil { return err } - defer func() { - bf.removeBleSeqListener("write-cmd", r.Seq) - }() + defer bf.removeBleSeqListener("write-cmd", r.Seq) if err := writeCmd(bf.params.Bx, bl, r); err != nil { return err @@ -727,9 +682,7 @@ func (bf *BleFsm) subscribe() error { if err != nil { return err } - defer func() { - bf.removeBleSeqListener("subscribe", r.Seq) - }() + defer bf.removeBleSeqListener("subscribe", r.Seq) if err := writeCmd(bf.params.Bx, bl, r); err != nil { return err @@ -820,7 +773,7 @@ func (bf *BleFsm) executeState() (bool, error) { } func (bf *BleFsm) startOnce() (bool, error) { - bf.blockUntilReset() + bf.errFunnel.BlockUntilReset() if !bf.IsClosed() { return false, nmxutil.NewSesnAlreadyOpenError(fmt.Sprintf( @@ -831,7 +784,7 @@ func (bf *BleFsm) startOnce() (bool, error) { for { retry, err := bf.executeState() if err != nil { - bf.onError(err) + bf.errFunnel.Insert(err) return retry, err } else if bf.getState() == SESN_STATE_DONE { return false, nil @@ -870,7 +823,7 @@ func (bf *BleFsm) Stop() (bool, error) { bf.closedError("Attempt to close an unopened BLE session") case SESN_STATE_CONNECTING: - bf.onError(fmt.Errorf("Connection attempt cancelled")) + bf.errFunnel.Insert(fmt.Errorf("Connection attempt cancelled")) return false, nil default: http://git-wip-us.apache.org/repos/asf/incubator-mynewt-newtmgr/blob/e5dcf07e/nmxact/nmble/ble_proto.go ---------------------------------------------------------------------- diff --git a/nmxact/nmble/ble_proto.go b/nmxact/nmble/ble_proto.go index 6a99feb..a43348d 100644 --- a/nmxact/nmble/ble_proto.go +++ b/nmxact/nmble/ble_proto.go @@ -793,7 +793,7 @@ func ErrCodeToString(e int) string { func MsgOpToString(op MsgOp) string { s := MsgOpStringMap[op] if s == "" { - panic(fmt.Sprintf("Invalid MsgOp: %d", int(op))) + return "???" } return s @@ -812,7 +812,7 @@ func MsgOpFromString(s string) (MsgOp, error) { func MsgTypeToString(msgType MsgType) string { s := MsgTypeStringMap[msgType] if s == "" { - panic(fmt.Sprintf("Invalid MsgType: %d", int(msgType))) + return "???" } return s http://git-wip-us.apache.org/repos/asf/incubator-mynewt-newtmgr/blob/e5dcf07e/nmxact/nmble/ble_xport.go ---------------------------------------------------------------------- diff --git a/nmxact/nmble/ble_xport.go b/nmxact/nmble/ble_xport.go index d87f0c8..d78a000 100644 --- a/nmxact/nmble/ble_xport.go +++ b/nmxact/nmble/ble_xport.go @@ -251,6 +251,7 @@ func (bx *BleXport) shutdown(restart bool, err error) { bx.client.Stop() } + // Indicate error to all clients who are waiting for the master resource. bx.master.Abort(err) // Indicate an error to all of this transport's listeners. This prevents http://git-wip-us.apache.org/repos/asf/incubator-mynewt-newtmgr/blob/e5dcf07e/nmxact/nmxutil/nmxutil.go ---------------------------------------------------------------------- diff --git a/nmxact/nmxutil/nmxutil.go b/nmxact/nmxutil/nmxutil.go index cb299e9..e63a7e2 100644 --- a/nmxact/nmxutil/nmxutil.go +++ b/nmxact/nmxutil/nmxutil.go @@ -3,6 +3,7 @@ package nmxutil import ( "math/rand" "sync" + "time" ) var nextNmpSeq uint8 @@ -100,3 +101,79 @@ func (s *SingleResource) Abort(err error) { w <- err } } + +type ErrLessFn func(a error, b error) bool +type ErrProcFn func(err error) + +// Aggregates errors that occur close in time. The most severe error gets +// reported. +type ErrFunnel struct { + LessCb ErrLessFn + ProcCb ErrProcFn + AccumDelay time.Duration + + mtx sync.Mutex + resetMtx sync.Mutex + curErr error + errTimer *time.Timer +} + +func (f *ErrFunnel) Insert(err error) { + if err == nil { + panic("ErrFunnel nil insert") + } + + f.mtx.Lock() + defer f.mtx.Unlock() + + if f.curErr == nil { + // Subsequent use attempts will block until the funnel is inactive. + f.resetMtx.Lock() + + f.curErr = err + f.errTimer = time.AfterFunc(f.AccumDelay, func() { + f.timerExp() + }) + } else { + if f.LessCb(f.curErr, err) { + if !f.errTimer.Stop() { + <-f.errTimer.C + } + f.curErr = err + f.errTimer.Reset(f.AccumDelay) + } + } +} + +func (f *ErrFunnel) resetNoLock() { + if f.curErr != nil { + f.curErr = nil + f.errTimer.Stop() + f.resetMtx.Unlock() + } +} + +func (f *ErrFunnel) Reset() { + f.mtx.Lock() + defer f.mtx.Unlock() + + f.resetNoLock() +} + +func (f *ErrFunnel) BlockUntilReset() { + f.resetMtx.Lock() + f.resetMtx.Unlock() +} + +func (f *ErrFunnel) timerExp() { + f.mtx.Lock() + defer f.mtx.Unlock() + + if f.curErr == nil { + panic("ErrFunnel timer expired but no error") + } + + f.ProcCb(f.curErr) + + f.resetNoLock() +} http://git-wip-us.apache.org/repos/asf/incubator-mynewt-newtmgr/blob/e5dcf07e/nmxact/sesn/sesn_cfg.go ---------------------------------------------------------------------- diff --git a/nmxact/sesn/sesn_cfg.go b/nmxact/sesn/sesn_cfg.go index 3c57f24..60cdc81 100644 --- a/nmxact/sesn/sesn_cfg.go +++ b/nmxact/sesn/sesn_cfg.go @@ -61,7 +61,7 @@ func NewSesnCfg() SesnCfg { return SesnCfg{ Ble: SesnCfgBle{ ConnTries: 3, - CloseTimeout: 15 * time.Second, + CloseTimeout: 30 * time.Second, }, } }
