nmxact - Thread-safety fixes.
Project: http://git-wip-us.apache.org/repos/asf/incubator-mynewt-newtmgr/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-mynewt-newtmgr/commit/6c9269d7 Tree: http://git-wip-us.apache.org/repos/asf/incubator-mynewt-newtmgr/tree/6c9269d7 Diff: http://git-wip-us.apache.org/repos/asf/incubator-mynewt-newtmgr/diff/6c9269d7 Branch: refs/heads/master Commit: 6c9269d72f52bcb0f7b474cc18fe18d5d9934b3d Parents: 744cb38 Author: Christopher Collins <[email protected]> Authored: Sun Apr 9 14:26:33 2017 -0700 Committer: Christopher Collins <[email protected]> Committed: Mon Apr 10 15:13:46 2017 -0700 ---------------------------------------------------------------------- nmxact/example/ble_dual/ble_dual.go | 2 +- nmxact/nmble/ble_act.go | 16 +- nmxact/nmble/ble_fsm.go | 583 ++++++++++++++++++------------- nmxact/nmble/ble_oic_sesn.go | 26 +- nmxact/nmble/ble_plain_sesn.go | 41 +-- nmxact/nmble/ble_proto.go | 11 +- nmxact/nmble/ble_util.go | 2 +- nmxact/nmble/ble_xport.go | 57 +-- nmxact/nmble/dispatch.go | 6 +- nmxact/nmserial/serial_xport.go | 4 +- nmxact/nmxutil/nmxerr.go | 32 +- nmxact/nmxutil/nmxutil.go | 77 ++++ nmxact/sesn/sesn.go | 4 +- nmxact/sesn/sesn_cfg.go | 2 +- 14 files changed, 477 insertions(+), 386 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-mynewt-newtmgr/blob/6c9269d7/nmxact/example/ble_dual/ble_dual.go ---------------------------------------------------------------------- diff --git a/nmxact/example/ble_dual/ble_dual.go b/nmxact/example/ble_dual/ble_dual.go index a69b8d5..bedff95 100644 --- a/nmxact/example/ble_dual/ble_dual.go +++ b/nmxact/example/ble_dual/ble_dual.go @@ -103,7 +103,7 @@ func main() { params := nmble.NewXportCfg() params.SockPath = "/tmp/blehostd-uds" params.BlehostdPath = "blehostd.elf" - params.DevPath = "/dev/cu.usbmodem142111" + params.DevPath = "/dev/cu.usbmodem141121" x, err := nmble.NewBleXport(params) if err != nil { http://git-wip-us.apache.org/repos/asf/incubator-mynewt-newtmgr/blob/6c9269d7/nmxact/nmble/ble_act.go ---------------------------------------------------------------------- diff --git a/nmxact/nmble/ble_act.go b/nmxact/nmble/ble_act.go index 90ac53e..e5e336c 100644 --- a/nmxact/nmble/ble_act.go +++ b/nmxact/nmble/ble_act.go @@ -288,10 +288,11 @@ func exchangeMtu(x *BleXport, bl *BleListener, r *BleExchangeMtuReq) ( } } -type scanFn func(r BleAdvReport) +type scanSuccessFn func() +type advRptFn func(r BleAdvReport) -func scan(x *BleXport, bl *BleListener, r *BleScanReq, - abortChan chan struct{}, scanCb scanFn) error { +func scan(x *BleXport, bl *BleListener, r *BleScanReq, abortChan chan struct{}, + scanSuccessCb scanSuccessFn, advRptCb advRptFn) error { j, err := json.Marshal(r) if err != nil { @@ -313,17 +314,22 @@ func scan(x *BleXport, bl *BleListener, r *BleScanReq, bl.Acked = true if msg.Status != 0 { return StatusError(MSG_OP_RSP, MSG_TYPE_SCAN, msg.Status) + } else { + scanSuccessCb() } case *BleScanEvt: r := BleAdvReportFromScanEvt(msg) - scanCb(r) + advRptCb(r) + + case *BleScanTmoEvt: + return nmxutil.NewScanTmoError("scan duration expired") default: } case <-bl.AfterTimeout(x.RspTimeout()): - return BhdTimeoutError(MSG_TYPE_EXCHANGE_MTU) + return BhdTimeoutError(MSG_TYPE_SCAN) case <-abortChan: return nil http://git-wip-us.apache.org/repos/asf/incubator-mynewt-newtmgr/blob/6c9269d7/nmxact/nmble/ble_fsm.go ---------------------------------------------------------------------- diff --git a/nmxact/nmble/ble_fsm.go b/nmxact/nmble/ble_fsm.go index d198844..03956dd 100644 --- a/nmxact/nmble/ble_fsm.go +++ b/nmxact/nmble/ble_fsm.go @@ -3,6 +3,9 @@ package nmble import ( "encoding/hex" "fmt" + "os" + "path" + "runtime" "sync" "time" @@ -14,6 +17,14 @@ import ( "mynewt.apache.org/newtmgr/nmxact/sesn" ) +var curId int + +var listenLog = &log.Logger{ + Out: os.Stderr, + Formatter: new(log.TextFormatter), + Level: log.InfoLevel, +} + const DFLT_ATT_MTU = 23 type BleSesnState int32 @@ -22,15 +33,13 @@ const ( SESN_STATE_UNCONNECTED BleSesnState = 0 SESN_STATE_SCANNING = 1 SESN_STATE_CONNECTING = 2 - SESN_STATE_CONNECTED = 3 - SESN_STATE_EXCHANGING_MTU = 4 - SESN_STATE_EXCHANGED_MTU = 5 - SESN_STATE_DISCOVERING_SVC = 6 - SESN_STATE_DISCOVERED_SVC = 7 - SESN_STATE_DISCOVERING_CHR = 8 - SESN_STATE_DISCOVERED_CHR = 9 - SESN_STATE_TERMINATING = 10 - SESN_STATE_CONN_CANCELLING = 11 + SESN_STATE_EXCHANGE_MTU = 3 + SESN_STATE_DISCOVER_SVC = 4 + SESN_STATE_DISCOVER_CHR = 5 + SESN_STATE_SUBSCRIBE = 6 + SESN_STATE_DONE = 7 + SESN_STATE_TERMINATING = 8 + SESN_STATE_CONN_CANCELLING = 9 ) type BleFsmDisconnectType int @@ -49,6 +58,7 @@ type BleFsmParams struct { Bx *BleXport OwnAddrType BleAddrType PeerSpec sesn.BlePeerSpec + ConnTries int SvcUuid BleUuid ReqChrUuid BleUuid RspChrUuid BleUuid @@ -66,8 +76,20 @@ type BleFsm struct { nmpRspChr *BleChr attMtu int connChan chan error - mtx sync.Mutex lastStateChange time.Time + id int + curErr error + errTimer *time.Timer + + // Protects all accesses to the FSM state variable. + stateMtx sync.Mutex + + // Protects all accesses to the bls map. + blsMtx sync.Mutex + + // Prevents the session from being opened while it is still being reset + // (cleaned up). + openMtx sync.Mutex // These variables must be protected by the mutex. bls map[*BleListener]struct{} @@ -80,8 +102,11 @@ func NewBleFsm(p BleFsmParams) *BleFsm { bls: map[*BleListener]struct{}{}, attMtu: DFLT_ATT_MTU, + id: curId, } + curId++ + return bf } @@ -99,63 +124,36 @@ func (bf *BleFsm) closedError(msg string) error { } func (bf *BleFsm) getState() BleSesnState { - bf.mtx.Lock() - defer bf.mtx.Unlock() + bf.stateMtx.Lock() + defer bf.stateMtx.Unlock() return bf.state } -func stateRequiresMaster(s BleSesnState) bool { - return s == SESN_STATE_SCANNING || s == SESN_STATE_CONNECTING -} - -func (bf *BleFsm) setStateNoLock(toState BleSesnState) error { - if !stateRequiresMaster(bf.state) && stateRequiresMaster(toState) { - if err := bf.params.Bx.AcquireMaster(); err != nil { - return err - } - } else if stateRequiresMaster(bf.state) && !stateRequiresMaster(toState) { - bf.params.Bx.ReleaseMaster() - } - +func (bf *BleFsm) setStateNoLock(toState BleSesnState) { bf.state = toState bf.lastStateChange = time.Now() - - return nil } -func (bf *BleFsm) setState(toState BleSesnState) error { - bf.mtx.Lock() - defer bf.mtx.Unlock() +func (bf *BleFsm) setState(toState BleSesnState) { + bf.stateMtx.Lock() + defer bf.stateMtx.Unlock() - return bf.setStateNoLock(toState) + bf.setStateNoLock(toState) } -func (bf *BleFsm) transitionState(fromState BleSesnState, - toState BleSesnState) error { +func (bf *BleFsm) addBleListener(name string, base BleMsgBase) ( + *BleListener, error) { - bf.mtx.Lock() - defer bf.mtx.Unlock() + _, file, line, _ := runtime.Caller(2) + file = path.Base(file) + listenLog.Debugf("[%d] {add-listener} [%s:%d] %s: base=%+v", + bf.id, file, line, name, base) - if bf.state != fromState { - return fmt.Errorf( - "Can't set BleFsm state to %d; current state != required "+ - "value: %d", - toState, fromState) - } - - if err := bf.setStateNoLock(toState); err != nil { - return err - } - - return nil -} - -func (bf *BleFsm) addBleListener(base BleMsgBase) (*BleListener, error) { bl := NewBleListener() - bf.mtx.Lock() - defer bf.mtx.Unlock() + bf.blsMtx.Lock() + defer bf.blsMtx.Unlock() if err := bf.params.Bx.Bd.AddListener(base, bl); err != nil { return nil, err @@ -165,32 +163,42 @@ func (bf *BleFsm) addBleListener(base BleMsgBase) (*BleListener, error) { return bl, nil } -func (bf *BleFsm) addBleSeqListener(seq BleSeq) (*BleListener, error) { +func (bf *BleFsm) addBleBaseListener(name string, base BleMsgBase) ( + *BleListener, error) { + + return bf.addBleListener(name, base) +} + +func (bf *BleFsm) addBleSeqListener(name string, seq BleSeq) ( + *BleListener, error) { + base := BleMsgBase{ Op: -1, Type: -1, Seq: seq, ConnHandle: -1, } - bl, err := bf.addBleListener(base) - if err != nil { - return nil, err - } - - return bl, nil + return bf.addBleListener(name, base) } -func (bf *BleFsm) removeBleListener(base BleMsgBase) { - bf.mtx.Lock() - defer bf.mtx.Unlock() +func (bf *BleFsm) removeBleListener(name string, base BleMsgBase) { + _, file, line, _ := runtime.Caller(2) + file = path.Base(file) + listenLog.Debugf("[%d] {remove-listener} [%s:%d] %s: base=%+v", + bf.id, file, line, name, base) + + bf.blsMtx.Lock() + defer bf.blsMtx.Unlock() bl := bf.params.Bx.Bd.RemoveListener(base) - if bl != nil { - delete(bf.bls, bl) - } + delete(bf.bls, bl) } -func (bf *BleFsm) removeBleSeqListener(seq BleSeq) { +func (bf *BleFsm) removeBleBaseListener(name string, base BleMsgBase) { + bf.removeBleListener(name, base) +} + +func (bf *BleFsm) removeBleSeqListener(name string, seq BleSeq) { base := BleMsgBase{ Op: -1, Type: -1, @@ -198,30 +206,7 @@ func (bf *BleFsm) removeBleSeqListener(seq BleSeq) { ConnHandle: -1, } - bf.removeBleListener(base) -} - -func (bf *BleFsm) action( - preState BleSesnState, - inState BleSesnState, - postState BleSesnState, - cb func() error) error { - - if err := bf.transitionState(preState, inState); err != nil { - return err - } - - if err := cb(); err != nil { - if err := bf.setState(preState); err != nil { - return err - } - return err - } - - if err := bf.setState(postState); err != nil { - return err - } - return nil + bf.removeBleListener(name, base) } func (bf *BleFsm) logConnection() { @@ -235,10 +220,10 @@ func (bf *BleFsm) logConnection() { func calcDisconnectType(state BleSesnState) BleFsmDisconnectType { switch state { - case SESN_STATE_EXCHANGING_MTU: + case SESN_STATE_EXCHANGE_MTU: return FSM_DISCONNECT_TYPE_IMMEDIATE_TIMEOUT - case SESN_STATE_DISCOVERED_CHR: + case SESN_STATE_DONE: return FSM_DISCONNECT_TYPE_OPENED case SESN_STATE_TERMINATING, SESN_STATE_CONN_CANCELLING: @@ -249,56 +234,107 @@ func calcDisconnectType(state BleSesnState) BleFsmDisconnectType { } } -func (bf *BleFsm) resetState(err error) { - bf.mtx.Lock() +func (bf *BleFsm) errorAll(err error) { + bf.blsMtx.Lock() + defer bf.blsMtx.Unlock() - if err := bf.setStateNoLock(SESN_STATE_UNCONNECTED); err != nil { - // Change to unconnected state should never fail. - panic(fmt.Sprintf( - "BleFsm state change resulted in unexpected error: %s", err)) - } - bf.peerDev = nil - - // Make a copy of all the listeners so we don't have to keep the mutex - // locked while we send error signals to them. - bls := make([]*BleListener, 0, len(bf.bls)) for bl, _ := range bf.bls { - bls = append(bls, bl) - } - - bf.mtx.Unlock() - - for _, bl := range bls { bl.ErrChan <- err } + + bf.bls = map[*BleListener]struct{}{} } -func (bf *BleFsm) onDisconnect(err error) { +func (bf *BleFsm) processErr() { // Remember some fields before we clear them. dt := calcDisconnectType(bf.state) - peer := *bf.peerDev - bf.resetState(err) + var peer BleDev + if bf.peerDev != nil { + peer = *bf.peerDev + } + + err := bf.curErr + bf.reset(err) + + bf.openMtx.Unlock() bf.params.DisconnectCb(dt, peer, err) } +func (bf *BleFsm) onError(err error) { + if bf.curErr == nil { + // Subsequent start attempts will block until the reset is complete. + bf.openMtx.Lock() + + bf.curErr = err + bf.errTimer = time.AfterFunc(time.Second, func() { + bf.processErr() + }) + } else { + var replace bool + if nmxutil.IsXport(err) { + replace = true + } else if !nmxutil.IsXport(bf.curErr) && + nmxutil.IsBleSesnDisconnect(err) { + + replace = true + } else if !nmxutil.IsXport(bf.curErr) && + !nmxutil.IsBleSesnDisconnect(bf.curErr) { + + replace = true + } else { + replace = false + } + + if replace { + if !bf.errTimer.Stop() { + <-bf.errTimer.C + } + bf.curErr = err + bf.errTimer.Reset(time.Second) + } + } +} + +func (bf *BleFsm) reset(err error) { + bf.errorAll(err) + + bf.stateMtx.Lock() + defer bf.stateMtx.Unlock() + + bf.setStateNoLock(SESN_STATE_UNCONNECTED) + bf.peerDev = nil + bf.curErr = nil +} + +// Blocks until the current reset is complete. If there is no reset in +// progress, this function returns immediately. The purpose of this function +// is to prevent the client from opening the session while it is still being +// closed. +func (bf *BleFsm) blockUntilReset() { + bf.openMtx.Lock() + bf.openMtx.Unlock() +} + func (bf *BleFsm) connectListen(seq BleSeq) error { bf.connChan = make(chan error, 1) - bl, err := bf.addBleSeqListener(seq) + bl, err := bf.addBleSeqListener("connect", seq) if err != nil { return err } go func() { - defer bf.removeBleSeqListener(seq) + defer func() { + bf.removeBleSeqListener("connect", seq) + }() for { select { case err := <-bl.ErrChan: // Transport reported error. Assume the connection has // dropped. - bf.onDisconnect(err) + bf.onError(err) return case bm := <-bl.BleChan: @@ -313,6 +349,8 @@ func (bf *BleFsm) connectListen(seq BleSeq) error { log.Debugf(str) bf.connChan <- nmxutil.NewBleHostError(msg.Status, str) return + } else { + bf.connChan <- nil } case *BleConnectEvt: @@ -349,7 +387,7 @@ func (bf *BleFsm) connectListen(seq BleSeq) error { case *BleDisconnectEvt: err := bf.disconnectError(msg.Reason) - bf.onDisconnect(err) + bf.onError(err) return default: @@ -371,13 +409,15 @@ func (bf *BleFsm) nmpRspListen() error { ConnHandle: int(bf.connHandle), } - bl, err := bf.addBleListener(base) + bl, err := bf.addBleBaseListener("nmp-rsp", base) if err != nil { return err } go func() { - defer bf.removeBleListener(base) + defer func() { + bf.removeBleBaseListener("nmp-rsp", base) + }() for { select { case <-bl.ErrChan: @@ -406,15 +446,37 @@ func (bf *BleFsm) connect() error { r.PeerAddrType = bf.peerDev.AddrType r.PeerAddr = bf.peerDev.Addr + if err := bf.params.Bx.AcquireMaster(); err != nil { + return err + } + defer bf.params.Bx.ReleaseMaster() + if err := bf.connectListen(r.Seq); err != nil { return err } if err := connect(bf.params.Bx, bf.connChan, r); err != nil { + bf.params.Bx.ReleaseMaster() return err } - return nil + bf.state = SESN_STATE_CONNECTING + + err := <-bf.connChan + if !nmxutil.IsXport(err) { + // The transport did not restart; always attempt to cancel the connect + // operation. In most cases, the host has already stopped connecting + // and will respond with an "ealready" error that can be ignored. + if err := bf.connCancel(); err != nil { + bhe := nmxutil.ToBleHost(err) + if bhe == nil || bhe.Status != ERR_CODE_EALREADY { + log.Errorf("Failed to cancel connect in progress: %s", + err.Error()) + } + } + } + + return err } func (bf *BleFsm) scan() error { @@ -426,16 +488,26 @@ func (bf *BleFsm) scan() error { r.Passive = false r.FilterDuplicates = true - bl, err := bf.addBleSeqListener(r.Seq) + if err := bf.params.Bx.AcquireMaster(); err != nil { + return err + } + defer bf.params.Bx.ReleaseMaster() + + bl, err := bf.addBleSeqListener("scan", r.Seq) if err != nil { return err } - defer bf.removeBleSeqListener(r.Seq) + defer func() { + bf.removeBleSeqListener("scan", r.Seq) + }() abortChan := make(chan struct{}, 1) + // This function gets called when scanning begins. + scanSuccessCb := func() { bf.state = SESN_STATE_SCANNING } + // This function gets called for each incoming advertisement. - scanCb := func(r BleAdvReport) { + advRptCb := func(r BleAdvReport) { // Ask client if we should connect to this advertiser. if bf.params.PeerSpec.ScanPred(r) { bf.peerDev = &r.Sender @@ -443,22 +515,33 @@ func (bf *BleFsm) scan() error { } } - if err := scan(bf.params.Bx, bl, r, abortChan, scanCb); err != nil { - return err + err = scan(bf.params.Bx, bl, r, abortChan, scanSuccessCb, advRptCb) + if !nmxutil.IsXport(err) { + // The transport did not restart; always attempt to cancel the scan + // operation. In most cases, the host has already stopped scanning + // and will respond with an "ealready" error that can be ignored. + if err := bf.scanCancel(); err != nil { + bhe := nmxutil.ToBleHost(err) + if bhe == nil || bhe.Status != ERR_CODE_EALREADY { + log.Errorf("Failed to cancel scan in progress: %s", + err.Error()) + } + } } - // Scanning still in progress; cancel the operation. - return bf.scanCancel() + return err } func (bf *BleFsm) scanCancel() error { r := NewBleScanCancelReq() - bl, err := bf.addBleSeqListener(r.Seq) + bl, err := bf.addBleSeqListener("scan-cancel", r.Seq) if err != nil { return err } - defer bf.removeBleSeqListener(r.Seq) + defer func() { + bf.removeBleSeqListener("scan-cancel", r.Seq) + }() if err := scanCancel(bf.params.Bx, bl, r); err != nil { return err @@ -468,8 +551,8 @@ func (bf *BleFsm) scanCancel() error { } func (bf *BleFsm) terminateSetState() error { - bf.mtx.Lock() - defer bf.mtx.Unlock() + bf.stateMtx.Lock() + defer bf.stateMtx.Unlock() switch bf.state { case SESN_STATE_UNCONNECTED, @@ -480,11 +563,7 @@ func (bf *BleFsm) terminateSetState() error { return fmt.Errorf( "BLE terminate failed; session already being closed") default: - if err := bf.setStateNoLock(SESN_STATE_TERMINATING); err != nil { - // Change to terminating state should never fail. - panic(fmt.Sprintf( - "BleFsm state change resulted in unexpected error: %s", err)) - } + bf.setStateNoLock(SESN_STATE_TERMINATING) } return nil @@ -499,11 +578,13 @@ func (bf *BleFsm) terminate() error { r.ConnHandle = bf.connHandle r.HciReason = ERR_CODE_HCI_REM_USER_CONN_TERM - bl, err := bf.addBleSeqListener(r.Seq) + bl, err := bf.addBleSeqListener("terminate", r.Seq) if err != nil { return err } - defer bf.removeBleSeqListener(r.Seq) + defer func() { + bf.removeBleSeqListener("terminate", r.Seq) + }() if err := terminate(bf.params.Bx, bl, r); err != nil { return err @@ -513,19 +594,15 @@ func (bf *BleFsm) terminate() error { } func (bf *BleFsm) connCancel() error { - if err := bf.transitionState( - SESN_STATE_CONNECTING, - SESN_STATE_CONN_CANCELLING); err != nil { - - return fmt.Errorf("BLE connect cancel failed; not connecting") - } - r := NewBleConnCancelReq() - bl, err := bf.addBleSeqListener(r.Seq) + + bl, err := bf.addBleSeqListener("conn-cancel", r.Seq) if err != nil { return err } - defer bf.removeBleSeqListener(r.Seq) + defer func() { + bf.removeBleSeqListener("conn-cancel", r.Seq) + }() if err := connCancel(bf.params.Bx, bl, r); err != nil { return err @@ -539,11 +616,13 @@ func (bf *BleFsm) discSvcUuid() error { r.ConnHandle = bf.connHandle r.Uuid = bf.params.SvcUuid - bl, err := bf.addBleSeqListener(r.Seq) + bl, err := bf.addBleSeqListener("disc-svc-uuid", r.Seq) if err != nil { return err } - defer bf.removeBleSeqListener(r.Seq) + defer func() { + bf.removeBleSeqListener("disc-svc-uuid", r.Seq) + }() bf.nmpSvc, err = discSvcUuid(bf.params.Bx, bl, r) if err != nil { @@ -559,11 +638,13 @@ func (bf *BleFsm) discAllChrs() error { r.StartHandle = bf.nmpSvc.StartHandle r.EndHandle = bf.nmpSvc.EndHandle - bl, err := bf.addBleSeqListener(r.Seq) + bl, err := bf.addBleSeqListener("disc-all-chrs", r.Seq) if err != nil { return err } - defer bf.removeBleSeqListener(r.Seq) + defer func() { + bf.removeBleSeqListener("disc-all-chrs", r.Seq) + }() chrs, err := discAllChrs(bf.params.Bx, bl, r) if err != nil { @@ -598,11 +679,13 @@ func (bf *BleFsm) exchangeMtu() error { r := NewBleExchangeMtuReq() r.ConnHandle = bf.connHandle - bl, err := bf.addBleSeqListener(r.Seq) + bl, err := bf.addBleSeqListener("exchange-mtu", r.Seq) if err != nil { return err } - defer bf.removeBleSeqListener(r.Seq) + defer func() { + bf.removeBleSeqListener("exchange-mtu", r.Seq) + }() mtu, err := exchangeMtu(bf.params.Bx, bl, r) if err != nil { @@ -619,11 +702,13 @@ func (bf *BleFsm) writeCmd(data []byte) error { r.AttrHandle = bf.nmpReqChr.ValHandle r.Data.Bytes = data - bl, err := bf.addBleSeqListener(r.Seq) + bl, err := bf.addBleSeqListener("write-cmd", r.Seq) if err != nil { return err } - defer bf.removeBleSeqListener(r.Seq) + defer func() { + bf.removeBleSeqListener("write-cmd", r.Seq) + }() if err := writeCmd(bf.params.Bx, bl, r); err != nil { return err @@ -638,11 +723,13 @@ func (bf *BleFsm) subscribe() error { r.AttrHandle = bf.nmpRspChr.ValHandle + 1 r.Data.Bytes = []byte{1, 0} - bl, err := bf.addBleSeqListener(r.Seq) + bl, err := bf.addBleSeqListener("subscribe", r.Seq) if err != nil { return err } - defer bf.removeBleSeqListener(r.Seq) + defer func() { + bf.removeBleSeqListener("subscribe", r.Seq) + }() if err := writeCmd(bf.params.Bx, bl, r); err != nil { return err @@ -668,109 +755,105 @@ func (bf *BleFsm) tryFillPeerDev() bool { return false } -// @return bool Whether another start attempt should be made; -// error The error that caused the start attempt to -// fail; nil on success. -func (bf *BleFsm) Start() (bool, error) { - if !bf.IsClosed() { - return false, nmxutil.NewSesnAlreadyOpenError( - "Attempt to open an already-open BLE session") - } +func (bf *BleFsm) executeState() (bool, error) { + bf.stateMtx.Lock() + defer bf.stateMtx.Unlock() - for { - state := bf.getState() - switch state { - case SESN_STATE_UNCONNECTED: - var err error - - // Determine if we can immediately initiate a connection, or if we - // need to scan for a peer first. If the client specified a peer - // address, or if we have already successfully scanned, we initiate - // a connection now. Otherwise, we need to scan to determine which - // peer meets the specified scan criteria. - bf.tryFillPeerDev() - if bf.peerDev == nil { - // Peer not inferred yet. Initiate scan. - cb := func() error { return bf.scan() } - err = bf.action( - SESN_STATE_UNCONNECTED, - SESN_STATE_SCANNING, - SESN_STATE_UNCONNECTED, - cb) - } else { - // We already know the address we want to connect to. Initiate - // a connection. - cb := func() error { return bf.connect() } - err = bf.action( - SESN_STATE_UNCONNECTED, - SESN_STATE_CONNECTING, - SESN_STATE_CONNECTED, - cb) + switch bf.state { + case SESN_STATE_UNCONNECTED: + // Determine if we can immediately initiate a connection, or if we + // need to scan for a peer first. If the client specified a peer + // address, or if we have already successfully scanned, we initiate + // a connection now. Otherwise, we need to scan to determine which + // peer meets the specified scan criteria. + bf.tryFillPeerDev() + if bf.peerDev == nil { + // Peer not inferred yet. Initiate scan. + if err := bf.scan(); err != nil { + return false, err } - - if err != nil { - log.Info("[%p] FAILED FROM UNCONNECTED STATE: %s", bf, err.Error()) - bf.resetState(err) + bf.state = SESN_STATE_UNCONNECTED + } else { + // We already know the address we want to connect to. Initiate + // a connection. + if err := bf.connect(); err != nil { return false, err } + bf.state = SESN_STATE_EXCHANGE_MTU + } - case SESN_STATE_CONNECTED: - cb := func() error { return bf.exchangeMtu() } - err := bf.action( - SESN_STATE_CONNECTED, - SESN_STATE_EXCHANGING_MTU, - SESN_STATE_EXCHANGED_MTU, - cb) - if err != nil { - bhe := nmxutil.ToBleHost(err) - retry := bhe != nil && bhe.Status == ERR_CODE_ENOTCONN - bf.resetState(err) - return retry, err - } + case SESN_STATE_EXCHANGE_MTU: + if err := bf.exchangeMtu(); err != nil { + bhe := nmxutil.ToBleHost(err) + retry := bhe != nil && bhe.Status == ERR_CODE_ENOTCONN + return retry, err + } + bf.state = SESN_STATE_DISCOVER_SVC - case SESN_STATE_EXCHANGED_MTU: - cb := func() error { return bf.discSvcUuid() } - err := bf.action( - SESN_STATE_EXCHANGED_MTU, - SESN_STATE_DISCOVERING_SVC, - SESN_STATE_DISCOVERED_SVC, - cb) - if err != nil { - bf.resetState(err) - return false, err - } + case SESN_STATE_DISCOVER_SVC: + if err := bf.discSvcUuid(); err != nil { + return false, err + } + bf.state = SESN_STATE_DISCOVER_CHR - case SESN_STATE_DISCOVERED_SVC: - cb := func() error { - return bf.discAllChrs() - } + case SESN_STATE_DISCOVER_CHR: + if err := bf.discAllChrs(); err != nil { + return false, err + } + bf.state = SESN_STATE_SUBSCRIBE - err := bf.action( - SESN_STATE_DISCOVERED_SVC, - SESN_STATE_DISCOVERING_CHR, - SESN_STATE_DISCOVERED_CHR, - cb) - if err != nil { - bf.resetState(err) - return false, err - } + case SESN_STATE_SUBSCRIBE: + if err := bf.subscribe(); err != nil { + return false, err + } + bf.state = SESN_STATE_DONE - if err := bf.subscribe(); err != nil { - bf.resetState(err) - return false, err - } + case SESN_STATE_DONE: + /* Open complete. */ + return false, fmt.Errorf("BleFsm already done being opened") + + default: + return false, fmt.Errorf("BleFsm already being opened") + } + + return false, nil +} + +func (bf *BleFsm) startOnce() (bool, error) { + bf.blockUntilReset() + + if !bf.IsClosed() { + return false, nmxutil.NewSesnAlreadyOpenError(fmt.Sprintf( + "Attempt to open an already-open BLE session (state=%d)", + bf.getState())) + } - case SESN_STATE_DISCOVERED_CHR: - /* Open complete. */ + for { + retry, err := bf.executeState() + if err != nil { + bf.onError(err) + return retry, err + } else if bf.getState() == SESN_STATE_DONE { return false, nil + } + } +} - case SESN_STATE_CONNECTING, - SESN_STATE_DISCOVERING_SVC, - SESN_STATE_DISCOVERING_CHR, - SESN_STATE_TERMINATING: - return false, fmt.Errorf("BleFsm already being opened") +// @return bool Whether another start attempt should be made; +// error The error that caused the start attempt to +// fail; nil on success. +func (bf *BleFsm) Start() error { + var err error + + for i := 0; i < bf.params.ConnTries; i++ { + var retry bool + retry, err = bf.startOnce() + if !retry { + break } } + + return err } // @return bool true if stop complete; @@ -787,10 +870,8 @@ func (bf *BleFsm) Stop() (bool, error) { bf.closedError("Attempt to close an unopened BLE session") case SESN_STATE_CONNECTING: - if err := bf.connCancel(); err != nil { - return false, err - } - return true, nil + bf.onError(fmt.Errorf("Connection attempt cancelled")) + return false, nil default: if err := bf.terminate(); err != nil { @@ -801,7 +882,7 @@ func (bf *BleFsm) Stop() (bool, error) { } func (bf *BleFsm) IsOpen() bool { - return bf.getState() == SESN_STATE_DISCOVERED_CHR + return bf.getState() == SESN_STATE_DONE } func (bf *BleFsm) IsClosed() bool { http://git-wip-us.apache.org/repos/asf/incubator-mynewt-newtmgr/blob/6c9269d7/nmxact/nmble/ble_oic_sesn.go ---------------------------------------------------------------------- diff --git a/nmxact/nmble/ble_oic_sesn.go b/nmxact/nmble/ble_oic_sesn.go index e5139be..0c7b052 100644 --- a/nmxact/nmble/ble_oic_sesn.go +++ b/nmxact/nmble/ble_oic_sesn.go @@ -5,8 +5,6 @@ import ( "sync" "time" - log "github.com/Sirupsen/logrus" - "mynewt.apache.org/newt/util" . "mynewt.apache.org/newtmgr/nmxact/bledefs" "mynewt.apache.org/newtmgr/nmxact/nmp" @@ -18,7 +16,6 @@ type BleOicSesn struct { bf *BleFsm nls map[*nmp.NmpListener]struct{} od *omp.OmpDispatcher - connTries int closeTimeout time.Duration onCloseCb sesn.BleOnCloseFn @@ -30,7 +27,6 @@ func NewBleOicSesn(bx *BleXport, cfg sesn.SesnCfg) *BleOicSesn { bos := &BleOicSesn{ nls: map[*nmp.NmpListener]struct{}{}, od: omp.NewOmpDispatcher(), - connTries: cfg.Ble.ConnTries, closeTimeout: cfg.Ble.CloseTimeout, onCloseCb: cfg.Ble.OnCloseCb, } @@ -54,6 +50,7 @@ func NewBleOicSesn(bx *BleXport, cfg sesn.SesnCfg) *BleOicSesn { Bx: bx, OwnAddrType: cfg.Ble.OwnAddrType, PeerSpec: cfg.Ble.PeerSpec, + ConnTries: cfg.Ble.ConnTries, SvcUuid: svcUuid, ReqChrUuid: reqChrUuid, RspChrUuid: rspChrUuid, @@ -139,26 +136,7 @@ func (bos *BleOicSesn) AbortRx(seq uint8) error { } func (bos *BleOicSesn) Open() error { - var err error - for i := 0; i < bos.connTries; i++ { - log.Debugf("Opening BLE session; try %d/%d", i+1, bos.connTries) - - var retry bool - retry, err = bos.bf.Start() - if !retry { - break - } - - if bos.blockUntilClosed(1*time.Second) != nil { - // Just close the session manually and report the original error. - bos.Close() - return err - } - - log.Debugf("Connection to BLE peer dropped immediately; retrying") - } - - return err + return bos.bf.Start() } func (bos *BleOicSesn) Close() error { http://git-wip-us.apache.org/repos/asf/incubator-mynewt-newtmgr/blob/6c9269d7/nmxact/nmble/ble_plain_sesn.go ---------------------------------------------------------------------- diff --git a/nmxact/nmble/ble_plain_sesn.go b/nmxact/nmble/ble_plain_sesn.go index 803eb45..9e1f70d 100644 --- a/nmxact/nmble/ble_plain_sesn.go +++ b/nmxact/nmble/ble_plain_sesn.go @@ -5,8 +5,6 @@ import ( "sync" "time" - log "github.com/Sirupsen/logrus" - "mynewt.apache.org/newt/util" . "mynewt.apache.org/newtmgr/nmxact/bledefs" "mynewt.apache.org/newtmgr/nmxact/nmp" @@ -17,7 +15,6 @@ type BlePlainSesn struct { bf *BleFsm nls map[*nmp.NmpListener]struct{} nd *nmp.NmpDispatcher - connTries int closeTimeout time.Duration onCloseCb sesn.BleOnCloseFn @@ -29,7 +26,6 @@ func NewBlePlainSesn(bx *BleXport, cfg sesn.SesnCfg) *BlePlainSesn { bps := &BlePlainSesn{ nls: map[*nmp.NmpListener]struct{}{}, nd: nmp.NewNmpDispatcher(), - connTries: cfg.Ble.ConnTries, closeTimeout: cfg.Ble.CloseTimeout, onCloseCb: cfg.Ble.OnCloseCb, } @@ -48,6 +44,7 @@ func NewBlePlainSesn(bx *BleXport, cfg sesn.SesnCfg) *BlePlainSesn { Bx: bx, OwnAddrType: cfg.Ble.OwnAddrType, PeerSpec: cfg.Ble.PeerSpec, + ConnTries: cfg.Ble.ConnTries, SvcUuid: svcUuid, ReqChrUuid: chrUuid, RspChrUuid: chrUuid, @@ -112,46 +109,12 @@ func (bps *BlePlainSesn) listenForClose(timeout time.Duration) error { } } -func (bps *BlePlainSesn) blockUntilClosed(timeout time.Duration) error { - if err := bps.setCloseChan(); err != nil { - return err - } - defer bps.clearCloseChan() - - // If the session is already closed, we're done. - if bps.bf.IsClosed() { - return nil - } - - // Block until close completes or times out. - return bps.listenForClose(timeout) -} - func (bps *BlePlainSesn) AbortRx(seq uint8) error { return bps.nd.FakeRxError(seq, fmt.Errorf("Rx aborted")) } func (bps *BlePlainSesn) Open() error { - var err error - for i := 0; i < bps.connTries; i++ { - log.Debugf("Opening BLE session; try %d/%d", i+1, bps.connTries) - - var retry bool - retry, err = bps.bf.Start() - if !retry { - break - } - - if bps.blockUntilClosed(1*time.Second) != nil { - // Just close the session manually and report the original error. - bps.Close() - return err - } - - log.Debugf("Connection to BLE peer dropped immediately; retrying") - } - - return err + return bps.bf.Start() } func (bps *BlePlainSesn) Close() error { http://git-wip-us.apache.org/repos/asf/incubator-mynewt-newtmgr/blob/6c9269d7/nmxact/nmble/ble_proto.go ---------------------------------------------------------------------- diff --git a/nmxact/nmble/ble_proto.go b/nmxact/nmble/ble_proto.go index 69c456f..6a99feb 100644 --- a/nmxact/nmble/ble_proto.go +++ b/nmxact/nmble/ble_proto.go @@ -241,7 +241,8 @@ const ( MSG_TYPE_NOTIFY_RX_EVT = 2055 MSG_TYPE_MTU_CHANGE_EVT = 2056 MSG_TYPE_SCAN_EVT = 2057 - MSG_TYPE_ENC_CHANGE_EVT = 2058 + MSG_TYPE_SCAN_TMO_EVT = 2058 + MSG_TYPE_ENC_CHANGE_EVT = 2059 ) var MsgOpStringMap = map[MsgOp]string{ @@ -276,6 +277,7 @@ var MsgTypeStringMap = map[MsgType]string{ MSG_TYPE_NOTIFY_RX_EVT: "notify_rx_evt", MSG_TYPE_MTU_CHANGE_EVT: "mtu_change_evt", MSG_TYPE_SCAN_EVT: "scan_evt", + MSG_TYPE_SCAN_TMO_EVT: "scan_tmo_evt", MSG_TYPE_ENC_CHANGE_EVT: "enc_change_evt", } @@ -693,6 +695,13 @@ type BleScanEvt struct { DataMfgData BleBytes `json:"data_mfg_data"` } +type BleScanTmoEvt struct { + // Header + Op MsgOp `json:"op"` + Type MsgType `json:"type"` + Seq BleSeq `json:"seq"` +} + type BleScanCancelReq struct { // Header Op MsgOp `json:"op"` http://git-wip-us.apache.org/repos/asf/incubator-mynewt-newtmgr/blob/6c9269d7/nmxact/nmble/ble_util.go ---------------------------------------------------------------------- diff --git a/nmxact/nmble/ble_util.go b/nmxact/nmble/ble_util.go index f0a6c04..66c8c6b 100644 --- a/nmxact/nmble/ble_util.go +++ b/nmxact/nmble/ble_util.go @@ -30,7 +30,7 @@ func BhdTimeoutError(rspType MsgType) error { MsgTypeToString(rspType)) log.Debug(str) - return nmxutil.NewXportTimeoutError(str) + return nmxutil.NewXportError(str) } func StatusError(op MsgOp, msgType MsgType, status int) error { http://git-wip-us.apache.org/repos/asf/incubator-mynewt-newtmgr/blob/6c9269d7/nmxact/nmble/ble_xport.go ---------------------------------------------------------------------- diff --git a/nmxact/nmble/ble_xport.go b/nmxact/nmble/ble_xport.go index f5a9272..d87f0c8 100644 --- a/nmxact/nmble/ble_xport.go +++ b/nmxact/nmble/ble_xport.go @@ -91,8 +91,7 @@ type BleXport struct { shutdownChan chan bool readyChan chan error numReadyListeners int - masterQueue [](chan error) - masterActive bool + master nmxutil.SingleResource randAddr *BleAddr mtx sync.Mutex @@ -104,7 +103,7 @@ func NewBleXport(cfg XportCfg) (*BleXport, error) { Bd: NewBleDispatcher(), shutdownChan: make(chan bool), readyChan: make(chan error), - masterQueue: [](chan error){}, + master: nmxutil.NewSingleResource(), cfg: cfg, } @@ -216,6 +215,12 @@ func (bx *BleXport) initialSyncCheck() (bool, *BleListener, error) { } func (bx *BleXport) shutdown(restart bool, err error) { + if !nmxutil.IsXport(err) { + panic(fmt.Sprintf( + "BleXport.shutdown() received error that isn't an XportError: %+v", + err)) + } + bx.mtx.Lock() var fullyStarted bool @@ -246,15 +251,12 @@ func (bx *BleXport) shutdown(restart bool, err error) { bx.client.Stop() } + bx.master.Abort(err) + // Indicate an error to all of this transport's listeners. This prevents // them from blocking endlessly while awaiting a BLE message. bx.Bd.ErrorAll(err) - for _, listener := range bx.masterQueue { - listener <- err - } - bx.masterQueue = [](chan error){} - // Stop all of this transport's go routines. for i := 0; i < bx.numStopListeners; i++ { bx.stopChan <- struct{}{} @@ -311,7 +313,7 @@ func (bx *BleXport) setStateFrom(from BleXportState, to BleXportState) bool { case BLE_XPORT_STATE_STARTED: bx.notifyReadyListeners(nil) case BLE_XPORT_STATE_STOPPED, BLE_XPORT_STATE_DORMANT: - bx.notifyReadyListeners(fmt.Errorf("BLE transport stopped")) + bx.notifyReadyListeners(nmxutil.NewXportError("BLE transport stopped")) default: } @@ -319,7 +321,7 @@ func (bx *BleXport) setStateFrom(from BleXportState, to BleXportState) bool { } func (bx *BleXport) Stop() error { - bx.shutdown(false, nil) + bx.shutdown(false, nmxutil.NewXportError("xport stopped")) return nil } @@ -524,40 +526,9 @@ func (bx *BleXport) RspTimeout() time.Duration { } func (bx *BleXport) AcquireMaster() error { - bx.mtx.Lock() - - if !bx.masterActive { - bx.masterActive = true - bx.mtx.Unlock() - return nil - } - - listener := make(chan error) - bx.masterQueue = append(bx.masterQueue, listener) - - bx.mtx.Unlock() - - return <-listener + return bx.master.Acquire() } func (bx *BleXport) ReleaseMaster() { - bx.mtx.Lock() - - if !bx.masterActive { - bx.mtx.Unlock() - return - } - - if len(bx.masterQueue) == 0 { - bx.masterActive = false - bx.mtx.Unlock() - return - } - - listener := bx.masterQueue[0] - bx.masterQueue = bx.masterQueue[1:] - - bx.mtx.Unlock() - - listener <- nil + bx.master.Release() } http://git-wip-us.apache.org/repos/asf/incubator-mynewt-newtmgr/blob/6c9269d7/nmxact/nmble/dispatch.go ---------------------------------------------------------------------- diff --git a/nmxact/nmble/dispatch.go b/nmxact/nmble/dispatch.go index 40ec8f2..83f1f0f 100644 --- a/nmxact/nmble/dispatch.go +++ b/nmxact/nmble/dispatch.go @@ -109,6 +109,7 @@ func discChrEvtCtor() BleMsg { return &BleDiscChrEvt{} } func notifyRxEvtCtor() BleMsg { return &BleNotifyRxEvt{} } func mtuChangeEvtCtor() BleMsg { return &BleMtuChangeEvt{} } func scanEvtCtor() BleMsg { return &BleScanEvt{} } +func scanTmoEvtCtor() BleMsg { return &BleScanTmoEvt{} } var msgCtorMap = map[OpTypePair]msgCtor{ {MSG_OP_RSP, MSG_TYPE_ERR}: errRspCtor, @@ -136,6 +137,7 @@ var msgCtorMap = map[OpTypePair]msgCtor{ {MSG_OP_EVT, MSG_TYPE_NOTIFY_RX_EVT}: notifyRxEvtCtor, {MSG_OP_EVT, MSG_TYPE_MTU_CHANGE_EVT}: mtuChangeEvtCtor, {MSG_OP_EVT, MSG_TYPE_SCAN_EVT}: scanEvtCtor, + {MSG_OP_EVT, MSG_TYPE_SCAN_TMO_EVT}: scanTmoEvtCtor, } func NewBleDispatcher() *BleDispatcher { @@ -256,8 +258,8 @@ func decodeBleMsg(data []byte) (BleMsgBase, BleMsg, error) { cb := msgCtorMap[opTypePair] if cb == nil { return base, nil, fmt.Errorf( - "Unrecognized op+type pair: %s, %s", - MsgOpToString(base.Op), MsgTypeToString(base.Type)) + "Unrecognized op+type pair:") // %s, %s", + //MsgOpToString(base.Op), MsgTypeToString(base.Type)) } msg := cb() http://git-wip-us.apache.org/repos/asf/incubator-mynewt-newtmgr/blob/6c9269d7/nmxact/nmserial/serial_xport.go ---------------------------------------------------------------------- diff --git a/nmxact/nmserial/serial_xport.go b/nmxact/nmserial/serial_xport.go index 7dbb5ca..37f991f 100644 --- a/nmxact/nmserial/serial_xport.go +++ b/nmxact/nmserial/serial_xport.go @@ -12,9 +12,9 @@ import ( "github.com/joaojeronimo/go-crc16" "github.com/tarm/serial" + "mynewt.apache.org/newt/util" "mynewt.apache.org/newtmgr/nmxact/nmxutil" "mynewt.apache.org/newtmgr/nmxact/sesn" - "mynewt.apache.org/newt/util" ) type XportCfg struct { @@ -209,7 +209,7 @@ func (sx *SerialXport) Rx() ([]byte, error) { if err == nil { // Scanner hit EOF, so we'll need to create a new one. This only // happens on timeouts. - err = nmxutil.NewXportTimeoutError( + err = nmxutil.NewXportError( "Timeout reading from serial connection") sx.scanner = bufio.NewScanner(sx.port) } http://git-wip-us.apache.org/repos/asf/incubator-mynewt-newtmgr/blob/6c9269d7/nmxact/nmxutil/nmxerr.go ---------------------------------------------------------------------- diff --git a/nmxact/nmxutil/nmxerr.go b/nmxact/nmxutil/nmxerr.go index aa9da23..28930e3 100644 --- a/nmxact/nmxutil/nmxerr.go +++ b/nmxact/nmxutil/nmxerr.go @@ -89,38 +89,44 @@ func IsSesnClosed(err error) bool { return ok } -// Represents a low-level transport error. -type XportError struct { +type ScanTmoError struct { Text string } -func NewXportError(text string) *XportError { - return &XportError{text} +func NewScanTmoError(text string) *ScanTmoError { + return &ScanTmoError{ + Text: text, + } } -func (e *XportError) Error() string { +func (e *ScanTmoError) Error() string { return e.Text } -func IsXport(err error) bool { - _, ok := err.(*XportError) +func IsScanTmo(err error) bool { + _, ok := err.(*ScanTmoError) return ok } -type XportTimeoutError struct { +// Represents a low-level transport error. +type XportError struct { Text string } -func NewXportTimeoutError(text string) *XportTimeoutError { - return &XportTimeoutError{text} +func NewXportError(text string) *XportError { + return &XportError{text} } -func (e *XportTimeoutError) Error() string { +func (e *XportError) Error() string { return e.Text } -func IsXportTimeout(err error) bool { - _, ok := err.(*XportTimeoutError) +func IsXport(err error) bool { + if err == nil { + return false + } + + _, ok := err.(*XportError) return ok } http://git-wip-us.apache.org/repos/asf/incubator-mynewt-newtmgr/blob/6c9269d7/nmxact/nmxutil/nmxutil.go ---------------------------------------------------------------------- diff --git a/nmxact/nmxutil/nmxutil.go b/nmxact/nmxutil/nmxutil.go index ef1ecd4..cb299e9 100644 --- a/nmxact/nmxutil/nmxutil.go +++ b/nmxact/nmxutil/nmxutil.go @@ -23,3 +23,80 @@ func NextNmpSeq() uint8 { return val } + +type SingleResource struct { + acquired bool + waitQueue [](chan error) + mtx sync.Mutex +} + +func NewSingleResource() SingleResource { + return SingleResource{ + waitQueue: [](chan error){}, + } +} + +func (s *SingleResource) removeWaiter(waiter chan error) { + s.mtx.Lock() + defer s.mtx.Unlock() + + for i, w := range s.waitQueue { + if w == waiter { + s.waitQueue = append(s.waitQueue[:i], s.waitQueue[i+1:]...) + } + } +} + +func (s *SingleResource) Acquire() error { + s.mtx.Lock() + + if !s.acquired { + s.acquired = true + s.mtx.Unlock() + return nil + } + + w := make(chan error) + s.waitQueue = append(s.waitQueue, w) + + s.mtx.Unlock() + + err := <-w + if err != nil { + s.removeWaiter(w) + return err + } + + return nil +} + +func (s *SingleResource) Release() { + s.mtx.Lock() + + if !s.acquired { + s.mtx.Unlock() + return + } + + if len(s.waitQueue) == 0 { + s.acquired = false + s.mtx.Unlock() + return + } + + w := s.waitQueue[0] + s.waitQueue = s.waitQueue[1:] + + s.mtx.Unlock() + + w <- nil +} + +func (s *SingleResource) Abort(err error) { + s.mtx.Lock() + defer s.mtx.Unlock() + + for _, w := range s.waitQueue { + w <- err + } +} http://git-wip-us.apache.org/repos/asf/incubator-mynewt-newtmgr/blob/6c9269d7/nmxact/sesn/sesn.go ---------------------------------------------------------------------- diff --git a/nmxact/sesn/sesn.go b/nmxact/sesn/sesn.go index 0782e63..ea3f43a 100644 --- a/nmxact/sesn/sesn.go +++ b/nmxact/sesn/sesn.go @@ -77,9 +77,7 @@ func TxNmp(s Sesn, m *nmp.NmpMsg, o TxOptions) (nmp.NmpRsp, error) { return r, nil } - if (!nmxutil.IsNmpTimeout(err) && !nmxutil.IsXportTimeout(err)) || - i >= retries { - + if !nmxutil.IsNmpTimeout(err) || i >= retries { return nil, err } } http://git-wip-us.apache.org/repos/asf/incubator-mynewt-newtmgr/blob/6c9269d7/nmxact/sesn/sesn_cfg.go ---------------------------------------------------------------------- diff --git a/nmxact/sesn/sesn_cfg.go b/nmxact/sesn/sesn_cfg.go index 9c84db1..3c57f24 100644 --- a/nmxact/sesn/sesn_cfg.go +++ b/nmxact/sesn/sesn_cfg.go @@ -61,7 +61,7 @@ func NewSesnCfg() SesnCfg { return SesnCfg{ Ble: SesnCfgBle{ ConnTries: 3, - CloseTimeout: 5 * time.Second, + CloseTimeout: 15 * time.Second, }, } }
