unixchild additions

* User specifies socket accept timeout
* New error type indicating accept error
* Don't allow double start or stop
Project: http://git-wip-us.apache.org/repos/asf/incubator-mynewt-newt/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-mynewt-newt/commit/df674371 Tree: http://git-wip-us.apache.org/repos/asf/incubator-mynewt-newt/tree/df674371 Diff: http://git-wip-us.apache.org/repos/asf/incubator-mynewt-newt/diff/df674371 Branch: refs/heads/master Commit: df674371e341eeb3761c734aa596cc37b6c67ef9 Parents: aa7aa13 Author: Christopher Collins <[email protected]> Authored: Fri Mar 10 11:10:40 2017 -0800 Committer: Christopher Collins <[email protected]> Committed: Thu Mar 30 20:23:55 2017 -0700 ---------------------------------------------------------------------- util/unixchild/unixchild.go | 160 +++++++++++++++++++++++++++++++-------- 1 file changed, 128 insertions(+), 32 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-mynewt-newt/blob/df674371/util/unixchild/unixchild.go ---------------------------------------------------------------------- diff --git a/util/unixchild/unixchild.go b/util/unixchild/unixchild.go index bc311fc..b8be35d 100644 --- a/util/unixchild/unixchild.go +++ b/util/unixchild/unixchild.go @@ -22,7 +22,6 @@ package unixchild import ( "bufio" "encoding/binary" - "errors" "fmt" "io" "net" @@ -35,38 +34,73 @@ import ( log "github.com/Sirupsen/logrus" ) +type UcAcceptError struct { + Text string +} + +func (err *UcAcceptError) Error() string { + return err.Text +} + +func NewUcAcceptError(text string) *UcAcceptError { + return &UcAcceptError{ + Text: text, + } +} + +func IsUcAcceptError(err error) bool { + _, ok := err.(*UcAcceptError) + return ok +} + type Config struct { - SockPath string - ChildPath string - ChildArgs []string - Depth int - MaxMsgSz int + SockPath string + ChildPath string + ChildArgs []string + Depth int + MaxMsgSz int + AcceptTimeout time.Duration + Restart bool } +type clientState uint32 + +const ( + CLIENT_STATE_STOPPED clientState = iota + 
CLIENT_STATE_STARTING + CLIENT_STATE_STARTED + CLIENT_STATE_STOPPING +) + type Client struct { - FromChild chan []byte - ToChild chan []byte - ErrChild chan error - childPath string - sockPath string - childArgs []string - maxMsgSz int - stopping bool - stop chan bool - stopped chan bool + FromChild chan []byte + ToChild chan []byte + ErrChild chan error + childPath string + sockPath string + childArgs []string + maxMsgSz int + acceptTimeout time.Duration + restart bool + stop chan bool + stopped chan bool + state clientState + stateMutex sync.Mutex } func New(conf Config) *Client { c := &Client{ - childPath: conf.ChildPath, - sockPath: conf.SockPath, - childArgs: conf.ChildArgs, - maxMsgSz: conf.MaxMsgSz, - FromChild: make(chan []byte, conf.Depth), - ToChild: make(chan []byte, conf.Depth), - ErrChild: make(chan error), - stop: make(chan bool), - stopped: make(chan bool), + childPath: conf.ChildPath, + sockPath: conf.SockPath, + childArgs: conf.ChildArgs, + maxMsgSz: conf.MaxMsgSz, + FromChild: make(chan []byte, conf.Depth), + ToChild: make(chan []byte, conf.Depth), + ErrChild: make(chan error), + acceptTimeout: conf.AcceptTimeout, + restart: conf.Restart, + stop: make(chan bool), + stopped: make(chan bool), } if c.maxMsgSz == 0 { @@ -76,6 +110,42 @@ func New(conf Config) *Client { return c } +func (c *Client) getState() clientState { + c.stateMutex.Lock() + defer c.stateMutex.Unlock() + + return c.state +} + +func (c *Client) setState(toState clientState) { + c.stateMutex.Lock() + defer c.stateMutex.Unlock() + + c.state = toState +} + +func (c *Client) setStateIf(toState clientState, + pred func(st clientState) bool) (bool, clientState) { + + c.stateMutex.Lock() + defer c.stateMutex.Unlock() + + if pred(c.state) { + c.state = toState + return true, toState + } + + return false, c.state + +} + +func (c *Client) setStateFrom(fromState clientState, + toState clientState) (bool, clientState) { + + return c.setStateIf(toState, + func(st clientState) bool { return st == 
fromState }) +} + func (c *Client) startChild() (*exec.Cmd, error) { subProcess := exec.Command(c.childPath, c.childArgs...) @@ -179,10 +249,14 @@ func (c *Client) handleChild(con net.Conn) { } func (c *Client) Stop() { - if c.stopping { + ok, _ := c.setStateIf(CLIENT_STATE_STOPPING, + func(st clientState) bool { + return st != CLIENT_STATE_STOPPING + }) + if !ok { return } - c.stopping = true + log.Debugf("Stopping client") c.stop <- true @@ -194,10 +268,24 @@ func (c *Client) Stop() { } } +func (c *Client) acceptDeadline() *time.Time { + if c.acceptTimeout == 0 { + return nil + } + + t := time.Now().Add(c.acceptTimeout) + return &t +} + func (c *Client) Start() error { + ok, state := c.setStateFrom(CLIENT_STATE_STOPPED, CLIENT_STATE_STARTING) + if !ok { + return fmt.Errorf("client in invalid state for stating: %d", state) + } l, err := net.Listen("unix", c.sockPath) if err != nil { + c.setState(CLIENT_STATE_STOPPED) return err } @@ -211,16 +299,21 @@ func (c *Client) Start() error { log.Debugf("unixchild start error: %s", err.Error()) c.ErrChild <- fmt.Errorf("Child start error: %s", err.Error()) } else { + if t := c.acceptDeadline(); t != nil { + l.(*net.UnixListener).SetDeadline(*t) + } fd, err := l.Accept() if err != nil { - log.Debugf("unixchild accept error: %s", err.Error()) + text := fmt.Sprintf("unixchild accept error: %s", + err.Error()) + c.ErrChild <- NewUcAcceptError(text) } else { + c.setState(CLIENT_STATE_STARTED) c.handleChild(fd) + c.ErrChild <- fmt.Errorf("Child exited") } - cmd.Process.Kill() - c.ErrChild <- errors.New("Child exited") } - if c.stopping { + if c.getState() == CLIENT_STATE_STOPPING { log.Debugf("unixchild exit loop") return } @@ -231,10 +324,13 @@ func (c *Client) Start() error { go func() { select { case <-c.stop: - l.Close() + if c.getState() == CLIENT_STATE_STARTED { + l.Close() + } if cmd != nil { cmd.Process.Kill() } + log.Debugf("deleting socket") os.Remove(c.sockPath) c.stopped <- true }
