Updated Branches: refs/heads/master 7d95246f1 -> 44b19df53
THRIFT-2083 Improve the go lib: Testcase for buffered Transport Patch: Feng Shen Project: http://git-wip-us.apache.org/repos/asf/thrift/repo Commit: http://git-wip-us.apache.org/repos/asf/thrift/commit/44b19df5 Tree: http://git-wip-us.apache.org/repos/asf/thrift/tree/44b19df5 Diff: http://git-wip-us.apache.org/repos/asf/thrift/diff/44b19df5 Branch: refs/heads/master Commit: 44b19df535fa83855f91d11c2ecf2e73360a5444 Parents: 7d95246 Author: Jens Geyer <[email protected]> Authored: Fri Jul 26 23:05:00 2013 +0200 Committer: Jens Geyer <[email protected]> Committed: Fri Jul 26 23:05:00 2013 +0200 ---------------------------------------------------------------------- lib/go/thrift/buffed_transport.go | 87 --------------------- lib/go/thrift/buffered_transport.go | 106 ++++++++++++++++++++++++++ lib/go/thrift/buffered_transport_test.go | 29 +++++++ tutorial/go/src/client.go | 4 +- tutorial/go/src/main.go | 15 +++- tutorial/go/src/server.go | 4 +- 6 files changed, 151 insertions(+), 94 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/thrift/blob/44b19df5/lib/go/thrift/buffed_transport.go ---------------------------------------------------------------------- diff --git a/lib/go/thrift/buffed_transport.go b/lib/go/thrift/buffed_transport.go deleted file mode 100644 index 1ba3053..0000000 --- a/lib/go/thrift/buffed_transport.go +++ /dev/null @@ -1,87 +0,0 @@ -package thrift - -type TBufferedTransportFactory struct { - size int -} - -type TBuffer struct { - buffer []byte - pos, limit int -} - -type TBufferedTransport struct { - tp TTransport - rbuf *TBuffer - wbuf *TBuffer -} - -func (p *TBufferedTransportFactory) GetTransport(trans TTransport) TTransport { - return NewTBufferedTransport(trans, p.size) -} - -func NewTBufferedTransportFactory(bufferSize int) *TBufferedTransportFactory { - return &TBufferedTransportFactory{size: bufferSize} -} - -func NewTBufferedTransport(trans TTransport, bufferSize int) *TBufferedTransport { - rb := &TBuffer{buffer: make([]byte, bufferSize)} - wb := &TBuffer{buffer: make([]byte, bufferSize), limit: bufferSize} - return &TBufferedTransport{tp: trans, rbuf: rb, wbuf: wb} -} - -func (p *TBufferedTransport) IsOpen() bool { - return p.tp.IsOpen() -} - -func (p *TBufferedTransport) Open() (err error) { - return p.tp.Open() -} - -func (p *TBufferedTransport) Close() (err error) { - return p.tp.Close() -} - -func (p *TBufferedTransport) Read(buf []byte) (n int, err error) { - rbuf := p.rbuf - if rbuf.pos == rbuf.limit { // no more data to read from buffer - rbuf.pos = 0 - // read data, fill buffer - rbuf.limit, err = p.tp.Read(rbuf.buffer) - if err != nil { - return 0, err - } - } - n = copy(buf, rbuf.buffer[rbuf.pos:rbuf.limit]) - rbuf.pos += n - return n, nil -} - -func (p *TBufferedTransport) Write(buf []byte) (n int, err error) { - wbuf := p.wbuf - size := len(buf) - if wbuf.pos+size > wbuf.limit { // buffer is full, flush buffer - p.Flush() - } - n = copy(wbuf.buffer[wbuf.pos:], buf) - wbuf.pos += n - return n, nil -} - -func (p *TBufferedTransport) Flush() error { - start := 0 - wbuf := p.wbuf - for start < wbuf.pos { - n, err := p.tp.Write(wbuf.buffer[start:wbuf.pos]) - if err != nil { - return err - } - start += n - } - - wbuf.pos = 0 - return p.tp.Flush() -} - -func (p *TBufferedTransport) Peek() bool { - return p.rbuf.pos < p.rbuf.limit || p.tp.Peek() -} http://git-wip-us.apache.org/repos/asf/thrift/blob/44b19df5/lib/go/thrift/buffered_transport.go ---------------------------------------------------------------------- diff --git a/lib/go/thrift/buffered_transport.go b/lib/go/thrift/buffered_transport.go new file mode 100644 index 0000000..e3546a5 --- /dev/null +++ b/lib/go/thrift/buffered_transport.go @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package thrift + +type TBufferedTransportFactory struct { + size int +} + +type TBuffer struct { + buffer []byte + pos, limit int +} + +type TBufferedTransport struct { + tp TTransport + rbuf *TBuffer + wbuf *TBuffer +} + +func (p *TBufferedTransportFactory) GetTransport(trans TTransport) TTransport { + return NewTBufferedTransport(trans, p.size) +} + +func NewTBufferedTransportFactory(bufferSize int) *TBufferedTransportFactory { + return &TBufferedTransportFactory{size: bufferSize} +} + +func NewTBufferedTransport(trans TTransport, bufferSize int) *TBufferedTransport { + rb := &TBuffer{buffer: make([]byte, bufferSize)} + wb := &TBuffer{buffer: make([]byte, bufferSize), limit: bufferSize} + return &TBufferedTransport{tp: trans, rbuf: rb, wbuf: wb} +} + +func (p *TBufferedTransport) IsOpen() bool { + return p.tp.IsOpen() +} + +func (p *TBufferedTransport) Open() (err error) { + return p.tp.Open() +} + +func (p *TBufferedTransport) Close() (err error) { + return p.tp.Close() +} + +func (p *TBufferedTransport) Read(buf []byte) (n int, err error) { + rbuf := p.rbuf + if rbuf.pos == rbuf.limit { // no more data to read from buffer + rbuf.pos = 0 + // read data, fill buffer + rbuf.limit, err = p.tp.Read(rbuf.buffer) + if err != nil { + return 0, err + } + } + n = copy(buf, rbuf.buffer[rbuf.pos:rbuf.limit]) + rbuf.pos += n + return n, nil +} + +func (p *TBufferedTransport) Write(buf []byte) (n int, err error) { + wbuf := p.wbuf + size := len(buf) + if wbuf.pos+size > wbuf.limit { // buffer is full, flush buffer + p.Flush() + } + n = copy(wbuf.buffer[wbuf.pos:], buf) + wbuf.pos += n + return n, nil +} + +func (p *TBufferedTransport) Flush() error { + start := 0 + wbuf := p.wbuf + for start < wbuf.pos { + n, err := p.tp.Write(wbuf.buffer[start:wbuf.pos]) + if err != nil { + return err + } + start += n + } + + wbuf.pos = 0 + return p.tp.Flush() +} + +func (p *TBufferedTransport) Peek() bool { + return p.rbuf.pos < p.rbuf.limit || p.tp.Peek() +} http://git-wip-us.apache.org/repos/asf/thrift/blob/44b19df5/lib/go/thrift/buffered_transport_test.go ---------------------------------------------------------------------- diff --git a/lib/go/thrift/buffered_transport_test.go b/lib/go/thrift/buffered_transport_test.go new file mode 100644 index 0000000..95ec0cb --- /dev/null +++ b/lib/go/thrift/buffered_transport_test.go @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package thrift + +import ( + "testing" +) + +func TestBufferedTransport(t *testing.T) { + trans := NewTBufferedTransport(NewTMemoryBuffer(), 10240) + TransportTest(t, trans, trans) +} http://git-wip-us.apache.org/repos/asf/thrift/blob/44b19df5/tutorial/go/src/client.go ---------------------------------------------------------------------- diff --git a/tutorial/go/src/client.go b/tutorial/go/src/client.go index 114de19..7f8d28f 100644 --- a/tutorial/go/src/client.go +++ b/tutorial/go/src/client.go @@ -69,9 +69,9 @@ func handleClient(client *tutorial.CalculatorClient) (err error) { return err } -func runClient(transportFactory thrift.TTransportFactory, protocolFactory thrift.TProtocolFactory) error { +func runClient(transportFactory thrift.TTransportFactory, protocolFactory thrift.TProtocolFactory, addr string) error { var transport thrift.TTransport - transport, err := thrift.NewTSocket("localhost:9090") + transport, err := thrift.NewTSocket(addr) if err != nil { fmt.Println("Error opening socket:", err) return err http://git-wip-us.apache.org/repos/asf/thrift/blob/44b19df5/tutorial/go/src/main.go ---------------------------------------------------------------------- diff --git a/tutorial/go/src/main.go b/tutorial/go/src/main.go index 4b9576e..d371394 100644 --- a/tutorial/go/src/main.go +++ b/tutorial/go/src/main.go @@ -37,6 +37,8 @@ func main() { server := flag.Bool("server", false, "Run server") protocol := flag.String("P", "binary", "Specify the protocol (binary, compact, simplejson)") framed := flag.Bool("framed", false, "Use framed transport") + buffered := flag.Bool("buffered", false, "Use buffered transport") + addr := flag.String("addr", "localhost:9090", "Address to listen to") flag.Parse() @@ -55,17 +57,24 @@ func main() { Usage() os.Exit(1) } - transportFactory := thrift.NewTTransportFactory() + + var transportFactory thrift.TTransportFactory + if *buffered { + transportFactory = thrift.NewTBufferedTransportFactory(8192) + } else { + transportFactory = thrift.NewTTransportFactory() + } + if *framed { transportFactory = thrift.NewTFramedTransportFactory(transportFactory) } if *server { - if err := runServer(transportFactory, protocolFactory); err != nil { + if err := runServer(transportFactory, protocolFactory, *addr); err != nil { fmt.Println("error running server:", err) } } else { - if err := runClient(transportFactory, protocolFactory); err != nil { + if err := runClient(transportFactory, protocolFactory, *addr); err != nil { fmt.Println("error running client:", err) } } http://git-wip-us.apache.org/repos/asf/thrift/blob/44b19df5/tutorial/go/src/server.go ---------------------------------------------------------------------- diff --git a/tutorial/go/src/server.go b/tutorial/go/src/server.go index 929e223..aea749e 100644 --- a/tutorial/go/src/server.go +++ b/tutorial/go/src/server.go @@ -25,8 +25,8 @@ import ( "tutorial" ) -func runServer(transportFactory thrift.TTransportFactory, protocolFactory thrift.TProtocolFactory) error { - transport, err := thrift.NewTServerSocket("localhost:9090") +func runServer(transportFactory thrift.TTransportFactory, protocolFactory thrift.TProtocolFactory, addr string) error { + transport, err := thrift.NewTServerSocket(addr) if err != nil { return err }
