PROTON-1952: [go] Server connection fails to authenticate Problem: If the Server() option came after SASL options in the option list, the server hangs in authentication.
Solution: NewConnection panics if there is a Server() option that is not the first option in the list. Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/55b27351 Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/55b27351 Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/55b27351 Branch: refs/heads/master Commit: 55b27351de92258cb4e8d7ba32701b6c0ba93a3c Parents: c249107 Author: Alan Conway <[email protected]> Authored: Fri Oct 5 12:02:28 2018 -0400 Committer: Alan Conway <[email protected]> Committed: Thu Oct 11 12:33:08 2018 -0400 ---------------------------------------------------------------------- go/src/qpid.apache.org/electron/common_test.go | 32 +++++++++--------- go/src/qpid.apache.org/electron/connection.go | 37 +++++++++++++++------ go/src/qpid.apache.org/electron/container.go | 5 +-- 3 files changed, 46 insertions(+), 28 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/55b27351/go/src/qpid.apache.org/electron/common_test.go ---------------------------------------------------------------------- diff --git a/go/src/qpid.apache.org/electron/common_test.go b/go/src/qpid.apache.org/electron/common_test.go index 3aad825..e2e4a2a 100644 --- a/go/src/qpid.apache.org/electron/common_test.go +++ b/go/src/qpid.apache.org/electron/common_test.go @@ -25,6 +25,7 @@ import ( "path" "reflect" "runtime" + "sync" "testing" ) @@ -69,16 +70,12 @@ type pair struct { auth connectionSettings } -func newPair(t testing.TB, cli, srv net.Conn, clientOpts, serverOpts []ConnectionOption) *pair { - opts := append([]ConnectionOption{Server()}, serverOpts...) - sc, _ := NewConnection(srv, opts...) - opts = append([]ConnectionOption{}, clientOpts...) - cc, _ := NewConnection(cli, opts...) - cs, _ := cc.Session() +func newPair(t testing.TB, cli, srv Connection) *pair { + cs, _ := cli.Session() p := &pair{ t: t, client: cs, - server: sc, + server: srv, capacity: 100, rchan: make(chan Receiver), schan: make(chan Sender)} @@ -107,26 +104,31 @@ func newPair(t testing.TB, cli, srv net.Conn, clientOpts, serverOpts []Connectio // AMQP pair linked by in-memory pipe func newPipe(t testing.TB, clientOpts, serverOpts []ConnectionOption) *pair { cli, srv := net.Pipe() - return newPair(t, cli, srv, clientOpts, serverOpts) + opts := []ConnectionOption{Server(), ContainerId(t.Name() + "-server")} + sc, _ := NewConnection(srv, append(opts, serverOpts...)...) + opts = []ConnectionOption{ContainerId(t.Name() + "-client")} + cc, _ := NewConnection(cli, append(opts, clientOpts...)...) + return newPair(t, cc, sc) } // AMQP pair linked by TCP socket func newSocketPair(t testing.TB, clientOpts, serverOpts []ConnectionOption) *pair { l, err := net.Listen("tcp4", ":0") // For systems with ipv6 disabled fatalIfN(t, err, 1) - srvCh := make(chan net.Conn) + var srv Connection var srvErr error + var wg sync.WaitGroup + wg.Add(1) go func() { - var c net.Conn - c, srvErr = l.Accept() - srvCh <- c + defer wg.Done() + srv, srvErr = NewContainer(t.Name()+"-server").Accept(l, serverOpts...) }() addr := l.Addr() - cli, err := net.Dial(addr.Network(), addr.String()) + cli, err := NewContainer(t.Name()+"-client").Dial(addr.Network(), addr.String(), clientOpts...) fatalIfN(t, err, 1) - srv := <-srvCh + wg.Wait() fatalIfN(t, srvErr, 1) - return newPair(t, cli, srv, clientOpts, serverOpts) + return newPair(t, cli, srv) } func (p *pair) close() { p.client.Connection().Close(nil); p.server.Close(nil) } http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/55b27351/go/src/qpid.apache.org/electron/connection.go ---------------------------------------------------------------------- diff --git a/go/src/qpid.apache.org/electron/connection.go b/go/src/qpid.apache.org/electron/connection.go index 0d5e7c5..628d933 100644 --- a/go/src/qpid.apache.org/electron/connection.go +++ b/go/src/qpid.apache.org/electron/connection.go @@ -26,9 +26,10 @@ import ( "crypto/rand" "encoding/hex" "net" - "qpid.apache.org/proton" "sync" "time" + + "qpid.apache.org/proton" ) // Settings associated with a Connection. @@ -147,7 +148,7 @@ func Password(password []byte) ConnectionOption { // net.Listener.Accept() // func Server() ConnectionOption { - return func(c *connection) { c.engine.Server(); c.server = true; AllowIncoming()(c) } + return func(c *connection) { c.setServer() } } // AllowIncoming returns a ConnectionOption to enable incoming endpoints, see @@ -174,13 +175,13 @@ type connection struct { defaultSessionOnce, closeOnce sync.Once - container *container - conn net.Conn - server bool - incoming chan Incoming - handler *handler - engine *proton.Engine - pConnection proton.Connection + container *container + conn net.Conn + server, client bool + incoming chan Incoming + handler *handler + engine *proton.Engine + pConnection proton.Connection defaultSession Session } @@ -198,8 +199,13 @@ func NewConnection(conn net.Conn, opts ...ConnectionOption) (*connection, error) return nil, err } c.pConnection = c.engine.Connection() - for _, set := range opts { - set(c) + for _, opt := range opts { + opt(c) + // If the first option is not Server(), then we are a client. + // Applying Server() after other options is an error + if !c.server { + c.client = true + } } if c.container == nil { // Generate a random container-id. Not an RFC4122-compliant UUID but probably-unique @@ -216,6 +222,15 @@ func NewConnection(conn net.Conn, opts ...ConnectionOption) (*connection, error) return c, nil } +func (c *connection) setServer() { + if c.client { + panic("electron.Server() must be first in the ConnectionOption list") + } + c.server = true + c.engine.Server() + AllowIncoming()(c) +} + func (c *connection) run() { if !c.server { c.pConnection.Open() http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/55b27351/go/src/qpid.apache.org/electron/container.go ---------------------------------------------------------------------- diff --git a/go/src/qpid.apache.org/electron/container.go b/go/src/qpid.apache.org/electron/container.go index 7c19aa5..9990706 100644 --- a/go/src/qpid.apache.org/electron/container.go +++ b/go/src/qpid.apache.org/electron/container.go @@ -21,9 +21,10 @@ package electron import ( "net" - "qpid.apache.org/proton" "strconv" "sync/atomic" + + "qpid.apache.org/proton" ) // Container is an AMQP container, it represents a single AMQP "application" @@ -99,7 +100,7 @@ func (cont *container) Dial(network, address string, opts ...ConnectionOption) ( func (cont *container) Accept(l net.Listener, opts ...ConnectionOption) (c Connection, err error) { conn, err := l.Accept() if err == nil { - c, err = cont.Connection(conn, append(opts, Server())...) + c, err = cont.Connection(conn, append([]ConnectionOption{Server()}, opts...)...) } return } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
