This is an automated email from the ASF dual-hosted git repository.

ccollins pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mynewt-newtmgr.git

commit cf276ea9920ba3429e84648445981402acbd9abd
Author: Christopher Collins <ccoll...@apache.org>
AuthorDate: Wed Jul 12 18:33:41 2017 -0700

    nmxact - Remove some mutexes
---
 nmxact/example/ble_loop/ble_loop.go |  12 ++-
 nmxact/example/ble_scan/ble_scan.go |   7 +-
 nmxact/nmble/ble_act.go             |   4 +-
 nmxact/nmble/ble_fsm.go             |  93 ++++++++++++++---------
 nmxact/nmble/ble_oic_sesn.go        | 145 +++++++++++-------------------------
 nmxact/nmble/ble_plain_sesn.go      | 109 +++++++++++++--------------
 nmxact/nmble/ble_xport.go           | 110 +++++++++++++--------------
 nmxact/nmble/discover.go            |   2 +-
 nmxact/nmble/dispatch.go            |  20 +++--
 nmxact/nmble/receiver.go            |   3 +
 nmxact/nmxutil/bcast.go             |  43 +++++++++++
 nmxact/nmxutil/err_funnel.go        |  55 ++------------
 12 files changed, 284 insertions(+), 319 deletions(-)

diff --git a/nmxact/example/ble_loop/ble_loop.go 
b/nmxact/example/ble_loop/ble_loop.go
index 7a930a9..9e79d29 100644
--- a/nmxact/example/ble_loop/ble_loop.go
+++ b/nmxact/example/ble_loop/ble_loop.go
@@ -26,8 +26,11 @@ import (
        "syscall"
        "time"
 
+       log "github.com/Sirupsen/logrus"
+
        "mynewt.apache.org/newtmgr/nmxact/bledefs"
        "mynewt.apache.org/newtmgr/nmxact/nmble"
+       "mynewt.apache.org/newtmgr/nmxact/nmxutil"
        "mynewt.apache.org/newtmgr/nmxact/sesn"
        "mynewt.apache.org/newtmgr/nmxact/xact"
        "mynewt.apache.org/newtmgr/nmxact/xport"
@@ -58,11 +61,14 @@ func configExitHandler(x xport.Xport, s sesn.Sesn) {
 }
 
 func main() {
+       //nmxutil.SetLogLevel(log.DebugLevel)
+       nmxutil.SetLogLevel(log.InfoLevel)
+
        // Initialize the BLE transport.
        params := nmble.NewXportCfg()
        params.SockPath = "/tmp/blehostd-uds"
-       params.BlehostdPath = "blehostd.elf"
-       params.DevPath = "/dev/cu.usbmodem142111"
+       params.BlehostdPath = "blehostd"
+       params.DevPath = "/dev/cu.usbmodem142121"
 
        x, err := nmble.NewBleXport(params)
        if err != nil {
@@ -83,7 +89,7 @@ func main() {
        //     * Peer has name "nimble-bleprph"
        //     * We use a random address.
        dev, err := nmble.DiscoverDeviceWithName(
-               x, bledefs.BLE_ADDR_TYPE_RANDOM, 10*time.Second, 
"nimble-bleprph")
+               x, bledefs.BLE_ADDR_TYPE_RANDOM, 10*time.Second, "c4")
        if err != nil {
                fmt.Fprintf(os.Stderr, "error discovering device: %s\n", 
err.Error())
                os.Exit(1)
diff --git a/nmxact/example/ble_scan/ble_scan.go 
b/nmxact/example/ble_scan/ble_scan.go
index 73ca24f..3a01146 100644
--- a/nmxact/example/ble_scan/ble_scan.go
+++ b/nmxact/example/ble_scan/ble_scan.go
@@ -70,8 +70,8 @@ func main() {
        // Initialize the BLE transport.
        params := nmble.NewXportCfg()
        params.SockPath = "/tmp/blehostd-uds"
-       params.BlehostdPath = "blehostd.elf"
-       params.DevPath = "/dev/cu.usbmodem14221"
+       params.BlehostdPath = "blehostd"
+       params.DevPath = "/dev/cu.usbmodem142121"
 
        x, err := nmble.NewBleXport(params)
        if err != nil {
@@ -105,9 +105,6 @@ func main() {
 
        for {
                sc := scan.BleOmpScanCfg(scanCb)
-               sc.Ble.ScanPred = func(adv bledefs.BleAdvReport) bool {
-                       return adv.Fields.Name != nil && *adv.Fields.Name == 
"c5"
-               }
                if err := scanner.Start(sc); err != nil {
                        fmt.Fprintf(os.Stderr, "error starting scan: %s\n", 
err.Error())
                        os.Exit(1)
diff --git a/nmxact/nmble/ble_act.go b/nmxact/nmble/ble_act.go
index 2c4b71d..e51b8e9 100644
--- a/nmxact/nmble/ble_act.go
+++ b/nmxact/nmble/ble_act.go
@@ -324,8 +324,8 @@ func exchangeMtu(x *BleXport, bl *Listener, r 
*BleExchangeMtuReq) (
        }
 }
 
-func actScan(x *BleXport, bl *Listener, r *BleScanReq,
-       abortChan chan struct{}, advRptCb BleAdvRptFn) error {
+func actScan(x *BleXport, bl *Listener, r *BleScanReq, abortChan chan struct{},
+       advRptCb BleAdvRptFn) error {
 
        const rspType = MSG_TYPE_SCAN
 
diff --git a/nmxact/nmble/ble_fsm.go b/nmxact/nmble/ble_fsm.go
index 52b163e..44a7908 100644
--- a/nmxact/nmble/ble_fsm.go
+++ b/nmxact/nmble/ble_fsm.go
@@ -48,8 +48,11 @@ const (
        FSM_DISCONNECT_TYPE_REQUESTED
 )
 
-type BleRxDataFn func(data []byte)
-type BleDisconnectFn func(dt BleFsmDisconnectType, peer BleDev, err error)
+type BleDisconnectEntry struct {
+       Dt   BleFsmDisconnectType
+       Peer BleDev
+       Err  error
+}
 
 type BleFsmParamsCentral struct {
        PeerDev     BleDev
@@ -58,15 +61,13 @@ type BleFsmParamsCentral struct {
 }
 
 type BleFsmParams struct {
-       Bx           *BleXport
-       OwnAddrType  BleAddrType
-       EncryptWhen  BleEncryptWhen
-       Central      BleFsmParamsCentral
-       SvcUuids     []BleUuid
-       ReqChrUuid   BleUuid
-       RspChrUuid   BleUuid
-       RxDataCb     BleRxDataFn
-       DisconnectCb BleDisconnectFn
+       Bx          *BleXport
+       OwnAddrType BleAddrType
+       EncryptWhen BleEncryptWhen
+       Central     BleFsmParamsCentral
+       SvcUuids    []BleUuid
+       ReqChrUuid  BleUuid
+       RspChrUuid  BleUuid
 }
 
 type BleFsm struct {
@@ -83,8 +84,9 @@ type BleFsm struct {
        errFunnel  nmxutil.ErrFunnel
        id         uint32
 
-       // Conveys changes in encrypted state.
-       encChan chan error
+       encBcast       nmxutil.Bcaster
+       disconnectChan chan BleDisconnectEntry
+       rxNmpChan      chan []byte
 }
 
 func NewBleFsm(p BleFsmParams) *BleFsm {
@@ -99,7 +101,6 @@ func NewBleFsm(p BleFsmParams) *BleFsm {
 
        bf.errFunnel.AccumDelay = 250 * time.Millisecond
        bf.errFunnel.LessCb = fsmErrorLess
-       bf.errFunnel.ProcCb = func(err error) { bf.processErr(err) }
 
        return bf
 }
@@ -163,7 +164,14 @@ func calcDisconnectType(state BleSesnState) 
BleFsmDisconnectType {
        }
 }
 
-func (bf *BleFsm) processErr(err error) {
+// Listens for an error in the state machine.  On error, the session is
+// considered disconnected and the error is reported to the client.
+func (bf *BleFsm) listenForError() {
+       err := <-bf.errFunnel.Wait()
+
+       // Stop listening for NMP responses.
+       close(bf.rxNmpChan)
+
        // Remember some fields before we clear them.
        dt := calcDisconnectType(bf.state)
 
@@ -175,8 +183,10 @@ func (bf *BleFsm) processErr(err error) {
        // Wait for all listeners to get removed.
        bf.rxer.WaitUntilNoListeners()
 
-       bf.errFunnel.Reset()
-       bf.params.DisconnectCb(dt, bf.peerDev, err)
+       bf.disconnectChan <- BleDisconnectEntry{dt, bf.peerDev, err}
+       close(bf.disconnectChan)
+
+       bf.disconnectChan = make(chan BleDisconnectEntry)
 }
 
 // Listens for events in the background.
@@ -214,9 +224,9 @@ func (bf *BleFsm) eventListen(bl *Listener, seq BleSeq) 
error {
                                                log.Debugf("Connection 
encrypted; conn_handle=%d",
                                                        msg.ConnHandle)
                                        }
-                                       if bf.encChan != nil {
-                                               bf.encChan <- err
-                                       }
+
+                                       // Notify any listeners of the 
encryption change event.
+                                       bf.encBcast.SendAndClear(err)
 
                                case *BleDisconnectEvt:
                                        err := bf.disconnectError(msg.Reason)
@@ -259,7 +269,7 @@ func (bf *BleFsm) nmpRspListen() error {
                                        if bf.nmpRspChr != nil &&
                                                msg.AttrHandle == 
bf.nmpRspChr.ValHandle {
 
-                                               
bf.params.RxDataCb(msg.Data.Bytes)
+                                               bf.rxNmpChan <- msg.Data.Bytes
                                        }
                                }
                        }
@@ -437,16 +447,14 @@ func (bf *BleFsm) encInitiate() error {
        }
        defer bf.rxer.RemoveSeqListener("enc-initiate", r.Seq)
 
-       bf.encChan = make(chan error)
-       defer func() { bf.encChan = nil }()
-
        // Initiate the encryption procedure.
        if err := encInitiate(bf.params.Bx, bl, r); err != nil {
                return err
        }
 
        // Block until the procedure completes.
-       return <-bf.encChan
+       itf := <-bf.encBcast.Listen()
+       return itf.(error)
 }
 
 func (bf *BleFsm) discAllChrs() error {
@@ -621,6 +629,14 @@ func (bf *BleFsm) executeState() (bool, error) {
        return false, nil
 }
 
+func (bf *BleFsm) DisconnectChan() <-chan BleDisconnectEntry {
+       return bf.disconnectChan
+}
+
+func (bf *BleFsm) RxNmpChan() <-chan []byte {
+       return bf.rxNmpChan
+}
+
 func (bf *BleFsm) startOnce() (bool, error) {
        if !bf.IsClosed() {
                return false, nmxutil.NewSesnAlreadyOpenError(fmt.Sprintf(
@@ -628,15 +644,16 @@ func (bf *BleFsm) startOnce() (bool, error) {
                        bf.state))
        }
 
-       bf.errFunnel.Start()
-
        for {
                retry, err := bf.executeState()
                if err != nil {
                        bf.errFunnel.Insert(err)
-                       err = bf.errFunnel.Wait()
+                       err = <-bf.errFunnel.Wait()
                        return retry, err
                } else if bf.state == SESN_STATE_DONE {
+                       // We are fully connected.  Listen for errors in the 
background.
+                       go bf.listenForError()
+
                        return false, nil
                }
        }
@@ -648,6 +665,9 @@ func (bf *BleFsm) startOnce() (bool, error) {
 func (bf *BleFsm) Start() error {
        var err error
 
+       bf.disconnectChan = make(chan BleDisconnectEntry)
+       bf.rxNmpChan = make(chan []byte)
+
        for i := 0; i < bf.params.Central.ConnTries; i++ {
                var retry bool
                retry, err = bf.startOnce()
@@ -656,12 +676,16 @@ func (bf *BleFsm) Start() error {
                }
        }
 
-       return err
+       if err != nil {
+               return err
+       }
+
+       return nil
 }
 
 // @return bool                 true if stop complete;
 //                              false if disconnect is now pending.
-func (bf *BleFsm) Stop() (bool, error) {
+func (bf *BleFsm) Stop() error {
        state := bf.state
 
        switch state {
@@ -669,19 +693,18 @@ func (bf *BleFsm) Stop() (bool, error) {
                SESN_STATE_TERMINATING,
                SESN_STATE_CONN_CANCELLING:
 
-               return false,
-                       bf.closedError("Attempt to close an unopened BLE 
session")
+               return bf.closedError("Attempt to close an unopened BLE 
session")
 
        case SESN_STATE_CONNECTING:
                bf.connCancel()
                bf.errFunnel.Insert(fmt.Errorf("Connection attempt cancelled"))
-               return false, nil
+               return nil
 
        default:
                if err := bf.terminate(); err != nil {
-                       return false, err
+                       return err
                }
-               return false, nil
+               return nil
        }
 }
 
diff --git a/nmxact/nmble/ble_oic_sesn.go b/nmxact/nmble/ble_oic_sesn.go
index 9db1c84..e1bf28b 100644
--- a/nmxact/nmble/ble_oic_sesn.go
+++ b/nmxact/nmble/ble_oic_sesn.go
@@ -2,7 +2,6 @@ package nmble
 
 import (
        "fmt"
-       "sync"
        "time"
 
        "github.com/runtimeco/go-coap"
@@ -22,8 +21,7 @@ type BleOicSesn struct {
        closeTimeout time.Duration
        onCloseCb    sesn.OnCloseFn
 
-       closeChan chan error
-       mtx       sync.Mutex
+       closeChan chan struct{}
 }
 
 func NewBleOicSesn(bx *BleXport, cfg sesn.SesnCfg) *BleOicSesn {
@@ -52,136 +50,81 @@ func NewBleOicSesn(bx *BleXport, cfg sesn.SesnCfg) 
*BleOicSesn {
                ReqChrUuid:  reqChrUuid,
                RspChrUuid:  rspChrUuid,
                EncryptWhen: cfg.Ble.EncryptWhen,
-               RxDataCb:    func(d []byte) { bos.onRxNmp(d) },
-               DisconnectCb: func(dt BleFsmDisconnectType, p BleDev, e error) {
-                       bos.onDisconnect(dt, p, e)
-               },
        })
 
        return bos
 }
 
-// Returns true if a new channel was assigned.
-func (bos *BleOicSesn) setCloseChan() error {
-       bos.mtx.Lock()
-       defer bos.mtx.Unlock()
-
-       if bos.closeChan != nil {
-               return fmt.Errorf("Multiple listeners waiting for session to 
close")
-       }
-
-       bos.closeChan = make(chan error, 1)
-       return nil
-}
-
-func (bos *BleOicSesn) clearCloseChan() {
-       bos.mtx.Lock()
-       defer bos.mtx.Unlock()
-
-       bos.closeChan = nil
+func (bos *BleOicSesn) AbortRx(seq uint8) error {
+       return bos.d.ErrorOneNmp(seq, fmt.Errorf("Rx aborted"))
 }
 
-func (bos *BleOicSesn) listenForClose(timeout time.Duration) error {
-       select {
-       case <-bos.closeChan:
-               return nil
-       case <-time.After(timeout):
-               // Session never closed.
-               return fmt.Errorf("Timeout while waiting for session to close")
-       }
-}
+func (bos *BleOicSesn) Open() error {
+       // This channel gets closed when the session closes.
+       bos.closeChan = make(chan struct{})
 
-func (bos *BleOicSesn) blockUntilClosed(timeout time.Duration) error {
-       if err := bos.setCloseChan(); err != nil {
+       if err := bos.bf.Start(); err != nil {
+               close(bos.closeChan)
                return err
        }
-       defer bos.clearCloseChan()
 
-       // If the session is already closed, we're done.
-       if bos.bf.IsClosed() {
-               return nil
-       }
-
-       // Block until close completes or times out.
-       return bos.listenForClose(timeout)
-}
-
-func (bos *BleOicSesn) AbortRx(seq uint8) error {
-       return bos.d.ErrorOneNmp(seq, fmt.Errorf("Rx aborted"))
-}
-
-func (bos *BleOicSesn) Open() error {
        d, err := omp.NewDispatcher(true, 3)
        if err != nil {
+               close(bos.closeChan)
                return err
        }
        bos.d = d
 
-       if err := bos.bf.Start(); err != nil {
+       // Listen for disconnect in the background.
+       go func() {
+               // Block until disconnect.
+               entry := <-bos.bf.DisconnectChan()
+
+               // Signal error to all listeners.
+               bos.d.ErrorAll(entry.Err)
                bos.d.Stop()
-               return err
-       }
+
+               // If the session is being closed, unblock the close() call.
+               close(bos.closeChan)
+
+               // Only execute the client's disconnect callback if the 
disconnect was
+               // unsolicited.
+               if entry.Dt != FSM_DISCONNECT_TYPE_REQUESTED && bos.onCloseCb 
!= nil {
+                       bos.onCloseCb(bos, entry.Err)
+               }
+       }()
+
+       // Listen for NMP responses in the background.
+       go func() {
+               for {
+                       data, ok := <-bos.bf.RxNmpChan()
+                       if !ok {
+                               // Disconnected.
+                               return
+                       } else {
+                               bos.d.Dispatch(data)
+                       }
+               }
+       }()
+
        return nil
 }
 
 func (bos *BleOicSesn) Close() error {
-       if err := bos.setCloseChan(); err != nil {
-               return err
-       }
-       defer bos.clearCloseChan()
-
-       done, err := bos.bf.Stop()
+       err := bos.bf.Stop()
        if err != nil {
                return err
        }
 
-       if done {
-               // Close complete.
-               return nil
-       }
-
-       // Block until close completes or times out.
-       return bos.listenForClose(bos.closeTimeout)
+       // Block until close completes.
+       <-bos.closeChan
+       return nil
 }
 
 func (bos *BleOicSesn) IsOpen() bool {
        return bos.bf.IsOpen()
 }
 
-func (bos *BleOicSesn) onRxNmp(data []byte) {
-       bos.d.Dispatch(data)
-}
-
-// Called by the FSM when a blehostd disconnect event is received.
-func (bos *BleOicSesn) onDisconnect(dt BleFsmDisconnectType, peer BleDev,
-       err error) {
-
-       bos.d.ErrorAll(err)
-
-       bos.mtx.Lock()
-
-       // If the session is being closed, unblock the close() call.
-       if bos.closeChan != nil {
-               bos.closeChan <- err
-       }
-
-       bos.mtx.Unlock()
-
-       // Only stop the dispatcher and execute client's disconnect callback if 
the
-       // disconnect was unsolicited and the session was fully open.  If the
-       // session wasn't fully open, the dispatcher will get stopped when the 
fsm
-       // start function returns an error (right after this function returns).
-       if dt == FSM_DISCONNECT_TYPE_OPENED || dt == 
FSM_DISCONNECT_TYPE_REQUESTED {
-               bos.d.Stop()
-       }
-
-       if dt == FSM_DISCONNECT_TYPE_OPENED {
-               if bos.onCloseCb != nil {
-                       bos.onCloseCb(bos, err)
-               }
-       }
-}
-
 func (bos *BleOicSesn) EncodeNmpMsg(m *nmp.NmpMsg) ([]byte, error) {
        return omp.EncodeOmpTcp(m)
 }
diff --git a/nmxact/nmble/ble_plain_sesn.go b/nmxact/nmble/ble_plain_sesn.go
index b69d4f7..782fce0 100644
--- a/nmxact/nmble/ble_plain_sesn.go
+++ b/nmxact/nmble/ble_plain_sesn.go
@@ -16,25 +16,17 @@ type BlePlainSesn struct {
        closeTimeout time.Duration
        onCloseCb    sesn.OnCloseFn
 
-       closeChan chan error
+       closeChan chan struct{}
 }
 
 func NewBlePlainSesn(bx *BleXport, cfg sesn.SesnCfg) *BlePlainSesn {
        bps := &BlePlainSesn{
-               d:            nmp.NewDispatcher(1),
                closeTimeout: cfg.Ble.CloseTimeout,
                onCloseCb:    cfg.OnCloseCb,
        }
 
-       svcUuid, err := ParseUuid(NmpPlainSvcUuid)
-       if err != nil {
-               panic(err.Error())
-       }
-
-       chrUuid, err := ParseUuid(NmpPlainChrUuid)
-       if err != nil {
-               panic(err.Error())
-       }
+       svcUuid, _ := ParseUuid(NmpPlainSvcUuid)
+       chrUuid, _ := ParseUuid(NmpPlainChrUuid)
 
        bps.bf = NewBleFsm(BleFsmParams{
                Bx:          bx,
@@ -48,84 +40,83 @@ func NewBlePlainSesn(bx *BleXport, cfg sesn.SesnCfg) 
*BlePlainSesn {
                ReqChrUuid:  chrUuid,
                RspChrUuid:  chrUuid,
                EncryptWhen: cfg.Ble.EncryptWhen,
-               RxDataCb:    func(d []byte) { bps.onRxNmp(d) },
-               DisconnectCb: func(dt BleFsmDisconnectType, p BleDev, e error) {
-                       bps.onDisconnect(dt, p, e)
-               },
        })
 
        return bps
 }
 
-func (bps *BlePlainSesn) setCloseChan() error {
-       if bps.closeChan != nil {
-               return fmt.Errorf("Multiple listeners waiting for session to 
close")
-       }
-
-       bps.closeChan = make(chan error, 1)
-       return nil
-}
-
-func (bps *BlePlainSesn) clearCloseChan() {
-       bps.closeChan = nil
-}
-
-func (bps *BlePlainSesn) listenForClose(timeout time.Duration) error {
-       select {
-       case <-bps.closeChan:
-               return nil
-       case <-time.After(timeout):
-               // Session never closed.
-               return fmt.Errorf("Timeout while waiting for session to close")
-       }
-}
-
 func (bps *BlePlainSesn) AbortRx(seq uint8) error {
        return bps.d.ErrorOne(seq, fmt.Errorf("Rx aborted"))
 }
 
 func (bps *BlePlainSesn) Open() error {
-       return bps.bf.Start()
-}
+       // This channel gets closed when the session closes.
+       bps.closeChan = make(chan struct{})
 
-func (bps *BlePlainSesn) Close() error {
-       if err := bps.setCloseChan(); err != nil {
+       if err := bps.bf.Start(); err != nil {
+               close(bps.closeChan)
                return err
        }
-       defer bps.clearCloseChan()
 
-       done, err := bps.bf.Stop()
+       bps.d = nmp.NewDispatcher(3)
+
+       // Listen for disconnect in the background.
+       go func() {
+               // Block until disconnect.
+               entry := <-bps.bf.DisconnectChan()
+
+               // Signal error to all listeners.
+               bps.d.ErrorAll(entry.Err)
+
+               // If the session is being closed, unblock the close() call.
+               close(bps.closeChan)
+
+               // Only execute the client's disconnect callback if the 
disconnect was
+               // unsolicited.
+               if entry.Dt != FSM_DISCONNECT_TYPE_REQUESTED && bps.onCloseCb 
!= nil {
+                       bps.onCloseCb(bps, entry.Err)
+               }
+       }()
+
+       // Listen for NMP responses in the background.
+       go func() {
+               for {
+                       data, ok := <-bps.bf.RxNmpChan()
+                       if !ok {
+                               // Disconnected.
+                               return
+                       } else {
+                               bps.d.Dispatch(data)
+                       }
+               }
+       }()
+
+       return nil
+}
+
+func (bps *BlePlainSesn) Close() error {
+       err := bps.bf.Stop()
        if err != nil {
                return err
        }
 
-       if done {
-               // Close complete.
-               return nil
-       }
-
-       // Block until close completes or times out.
-       return bps.listenForClose(bps.closeTimeout)
+       // Block until close completes.
+       <-bps.closeChan
+       return nil
 }
 
 func (bps *BlePlainSesn) IsOpen() bool {
        return bps.bf.IsOpen()
 }
 
-func (bps *BlePlainSesn) onRxNmp(data []byte) {
-       bps.d.Dispatch(data)
-}
-
 // Called by the FSM when a blehostd disconnect event is received.
 func (bps *BlePlainSesn) onDisconnect(dt BleFsmDisconnectType, peer BleDev,
        err error) {
 
        bps.d.ErrorAll(err)
 
-       // If someone is waiting for the session to close, unblock them.
-       if bps.closeChan != nil {
-               bps.closeChan <- err
-       }
+       // If the session is being closed, unblock the close() call.
+       close(bps.closeChan)
 
        // Only execute client's disconnect callback if the disconnect was
        // unsolicited and the session was fully open.
diff --git a/nmxact/nmble/ble_xport.go b/nmxact/nmble/ble_xport.go
index db0a8e3..67b2d60 100644
--- a/nmxact/nmble/ble_xport.go
+++ b/nmxact/nmble/ble_xport.go
@@ -84,19 +84,17 @@ const (
 
 // Implements xport.Xport.
 type BleXport struct {
-       Bd                *Dispatcher
-       client            *unixchild.Client
-       state             BleXportState
-       stopChan          chan struct{}
-       numStopListeners  int
-       shutdownChan      chan bool
-       readyChan         chan error
-       numReadyListeners int
-       master            nmxutil.SingleResource
-       slave             nmxutil.SingleResource
-       randAddr          *BleAddr
-       mtx               sync.Mutex
-       scanner           *BleScanner
+       Bd           *Dispatcher
+       client       *unixchild.Client
+       state        BleXportState
+       stopChan     chan struct{}
+       shutdownChan chan bool
+       readyBcast   nmxutil.Bcaster
+       master       nmxutil.SingleResource
+       slave        nmxutil.SingleResource
+       randAddr     *BleAddr
+       stateMtx     sync.Mutex
+       scanner      *BleScanner
 
        cfg XportCfg
 }
@@ -105,7 +103,7 @@ func NewBleXport(cfg XportCfg) (*BleXport, error) {
        bx := &BleXport{
                Bd:           NewDispatcher(),
                shutdownChan: make(chan bool),
-               readyChan:    make(chan error),
+               readyBcast:   nmxutil.Bcaster{},
                master:       nmxutil.NewSingleResource(),
                slave:        nmxutil.NewSingleResource(),
                cfg:          cfg,
@@ -114,7 +112,7 @@ func NewBleXport(cfg XportCfg) (*BleXport, error) {
        return bx, nil
 }
 
-func (bx *BleXport) createUnixChild() {
+func (bx *BleXport) startUnixChild() error {
        config := unixchild.Config{
                SockPath:      bx.cfg.SockPath,
                ChildPath:     bx.cfg.BlehostdPath,
@@ -125,6 +123,20 @@ func (bx *BleXport) createUnixChild() {
        }
 
        bx.client = unixchild.New(config)
+
+       if err := bx.client.Start(); err != nil {
+               if unixchild.IsUcAcceptError(err) {
+                       err = nmxutil.NewXportError(
+                               "blehostd did not connect to socket; " +
+                                       "controller not attached?")
+               } else {
+                       err = nmxutil.NewXportError(
+                               "Failed to start child process: " + err.Error())
+               }
+               return err
+       }
+
+       return nil
 }
 
 func (bx *BleXport) BuildScanner() (scan.Scanner, error) {
@@ -240,7 +252,7 @@ func (bx *BleXport) shutdown(restart bool, err error) {
 
        log.Debugf("Shutting down BLE transport")
 
-       bx.mtx.Lock()
+       bx.stateMtx.Lock()
 
        var fullyStarted bool
        var already bool
@@ -260,7 +272,7 @@ func (bx *BleXport) shutdown(restart bool, err error) {
                bx.state = BLE_XPORT_STATE_STOPPING
        }
 
-       bx.mtx.Unlock()
+       bx.stateMtx.Unlock()
 
        if already {
                // Shutdown already in progress.
@@ -283,10 +295,7 @@ func (bx *BleXport) shutdown(restart bool, err error) {
        }
 
        // Stop all of this transport's go routines.
-       log.Debugf("Waiting for BLE transport goroutines to complete")
-       for i := 0; i < bx.numStopListeners; i++ {
-               bx.stopChan <- struct{}{}
-       }
+       close(bx.stopChan)
 
        // Stop the unixchild instance (blehostd + socket).
        if bx.client != nil {
@@ -304,37 +313,32 @@ func (bx *BleXport) shutdown(restart bool, err error) {
 }
 
 func (bx *BleXport) blockUntilReady() error {
-       bx.mtx.Lock()
+       var ch chan interface{}
+
+       bx.stateMtx.Lock()
        switch bx.state {
        case BLE_XPORT_STATE_STARTED:
                // Already started; don't block.
-               bx.mtx.Unlock()
+               bx.stateMtx.Unlock()
                return nil
 
        case BLE_XPORT_STATE_DORMANT:
                // Not in the process of starting; the user will be waiting 
forever.
-               bx.mtx.Unlock()
+               bx.stateMtx.Unlock()
                return fmt.Errorf("Attempt to use BLE transport without 
starting it")
 
        default:
+               ch = bx.readyBcast.Listen()
        }
+       bx.stateMtx.Unlock()
 
-       bx.numReadyListeners++
-       bx.mtx.Unlock()
-
-       return <-bx.readyChan
-}
-
-func (bx *BleXport) notifyReadyListeners(err error) {
-       for i := 0; i < bx.numReadyListeners; i++ {
-               bx.readyChan <- err
-       }
-       bx.numReadyListeners = 0
+       itf := <-ch
+       return itf.(error)
 }
 
 func (bx *BleXport) setStateFrom(from BleXportState, to BleXportState) bool {
-       bx.mtx.Lock()
-       defer bx.mtx.Unlock()
+       bx.stateMtx.Lock()
+       defer bx.stateMtx.Unlock()
 
        if bx.state != from {
                return false
@@ -343,9 +347,10 @@ func (bx *BleXport) setStateFrom(from BleXportState, to 
BleXportState) bool {
        bx.state = to
        switch bx.state {
        case BLE_XPORT_STATE_STARTED:
-               bx.notifyReadyListeners(nil)
+               bx.readyBcast.SendAndClear(nil)
        case BLE_XPORT_STATE_STOPPED, BLE_XPORT_STATE_DORMANT:
-               bx.notifyReadyListeners(nmxutil.NewXportError("BLE transport 
stopped"))
+               bx.readyBcast.SendAndClear(
+                       nmxutil.NewXportError("BLE transport stopped"))
        default:
        }
 
@@ -362,32 +367,21 @@ func (bx *BleXport) startOnce() error {
                return nmxutil.NewXportError("BLE xport started twice")
        }
 
-       bx.stopChan = make(chan struct{})
-       bx.numStopListeners = 0
-
-       bx.createUnixChild()
-       if err := bx.client.Start(); err != nil {
-               if unixchild.IsUcAcceptError(err) {
-                       err = nmxutil.NewXportError(
-                               "blehostd did not connect to socket; " +
-                                       "controller not attached?")
-               } else {
-                       err = nmxutil.NewXportError(
-                               "Failed to start child process: " + err.Error())
-               }
+       if err := bx.startUnixChild(); err != nil {
                bx.shutdown(true, err)
                return err
        }
 
+       bx.stopChan = make(chan struct{})
+
        // Listen for errors and data from the blehostd process.
        go func() {
-               bx.numStopListeners++
                for {
                        select {
                        case err := <-bx.client.ErrChild:
                                err = nmxutil.NewXportError("BLE transport 
error: " +
                                        err.Error())
-                               go bx.shutdown(true, err)
+                               bx.shutdown(true, err)
 
                        case buf := <-bx.client.FromChild:
                                if len(buf) != 0 {
@@ -433,16 +427,16 @@ func (bx *BleXport) startOnce() error {
 
        // Host and controller are synced.  Listen for sync loss in the 
background.
        go func() {
-               bx.numStopListeners++
                for {
                        select {
                        case err := <-bl.ErrChan:
-                               go bx.shutdown(true, err)
+                               bx.shutdown(true, err)
+                               return
                        case bm := <-bl.MsgChan:
                                switch msg := bm.(type) {
                                case *BleSyncEvt:
                                        if !msg.Synced {
-                                               go bx.shutdown(true, 
nmxutil.NewXportError(
+                                               bx.shutdown(true, 
nmxutil.NewXportError(
                                                        "BLE host <-> 
controller sync lost"))
                                        }
                                }
@@ -452,7 +446,7 @@ func (bx *BleXport) startOnce() error {
                }
        }()
 
-       // Generate a new random address is none was specified.
+       // Generate a new random address if none was specified.
        if bx.randAddr == nil {
                addr, err := GenRandAddrXact(bx)
                if err != nil {
diff --git a/nmxact/nmble/discover.go b/nmxact/nmble/discover.go
index 3858e75..2ac19ba 100644
--- a/nmxact/nmble/discover.go
+++ b/nmxact/nmble/discover.go
@@ -108,7 +108,7 @@ func (d *Discoverer) Stop() error {
                return nmxutil.NewAlreadyError("Attempt to stop inactive 
discoverer")
        }
 
-       ch <- struct{}{}
+       close(ch)
        return nil
 }
 
diff --git a/nmxact/nmble/dispatch.go b/nmxact/nmble/dispatch.go
index ba59f81..435b123 100644
--- a/nmxact/nmble/dispatch.go
+++ b/nmxact/nmble/dispatch.go
@@ -55,7 +55,7 @@ type Listener struct {
 func NewListener() *Listener {
        return &Listener{
                MsgChan: make(chan Msg, 16),
-               ErrChan: make(chan error, 4),
+               ErrChan: make(chan error, 1),
                TmoChan: make(chan time.Time, 1),
        }
 }
@@ -70,10 +70,14 @@ func (bl *Listener) AfterTimeout(tmo time.Duration) <-chan 
time.Time {
        return bl.TmoChan
 }
 
-func (bl *Listener) Stop() {
+func (bl *Listener) Close() {
        if bl.timer != nil {
                bl.timer.Stop()
        }
+
+       close(bl.MsgChan)
+       close(bl.ErrChan)
+       close(bl.TmoChan)
 }
 
 type Dispatcher struct {
@@ -234,7 +238,7 @@ func (d *Dispatcher) RemoveListener(base MsgBase) *Listener 
{
 
        base, bl := d.findListener(base)
        if bl != nil {
-               bl.Stop()
+               bl.Close()
                if base.Seq != BLE_SEQ_NONE {
                        delete(d.seqMap, base.Seq)
                } else {
@@ -264,8 +268,8 @@ func decodeMsg(data []byte) (MsgBase, Msg, error) {
        cb := msgCtorMap[opTypePair]
        if cb == nil {
                return base, nil, fmt.Errorf(
-                       "Unrecognized op+type pair:") // %s, %s",
-               //MsgOpToString(base.Op), MsgTypeToString(base.Type))
+                       "Unrecognized op+type pair: %s, %s",
+                       MsgOpToString(base.Op), MsgTypeToString(base.Type))
        }
 
        msg := cb()
@@ -298,6 +302,10 @@ func (d *Dispatcher) Dispatch(data []byte) {
 }
 
 func (d *Dispatcher) ErrorAll(err error) {
+       if err == nil {
+               panic("NIL ERROR")
+       }
+
        d.mtx.Lock()
 
        m1 := d.seqMap
@@ -310,8 +318,10 @@ func (d *Dispatcher) ErrorAll(err error) {
 
        for _, bl := range m1 {
                bl.ErrChan <- err
+               bl.Close()
        }
        for _, bl := range m2 {
                bl.ErrChan <- err
+               bl.Close()
        }
 }
diff --git a/nmxact/nmble/receiver.go b/nmxact/nmble/receiver.go
index d089ba8..8660393 100644
--- a/nmxact/nmble/receiver.go
+++ b/nmxact/nmble/receiver.go
@@ -94,6 +94,9 @@ func (r *Receiver) RemoveSeqListener(name string, seq BleSeq) 
{
 }
 
 func (r *Receiver) ErrorAll(err error) {
+       if err == nil {
+               panic("NIL ERROR")
+       }
        r.mtx.Lock()
        defer r.mtx.Unlock()
 
diff --git a/nmxact/nmxutil/bcast.go b/nmxact/nmxutil/bcast.go
new file mode 100644
index 0000000..a7d68c0
--- /dev/null
+++ b/nmxact/nmxutil/bcast.go
@@ -0,0 +1,43 @@
+package nmxutil
+
+import (
+       "sync"
+)
+
+type Bcaster struct {
+       chs [](chan interface{})
+       mtx sync.Mutex
+}
+
+func (b *Bcaster) Listen() chan interface{} {
+       b.mtx.Lock()
+       defer b.mtx.Unlock()
+
+       ch := make(chan interface{})
+       b.chs = append(b.chs, ch)
+
+       return ch
+}
+
+func (b *Bcaster) Send(val interface{}) {
+       b.mtx.Lock()
+       chs := b.chs
+       b.mtx.Unlock()
+
+       for _, ch := range chs {
+               ch <- val
+               close(ch)
+       }
+}
+
+func (b *Bcaster) Clear() {
+       b.mtx.Lock()
+       defer b.mtx.Unlock()
+
+       b.chs = nil
+}
+
+func (b *Bcaster) SendAndClear(val interface{}) {
+       b.Send(val)
+       b.Clear()
+}
diff --git a/nmxact/nmxutil/err_funnel.go b/nmxact/nmxutil/err_funnel.go
index 58b0ca2..6d6b5a3 100644
--- a/nmxact/nmxutil/err_funnel.go
+++ b/nmxact/nmxutil/err_funnel.go
@@ -1,7 +1,6 @@
 package nmxutil
 
 import (
-       "fmt"
        "sync"
        "time"
 )
@@ -13,26 +12,15 @@ type ErrProcFn func(err error)
 // reported.
 type ErrFunnel struct {
        LessCb     ErrLessFn
-       ProcCb     ErrProcFn
        AccumDelay time.Duration
 
        mtx      sync.Mutex
        resetMtx sync.Mutex
        curErr   error
        errTimer *time.Timer
-       started  bool
        waiters  [](chan error)
 }
 
-func (f *ErrFunnel) Start() {
-       f.resetMtx.Lock()
-
-       f.mtx.Lock()
-       defer f.mtx.Unlock()
-
-       f.started = true
-}
-
 func (f *ErrFunnel) Insert(err error) {
        if err == nil {
                panic("ErrFunnel nil insert")
@@ -41,10 +29,6 @@ func (f *ErrFunnel) Insert(err error) {
        f.mtx.Lock()
        defer f.mtx.Unlock()
 
-       if !f.started {
-               panic("ErrFunnel insert without start")
-       }
-
        if f.curErr == nil {
                f.curErr = err
                f.errTimer = time.AfterFunc(f.AccumDelay, func() {
@@ -61,18 +45,6 @@ func (f *ErrFunnel) Insert(err error) {
        }
 }
 
-func (f *ErrFunnel) Reset() {
-       f.mtx.Lock()
-       defer f.mtx.Unlock()
-
-       if f.started {
-               f.started = false
-               f.curErr = nil
-               f.errTimer.Stop()
-               f.resetMtx.Unlock()
-       }
-}
-
 func (f *ErrFunnel) timerExp() {
        f.mtx.Lock()
 
@@ -88,35 +60,18 @@ func (f *ErrFunnel) timerExp() {
                panic("ErrFunnel timer expired but no error")
        }
 
-       f.ProcCb(err)
-
        for _, w := range waiters {
                w <- err
+               close(w)
        }
 }
 
-func (f *ErrFunnel) Wait() error {
-       var err error
-       var c chan error
+func (f *ErrFunnel) Wait() chan error {
+       c := make(chan error)
 
        f.mtx.Lock()
-
-       if !f.started {
-               if f.curErr == nil {
-                       err = fmt.Errorf("Wait on unstarted ErrFunnel")
-               } else {
-                       err = f.curErr
-               }
-       } else {
-               c = make(chan error)
-               f.waiters = append(f.waiters, c)
-       }
-
+       f.waiters = append(f.waiters, c)
        f.mtx.Unlock()
 
-       if err != nil {
-               return err
-       } else {
-               return <-c
-       }
+       return c
 }

-- 
To stop receiving notification emails like this one, please contact
"commits@mynewt.apache.org" <commits@mynewt.apache.org>.

Reply via email to