Repository: thrift
Updated Branches:
  refs/heads/master d1380d529 -> a57689639
THRIFT-4203 thrift server stop gracefully
Client: Go
Patch: libinbin <[email protected]>

This closes #1271

Project: http://git-wip-us.apache.org/repos/asf/thrift/repo
Commit: http://git-wip-us.apache.org/repos/asf/thrift/commit/a5768963
Tree: http://git-wip-us.apache.org/repos/asf/thrift/tree/a5768963
Diff: http://git-wip-us.apache.org/repos/asf/thrift/diff/a5768963

Branch: refs/heads/master
Commit: a576896398f03d1854f128479d31659446c51027
Parents: d1380d5
Author: libinbin <[email protected]>
Authored: Thu May 18 14:18:28 2017 +0800
Committer: Jens Geyer <[email protected]>
Committed: Thu May 18 23:24:52 2017 +0200

----------------------------------------------------------------------
 lib/go/thrift/simple_server.go | 16 ++++++++++++++--
 1 file changed, 14 insertions(+), 2 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/thrift/blob/a5768963/lib/go/thrift/simple_server.go
----------------------------------------------------------------------
diff --git a/lib/go/thrift/simple_server.go b/lib/go/thrift/simple_server.go
index e207bd9..5c848f2 100644
--- a/lib/go/thrift/simple_server.go
+++ b/lib/go/thrift/simple_server.go
@@ -39,6 +39,7 @@ type TSimpleServer struct {
 	outputTransportFactory TTransportFactory
 	inputProtocolFactory TProtocolFactory
 	outputProtocolFactory TProtocolFactory
+	sync.WaitGroup
 }
 
 func NewTSimpleServer2(processor TProcessor, serverTransport TServerTransport) *TSimpleServer {
@@ -135,6 +136,7 @@ func (p *TSimpleServer) AcceptLoop() error {
 			return err
 		}
 		if client != nil {
+			p.Add(1)
 			go func() {
 				if err := p.processRequests(client); err != nil {
 					log.Println("error processing request:", err)
@@ -157,14 +159,17 @@ var once sync.Once
 
 func (p *TSimpleServer) Stop() error {
 	q := func() {
-		p.quit <- struct{}{}
+		close(p.quit)
 		p.serverTransport.Interrupt()
+		p.Wait()
 	}
 	once.Do(q)
 	return nil
 }
 
 func (p *TSimpleServer) processRequests(client TTransport) error {
+	defer p.Done()
+
 	processor := p.processorFactory.GetProcessor(client)
 	inputTransport := p.inputTransportFactory.GetTransport(client)
 	outputTransport := p.outputTransportFactory.GetTransport(client)
@@ -175,6 +180,7 @@ func (p *TSimpleServer) processRequests(client TTransport) error {
 			log.Printf("panic in processor: %s: %s", e, debug.Stack())
 		}
 	}()
+
 	if inputTransport != nil {
 		defer inputTransport.Close()
 	}
@@ -182,6 +188,12 @@ func (p *TSimpleServer) processRequests(client TTransport) error {
 		defer outputTransport.Close()
 	}
 	for {
+		select {
+		case <-p.quit:
+			return nil
+		default:
+		}
+
 		ok, err := processor.Process(inputProtocol, outputProtocol)
 		if err, ok := err.(TTransportException); ok && err.TypeId() == END_OF_FILE {
 			return nil
@@ -191,7 +203,7 @@ func (p *TSimpleServer) processRequests(client TTransport) error {
 		if err, ok := err.(TApplicationException); ok && err.TypeId() == UNKNOWN_METHOD {
 			continue
 		}
-		if !ok {
+		if !ok {
 			break
 		}
 	}
