nmxact - Automatically restart BLE xport.

More work needs to be done here: if a client attempts to use the
transport while it is restarting, the attempt fails immediately.

It would be better if:
    * The transmit attempt blocked until the transport has finished
      restarting.
    * The transport gave up after X consecutive failed restart attempts.


Project: http://git-wip-us.apache.org/repos/asf/incubator-mynewt-newtmgr/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-mynewt-newtmgr/commit/2aeb4daa
Tree: http://git-wip-us.apache.org/repos/asf/incubator-mynewt-newtmgr/tree/2aeb4daa
Diff: http://git-wip-us.apache.org/repos/asf/incubator-mynewt-newtmgr/diff/2aeb4daa

Branch: refs/heads/master
Commit: 2aeb4daab6e2a13152314c23cd1c79fed5ebbdd2
Parents: db639f9
Author: Christopher Collins <[email protected]>
Authored: Mon Apr 3 16:38:03 2017 -0700
Committer: Christopher Collins <[email protected]>
Committed: Mon Apr 3 16:49:58 2017 -0700

----------------------------------------------------------------------
 nmxact/nmble/ble_xport.go | 167 +++++++++++++++++++++++++++++++----------
 nmxact/nmble/dispatch.go  |  15 ++++
 2 files changed, 144 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-mynewt-newtmgr/blob/2aeb4daa/nmxact/nmble/ble_xport.go
----------------------------------------------------------------------
diff --git a/nmxact/nmble/ble_xport.go b/nmxact/nmble/ble_xport.go
index 16a4128..32ced04 100644
--- a/nmxact/nmble/ble_xport.go
+++ b/nmxact/nmble/ble_xport.go
@@ -53,21 +53,26 @@ const (
        BLE_XPORT_STATE_STOPPED BleXportState = iota
        BLE_XPORT_STATE_STARTING
        BLE_XPORT_STATE_STARTED
+       BLE_XPORT_STATE_STOPPING
 )
 
 // Implements xport.Xport.
 type BleXport struct {
-       Bd     *BleDispatcher
-       client *unixchild.Client
-       state  BleXportState
+       Bd               *BleDispatcher
+       client           *unixchild.Client
+       state            BleXportState
+       stopChan         chan struct{}
+       shutdownChan     chan bool
+       numStopListeners int
 
        cfg XportCfg
 }
 
 func NewBleXport(cfg XportCfg) (*BleXport, error) {
        bx := &BleXport{
-               Bd:  NewBleDispatcher(),
-               cfg: cfg,
+               Bd:           NewBleDispatcher(),
+               shutdownChan: make(chan bool),
+               cfg:          cfg,
        }
 
        return bx, nil
@@ -177,18 +182,46 @@ func (bx *BleXport) initialSyncCheck() (bool, 
*BleListener, error) {
        return synced, bl, nil
 }
 
-func (bx *BleXport) onError(err error) {
-       if !bx.setStateFrom(BLE_XPORT_STATE_STARTED, BLE_XPORT_STATE_STOPPED) &&
-               !bx.setStateFrom(BLE_XPORT_STATE_STARTING, 
BLE_XPORT_STATE_STOPPED) {
+func (bx *BleXport) shutdown(restart bool, err error) {
+       var fullyStarted bool
 
+       if bx.setStateFrom(BLE_XPORT_STATE_STARTED,
+               BLE_XPORT_STATE_STOPPING) {
+
+               fullyStarted = true
+       } else if bx.setStateFrom(BLE_XPORT_STATE_STARTING,
+               BLE_XPORT_STATE_STOPPING) {
+
+               fullyStarted = false
+       } else {
                // Stop already in progress.
                return
        }
+
+       // Stop the unixchild instance (blehostd + socket).
        if bx.client != nil {
                bx.client.Stop()
+
+               // Unblock the unixchild instance.
                bx.client.FromChild <- nil
        }
+
+       // Indicate an error to all of this transport's listeners.  This 
prevents
+       // them from blocking endlessly while awaiting a BLE message.
        bx.Bd.ErrorAll(err)
+
+       // Stop all of this transport's go routines.
+       for i := 0; i < bx.numStopListeners; i++ {
+               bx.stopChan <- struct{}{}
+       }
+
+       bx.setStateFrom(BLE_XPORT_STATE_STOPPING, BLE_XPORT_STATE_STOPPED)
+
+       // Indicate that the shutdown is complete.  If restarts are enabled on 
this
+       // transport, this signals that the transport should be started again.
+       if fullyStarted {
+               bx.shutdownChan <- restart
+       }
 }
 
 func (bx *BleXport) setStateFrom(from BleXportState, to BleXportState) bool {
@@ -202,47 +235,68 @@ func (bx *BleXport) getState() BleXportState {
 }
 
 func (bx *BleXport) Stop() error {
-       bx.onError(nil)
+       bx.shutdown(false, nil)
        return nil
 }
 
-func (bx *BleXport) Start() error {
+func (bx *BleXport) startOnce() error {
        if !bx.setStateFrom(BLE_XPORT_STATE_STOPPED, BLE_XPORT_STATE_STARTING) {
                return nmxutil.NewXportError("BLE xport started twice")
        }
 
+       bx.stopChan = make(chan struct{})
+       bx.numStopListeners = 0
+       bx.Bd.Clear()
+
        bx.createUnixChild()
        if err := bx.client.Start(); err != nil {
                if unixchild.IsUcAcceptError(err) {
-                       err = nmxutil.NewXportError("blehostd did not connect 
to socket; " +
-                               "controller not attached?")
+                       err = nmxutil.NewXportError(
+                               "blehostd did not connect to socket; " +
+                                       "controller not attached?")
                } else {
+                       panic(err.Error())
                        err = nmxutil.NewXportError(
                                "Failed to start child process: " + err.Error())
                }
-               bx.setStateFrom(BLE_XPORT_STATE_STARTING, 
BLE_XPORT_STATE_STOPPED)
+               bx.shutdown(true, err)
                return err
        }
 
        go func() {
-               err := <-bx.client.ErrChild
-               err = nmxutil.NewXportError("BLE transport error: " + 
err.Error())
-               fmt.Printf("%s\n", err.Error())
-               bx.onError(err)
+               bx.numStopListeners++
+               for {
+                       select {
+                       case err := <-bx.client.ErrChild:
+                               err = nmxutil.NewXportError("BLE transport 
error: " +
+                                       err.Error())
+                               go bx.shutdown(true, err)
+
+                       case <-bx.stopChan:
+                               return
+                       }
+               }
        }()
 
        go func() {
+               bx.numStopListeners++
                for {
-                       if b := bx.rx(); b == nil {
-                               // The error should have been reported to 
everyone interested.
-                               break
+                       select {
+                       case buf := <-bx.client.FromChild:
+                               if len(buf) != 0 {
+                                       log.Debugf("Receive from 
blehostd:\n%s", hex.Dump(buf))
+                                       bx.Bd.Dispatch(buf)
+                               }
+
+                       case <-bx.stopChan:
+                               return
                        }
                }
        }()
 
        synced, bl, err := bx.initialSyncCheck()
        if err != nil {
-               bx.Stop()
+               bx.shutdown(true, err)
                return err
        }
 
@@ -253,6 +307,7 @@ func (bx *BleXport) Start() error {
                for {
                        select {
                        case err := <-bl.ErrChan:
+                               bx.shutdown(true, err)
                                return err
                        case bm := <-bl.BleChan:
                                switch msg := bm.(type) {
@@ -262,34 +317,37 @@ func (bx *BleXport) Start() error {
                                        }
                                }
                        case <-time.After(bx.cfg.SyncTimeout):
-                               bx.Stop()
-                               return nmxutil.NewXportError(
+                               err := nmxutil.NewXportError(
                                        "Timeout waiting for host <-> 
controller sync")
+                               bx.shutdown(true, err)
+                               return err
                        }
                }
        }
 
        // Host and controller are synced.  Listen for sync loss in the 
background.
        go func() {
+               bx.numStopListeners++
                for {
                        select {
                        case err := <-bl.ErrChan:
-                               bx.onError(err)
-                               return
+                               go bx.shutdown(true, err)
                        case bm := <-bl.BleChan:
                                switch msg := bm.(type) {
                                case *BleSyncEvt:
                                        if !msg.Synced {
-                                               
bx.onError(nmxutil.NewXportError(
+                                               go bx.shutdown(true, 
nmxutil.NewXportError(
                                                        "BLE host <-> 
controller sync lost"))
-                                               return
                                        }
                                }
+                       case <-bx.stopChan:
+                               return
                        }
                }
        }()
 
        if !bx.setStateFrom(BLE_XPORT_STATE_STARTING, BLE_XPORT_STATE_STARTED) {
+               bx.shutdown(true, err)
                return nmxutil.NewXportError(
                        "Internal error; BLE transport in unexpected state")
        }
@@ -297,6 +355,47 @@ func (bx *BleXport) Start() error {
        return nil
 }
 
+func (bx *BleXport) Start() error {
+       // Try to start the transport.  If this first attempt fails, report the
+       // error and don't retry.
+       if err := bx.startOnce(); err != nil {
+               log.Debugf("Error starting BLE transport: %s",
+                       err.Error())
+               return err
+       }
+
+       // Now that the first start attempt has succeeded, start a restart loop 
in
+       // the background.
+       go func() {
+               // Block until transport shuts down.
+               restart := <-bx.shutdownChan
+               for {
+                       // If restarts are disabled, or if the shutdown was a 
result of an
+                       // explicit stop call (instead of an unexpected error), 
stop
+                       // restarting the transport.
+                       if !bx.cfg.BlehostdRestart || !restart {
+                               break
+                       }
+
+                       // Wait a second before the next restart.  This is 
necessary to
+                       // ensure the unix domain socket can be rebound.
+                       time.Sleep(time.Second)
+
+                       // Attempt to start the transport again.
+                       if err := bx.startOnce(); err != nil {
+                               // Start attempt failed.
+                               log.Debugf("Error starting BLE transport: %s",
+                                       err.Error())
+                       } else {
+                               // Success.  Block until the transport shuts 
down.
+                               restart = <-bx.shutdownChan
+                       }
+               }
+       }()
+
+       return nil
+}
+
 func (bx *BleXport) txNoSync(data []byte) {
        log.Debugf("Tx to blehostd:\n%s", hex.Dump(data))
        bx.client.ToChild <- data
@@ -304,23 +403,15 @@ func (bx *BleXport) txNoSync(data []byte) {
 
 func (bx *BleXport) Tx(data []byte) error {
        if bx.getState() != BLE_XPORT_STATE_STARTED {
-               return nmxutil.NewXportError("Attempt to transmit before BLE 
xport " +
-                       "fully started")
+               return nmxutil.NewXportError(
+                       fmt.Sprintf("Attempt to transmit before BLE xport fully 
started; "+
+                               "state=%d", bx.getState()))
        }
 
        bx.txNoSync(data)
        return nil
 }
 
-func (bx *BleXport) rx() []byte {
-       buf := <-bx.client.FromChild
-       if len(buf) != 0 {
-               log.Debugf("Receive from blehostd:\n%s", hex.Dump(buf))
-               bx.Bd.Dispatch(buf)
-       }
-       return buf
-}
-
 func (bx *BleXport) RspTimeout() time.Duration {
        return bx.cfg.BlehostdRspTimeout
 }

http://git-wip-us.apache.org/repos/asf/incubator-mynewt-newtmgr/blob/2aeb4daa/nmxact/nmble/dispatch.go
----------------------------------------------------------------------
diff --git a/nmxact/nmble/dispatch.go b/nmxact/nmble/dispatch.go
index 546fc11..c9ef39e 100644
--- a/nmxact/nmble/dispatch.go
+++ b/nmxact/nmble/dispatch.go
@@ -267,7 +267,10 @@ func (bd *BleDispatcher) Dispatch(data []byte) {
                return
        }
 
+       bd.mutex.Lock()
        _, listener := bd.findListener(base)
+       bd.mutex.Unlock()
+
        if listener == nil {
                log.Debugf(
                        "No BLE listener for op=%d type=%d seq=%d 
connHandle=%d",
@@ -295,3 +298,15 @@ func (bd *BleDispatcher) ErrorAll(err error) {
                listener.ErrChan <- err
        }
 }
+
+func (bd *BleDispatcher) Clear() {
+       bd.mutex.Lock()
+       defer bd.mutex.Unlock()
+
+       for s, _ := range bd.seqMap {
+               delete(bd.seqMap, s)
+       }
+       for b, _ := range bd.baseMap {
+               delete(bd.baseMap, b)
+       }
+}

Reply via email to