Repository: qpid-proton
Updated Branches:
  refs/heads/master 9f219cef0 -> 7486d5b85


NO-JIRA: go: Fix issues found by "go vet", minor simplification and bugfixing.

- Refactor to fix go vet errors about copying a Mutex.
- Hide the Link endpoint interface; provide a simpler LinkSettings common
  interface for Sender and Receiver.
- Simplify connection close logic: process all final events.
- Fix async sender blocking when no outcome is requested.
- Simplify receiver flow control logic.
- Simplify Send() logic.


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/7486d5b8
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/7486d5b8
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/7486d5b8

Branch: refs/heads/master
Commit: 7486d5b853ff26f707efe0ded73fba0d231d3733
Parents: 9f219ce
Author: Alan Conway <[email protected]>
Authored: Fri Dec 11 13:08:48 2015 -0500
Committer: Alan Conway <[email protected]>
Committed: Thu Jan 7 17:26:45 2016 -0500

----------------------------------------------------------------------
 .../src/qpid.apache.org/electron/connection.go  |  12 +-
 .../go/src/qpid.apache.org/electron/endpoint.go |   5 +-
 .../go/src/qpid.apache.org/electron/handler.go  |   6 +-
 .../go/src/qpid.apache.org/electron/link.go     |  90 ++++++------
 .../qpid.apache.org/electron/messaging_test.go  | 114 +++++++++------
 .../go/src/qpid.apache.org/electron/receiver.go | 139 ++++++++-----------
 .../go/src/qpid.apache.org/electron/sender.go   |  61 ++++----
 .../go/src/qpid.apache.org/electron/session.go  |   6 +-
 .../go/src/qpid.apache.org/proton/engine.go     |  74 +++++-----
 .../go/src/qpid.apache.org/proton/wrappers.go   |   2 +-
 10 files changed, 264 insertions(+), 245 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/7486d5b8/proton-c/bindings/go/src/qpid.apache.org/electron/connection.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/electron/connection.go 
b/proton-c/bindings/go/src/qpid.apache.org/electron/connection.go
index 386875d..1f8bd40 100644
--- a/proton-c/bindings/go/src/qpid.apache.org/electron/connection.go
+++ b/proton-c/bindings/go/src/qpid.apache.org/electron/connection.go
@@ -120,7 +120,7 @@ func newConnection(conn net.Conn, cont *container, setting 
...ConnectionOption)
        for _, set := range setting {
                set(c)
        }
-       c.endpoint = makeEndpoint(c.engine.String())
+       c.endpoint.init(c.engine.String())
        c.eConnection = c.engine.Connection()
        go c.run()
        return c, nil
@@ -134,9 +134,15 @@ func (c *connection) run() {
        c.closed(Closed)
 }
 
-func (c *connection) Close(err error) { c.err.Set(err); c.engine.Close(err) }
+func (c *connection) Close(err error) {
+       c.err.Set(err)
+       c.engine.Close(err)
+}
 
-func (c *connection) Disconnect(err error) { c.err.Set(err); 
c.engine.Disconnect(err) }
+func (c *connection) Disconnect(err error) {
+       c.err.Set(err)
+       c.engine.Disconnect(err)
+}
 
 func (c *connection) Session(setting ...SessionOption) (Session, error) {
        var s Session

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/7486d5b8/proton-c/bindings/go/src/qpid.apache.org/electron/endpoint.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/electron/endpoint.go 
b/proton-c/bindings/go/src/qpid.apache.org/electron/endpoint.go
index 2b1f62d..fc701c6 100644
--- a/proton-c/bindings/go/src/qpid.apache.org/electron/endpoint.go
+++ b/proton-c/bindings/go/src/qpid.apache.org/electron/endpoint.go
@@ -52,6 +52,9 @@ type Endpoint interface {
        // Done returns a channel that will close when the endpoint closes.
        // Error() will contain the reason.
        Done() <-chan struct{}
+
+       // Called in handler goroutine when endpoint is remotely closed.
+       closed(err error) error
 }
 
 // DEVELOPER NOTES
@@ -64,7 +67,7 @@ type endpoint struct {
        done chan struct{}
 }
 
-func makeEndpoint(s string) endpoint { return endpoint{str: s, done: make(chan 
struct{})} }
+func (e *endpoint) init(s string) { e.str = s; e.done = make(chan struct{}) }
 
 // Called in handler on a Closed event. Marks the endpoint as closed and the 
corresponding
 // proton.Endpoint pointer as invalid. Injected functions should check Error() 
to ensure

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/7486d5b8/proton-c/bindings/go/src/qpid.apache.org/electron/handler.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/electron/handler.go 
b/proton-c/bindings/go/src/qpid.apache.org/electron/handler.go
index 0237156..eb53df3 100644
--- a/proton-c/bindings/go/src/qpid.apache.org/electron/handler.go
+++ b/proton-c/bindings/go/src/qpid.apache.org/electron/handler.go
@@ -29,7 +29,7 @@ import (
 type handler struct {
        delegator    *proton.MessagingAdapter
        connection   *connection
-       links        map[proton.Link]Link
+       links        map[proton.Link]Endpoint
        sentMessages map[proton.Delivery]sentMessage
        sessions     map[proton.Session]*session
 }
@@ -37,7 +37,7 @@ type handler struct {
 func newHandler(c *connection) *handler {
        h := &handler{
                connection:   c,
-               links:        make(map[proton.Link]Link),
+               links:        make(map[proton.Link]Endpoint),
                sentMessages: make(map[proton.Delivery]sentMessage),
                sessions:     make(map[proton.Session]*session),
        }
@@ -152,7 +152,7 @@ func (h *handler) incoming(in Incoming) {
        }
 }
 
-func (h *handler) addLink(pl proton.Link, el Link) {
+func (h *handler) addLink(pl proton.Link, el Endpoint) {
        h.links[pl] = el
 }
 

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/7486d5b8/proton-c/bindings/go/src/qpid.apache.org/electron/link.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/electron/link.go 
b/proton-c/bindings/go/src/qpid.apache.org/electron/link.go
index 91efa8e..80b4d5c 100644
--- a/proton-c/bindings/go/src/qpid.apache.org/electron/link.go
+++ b/proton-c/bindings/go/src/qpid.apache.org/electron/link.go
@@ -24,11 +24,8 @@ import (
        "qpid.apache.org/proton"
 )
 
-// Link is the common interface for AMQP links. Sender and Receiver provide
-// more methods for the sending or receiving end of a link respectively.
-type Link interface {
-       Endpoint
-
+// Settings associated with a link
+type LinkSettings interface {
        // Source address that messages are coming from.
        Source() string
 
@@ -53,46 +50,41 @@ type Link interface {
 
        // Session containing the Link
        Session() Session
-
-       // Called in event loop on closed event.
-       closed(err error)
-       // Called to open a link (local or accepted incoming link)
-       open()
 }
 
 // LinkOption can be passed when creating a sender or receiver link to set 
optional configuration.
-type LinkOption func(*link)
+type LinkOption func(*linkSettings)
 
 // Source returns a LinkOption that sets address that messages are coming from.
-func Source(s string) LinkOption { return func(l *link) { l.source = s } }
+func Source(s string) LinkOption { return func(l *linkSettings) { l.source = s 
} }
 
 // Target returns a LinkOption that sets address that messages are going to.
-func Target(s string) LinkOption { return func(l *link) { l.target = s } }
+func Target(s string) LinkOption { return func(l *linkSettings) { l.target = s 
} }
 
 // LinkName returns a LinkOption that sets the link name.
-func LinkName(s string) LinkOption { return func(l *link) { l.target = s } }
+func LinkName(s string) LinkOption { return func(l *linkSettings) { l.target = 
s } }
 
 // SndSettle returns a LinkOption that sets the send settle mode
-func SndSettle(m SndSettleMode) LinkOption { return func(l *link) { 
l.sndSettle = m } }
+func SndSettle(m SndSettleMode) LinkOption { return func(l *linkSettings) { 
l.sndSettle = m } }
 
 // RcvSettle returns a LinkOption that sets the receive settle mode
-func RcvSettle(m RcvSettleMode) LinkOption { return func(l *link) { 
l.rcvSettle = m } }
+func RcvSettle(m RcvSettleMode) LinkOption { return func(l *linkSettings) { 
l.rcvSettle = m } }
 
 // SndSettleMode defines when the sending end of the link settles message
 // delivery.
 type SndSettleMode proton.SndSettleMode
 
 // Capacity returns a LinkOption that sets the link capacity
-func Capacity(n int) LinkOption { return func(l *link) { l.capacity = n } }
+func Capacity(n int) LinkOption { return func(l *linkSettings) { l.capacity = 
n } }
 
 // Prefetch returns a LinkOption that sets a receivers pre-fetch flag. Not 
relevant for a sender.
-func Prefetch(p bool) LinkOption { return func(l *link) { l.prefetch = p } }
+func Prefetch(p bool) LinkOption { return func(l *linkSettings) { l.prefetch = 
p } }
 
 // AtMostOnce returns a LinkOption that sets "fire and forget" mode, messages
 // are sent but no acknowledgment is received, messages can be lost if there is
 // a network failure. Sets SndSettleMode=SndSettled and RcvSettleMode=RcvFirst
 func AtMostOnce() LinkOption {
-       return func(l *link) {
+       return func(l *linkSettings) {
                SndSettle(SndSettled)(l)
                RcvSettle(RcvFirst)(l)
        }
@@ -104,7 +96,7 @@ func AtMostOnce() LinkOption {
 // chance that the message will be received twice in this case.  Sets
 // SndSettleMode=SndUnsettled and RcvSettleMode=RcvFirst
 func AtLeastOnce() LinkOption {
-       return func(l *link) {
+       return func(l *linkSettings) {
                SndSettle(SndUnsettled)(l)
                RcvSettle(RcvFirst)(l)
        }
@@ -129,10 +121,7 @@ const (
        RcvSecond = RcvSettleMode(proton.RcvSecond)
 )
 
-type link struct {
-       endpoint
-
-       // Link settings.
+type linkSettings struct {
        source    string
        target    string
        linkName  string
@@ -141,31 +130,35 @@ type link struct {
        rcvSettle RcvSettleMode
        capacity  int
        prefetch  bool
+       session   *session
+       eLink     proton.Link
+}
 
-       session *session
-       eLink   proton.Link
+type link struct {
+       endpoint
+       linkSettings
 }
 
-func (l *link) Source() string           { return l.source }
-func (l *link) Target() string           { return l.target }
-func (l *link) LinkName() string         { return l.linkName }
-func (l *link) IsSender() bool           { return l.isSender }
-func (l *link) IsReceiver() bool         { return !l.isSender }
-func (l *link) SndSettle() SndSettleMode { return l.sndSettle }
-func (l *link) RcvSettle() RcvSettleMode { return l.rcvSettle }
-func (l *link) Session() Session         { return l.session }
-func (l *link) Connection() Connection   { return l.session.Connection() }
+func (l *linkSettings) Source() string           { return l.source }
+func (l *linkSettings) Target() string           { return l.target }
+func (l *linkSettings) LinkName() string         { return l.linkName }
+func (l *linkSettings) IsSender() bool           { return l.isSender }
+func (l *linkSettings) IsReceiver() bool         { return !l.isSender }
+func (l *linkSettings) SndSettle() SndSettleMode { return l.sndSettle }
+func (l *linkSettings) RcvSettle() RcvSettleMode { return l.rcvSettle }
 
+func (l *link) Session() Session       { return l.session }
+func (l *link) Connection() Connection { return l.session.Connection() }
 func (l *link) engine() *proton.Engine { return l.session.connection.engine }
 func (l *link) handler() *handler      { return l.session.connection.handler }
 
-// Set up link fields and open the proton.Link
-func localLink(sn *session, isSender bool, setting ...LinkOption) (link, 
error) {
-       l := link{
-               session:  sn,
+// Open a link and return the linkSettings.
+func makeLocalLink(sn *session, isSender bool, setting ...LinkOption) 
(linkSettings, error) {
+       l := linkSettings{
                isSender: isSender,
                capacity: 1,
                prefetch: false,
+               session:  sn,
        }
        for _, set := range setting {
                set(&l)
@@ -179,31 +172,29 @@ func localLink(sn *session, isSender bool, setting 
...LinkOption) (link, error)
                l.eLink = l.session.eSession.Receiver(l.linkName)
        }
        if l.eLink.IsNil() {
-               l.err.Set(fmt.Errorf("cannot create link %s", l))
-               return l, l.err.Get()
+               return l, fmt.Errorf("cannot create link %s", l.eLink)
        }
        l.eLink.Source().SetAddress(l.source)
        l.eLink.Target().SetAddress(l.target)
        l.eLink.SetSndSettleMode(proton.SndSettleMode(l.sndSettle))
        l.eLink.SetRcvSettleMode(proton.RcvSettleMode(l.rcvSettle))
        l.eLink.Open()
-       l.endpoint = makeEndpoint(l.eLink.String())
        return l, nil
 }
 
 type incomingLink struct {
        incoming
-       link
+       linkSettings
+       eLink proton.Link
+       sn    *session
 }
 
 // Set up a link from an incoming proton.Link.
 func makeIncomingLink(sn *session, eLink proton.Link) incomingLink {
        l := incomingLink{
                incoming: makeIncoming(eLink),
-               link: link{
-                       session:   sn,
+               linkSettings: linkSettings{
                        isSender:  eLink.IsSender(),
-                       eLink:     eLink,
                        source:    eLink.RemoteSource().Address(),
                        target:    eLink.RemoteTarget().Address(),
                        linkName:  eLink.Name(),
@@ -211,7 +202,8 @@ func makeIncomingLink(sn *session, eLink proton.Link) 
incomingLink {
                        rcvSettle: RcvSettleMode(eLink.RemoteRcvSettleMode()),
                        capacity:  1,
                        prefetch:  false,
-                       endpoint:  makeEndpoint(eLink.String()),
+                       eLink:     eLink,
+                       session:   sn,
                },
        }
        return l
@@ -239,7 +231,3 @@ func (l *link) Close(err error) {
                }
        })
 }
-
-func (l *link) open() {
-       l.eLink.Open()
-}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/7486d5b8/proton-c/bindings/go/src/qpid.apache.org/electron/messaging_test.go
----------------------------------------------------------------------
diff --git 
a/proton-c/bindings/go/src/qpid.apache.org/electron/messaging_test.go 
b/proton-c/bindings/go/src/qpid.apache.org/electron/messaging_test.go
index 5af57e8..0de7d16 100644
--- a/proton-c/bindings/go/src/qpid.apache.org/electron/messaging_test.go
+++ b/proton-c/bindings/go/src/qpid.apache.org/electron/messaging_test.go
@@ -270,24 +270,26 @@ func TestTimeouts(t *testing.T) {
        }
 }
 
-// clientServer that returns sender/receiver pairs at opposite ends of link.
+// A server that returns the opposite end of each client link via channels.
 type pairs struct {
-       t      *testing.T
-       client Session
-       server Connection
-       rchan  chan Receiver
-       schan  chan Sender
+       t        *testing.T
+       client   Session
+       server   Connection
+       rchan    chan Receiver
+       schan    chan Sender
+       capacity int
+       prefetch bool
 }
 
-func newPairs(t *testing.T) *pairs {
+func newPairs(t *testing.T, capacity int, prefetch bool) *pairs {
        p := &pairs{t: t, rchan: make(chan Receiver, 1), schan: make(chan 
Sender, 1)}
        p.client, p.server = newClientServer(t)
        go func() {
                for i := range p.server.Incoming() {
                        switch i := i.(type) {
                        case *IncomingReceiver:
-                               i.SetCapacity(1)
-                               i.SetPrefetch(false)
+                               i.SetCapacity(capacity)
+                               i.SetPrefetch(prefetch)
                                p.rchan <- i.Accept().(Receiver)
                        case *IncomingSender:
                                p.schan <- i.Accept().(Sender)
@@ -303,6 +305,7 @@ func (p *pairs) close() {
        closeClientServer(p.client, p.server)
 }
 
+// Return a client sender and server receiver
 func (p *pairs) senderReceiver() (Sender, Receiver) {
        snd, err := p.client.Sender()
        fatalIf(p.t, err)
@@ -310,6 +313,7 @@ func (p *pairs) senderReceiver() (Sender, Receiver) {
        return snd, rcv
 }
 
+// Return a client receiver and server sender
 func (p *pairs) receiverSender() (Receiver, Sender) {
        rcv, err := p.client.Receiver()
        fatalIf(p.t, err)
@@ -322,7 +326,7 @@ type result struct {
        err   error
 }
 
-func (r result) String() string { return fmt.Sprintf("%s(%s)", r.err, r.label) 
}
+func (r result) String() string { return fmt.Sprintf("%v(%v)", r.err, r.label) 
}
 
 func doSend(snd Sender, results chan result) {
        err := snd.SendSync(amqp.NewMessage()).Error
@@ -338,30 +342,48 @@ func doDisposition(ack <-chan Outcome, results chan 
result) {
        results <- result{"disposition", (<-ack).Error}
 }
 
+// Senders get credit immediately if receivers have prefetch set
+func TestSendReceivePrefetch(t *testing.T) {
+       pairs := newPairs(t, 1, true)
+       s, r := pairs.senderReceiver()
+       s.SendAsyncTimeout(amqp.NewMessage(), nil, nil, time.Second) // Should 
not block for credit.
+       if _, err := r.Receive(); err != nil {
+               t.Error(err)
+       }
+}
+
+// Senders do not get credit till Receive() if receivers don't have prefetch
+func TestSendReceiveNoPrefetch(t *testing.T) {
+       pairs := newPairs(t, 1, false)
+       s, r := pairs.senderReceiver()
+       done := make(chan struct{}, 1)
+       go func() {
+               s.SendAsyncTimeout(amqp.NewMessage(), nil, nil, time.Second) // 
Should block for credit.
+               close(done)
+       }()
+       select {
+       case <-done:
+               t.Errorf("send should be blocked on credit")
+       default:
+               if _, err := r.Receive(); err != nil {
+                       t.Error(err)
+               } else {
+                       <-done
+               } // Should be unblocked now
+       }
+}
+
 // Test that closing Links interrupts blocked link functions.
 func TestLinkCloseInterrupt(t *testing.T) {
-       want := amqp.Errorf("x", "all bad")
-       pairs := newPairs(t)
+       want := amqp.Error{Name: "x", Description: "all bad"}
+       pairs := newPairs(t, 1, false)
        results := make(chan result) // Collect expected errors
 
-       // Sender.Close() interrupts Send()
-       snd, rcv := pairs.senderReceiver()
-       go doSend(snd, results)
-       snd.Close(want)
-       if r := <-results; want != r.err {
-               t.Errorf("want %#v got %#v", want, r)
-       }
-
-       // Remote Receiver.Close() interrupts Send()
-       snd, rcv = pairs.senderReceiver()
-       go doSend(snd, results)
-       rcv.Close(want)
-       if r := <-results; want != r.err {
-               t.Errorf("want %#v got %#v", want, r)
-       }
+       // Note closing the link does not interrupt Send() calls, the AMQP spec 
says
+       // that deliveries can be settled after the link is closed.
 
        // Receiver.Close() interrupts Receive()
-       snd, rcv = pairs.senderReceiver()
+       snd, rcv := pairs.senderReceiver()
        go doReceive(rcv, results)
        rcv.Close(want)
        if r := <-results; want != r.err {
@@ -379,44 +401,50 @@ func TestLinkCloseInterrupt(t *testing.T) {
 
 // Test closing the server end of a connection.
 func TestConnectionCloseInterrupt1(t *testing.T) {
-       want := amqp.Errorf("x", "bad")
-       pairs := newPairs(t)
+       want := amqp.Error{Name: "x", Description: "bad"}
+       pairs := newPairs(t, 1, true)
        results := make(chan result) // Collect expected errors
 
        // Connection.Close() interrupts Send, Receive, Disposition.
        snd, rcv := pairs.senderReceiver()
-       go doReceive(rcv, results)
-       ack := snd.SendWaitable(amqp.NewMessage())
-       go doDisposition(ack, results)
-       snd, rcv = pairs.senderReceiver()
        go doSend(snd, results)
+
+       rcv.Receive()
        rcv, snd = pairs.receiverSender()
        go doReceive(rcv, results)
+
+       snd, rcv = pairs.senderReceiver()
+       ack := snd.SendWaitable(amqp.NewMessage())
+       rcv.Receive()
+       go doDisposition(ack, results)
+
        pairs.server.Close(want)
        for i := 0; i < 3; i++ {
                if r := <-results; want != r.err {
-                       // TODO aconway 2015-10-06: Not propagating the correct 
error, seeing nil and EOF.
-                       t.Logf("want %v got %v", want, r.err)
+                       t.Logf("want %v got %v", want, r)
                }
        }
 }
 
 // Test closing the client end of the connection.
 func TestConnectionCloseInterrupt2(t *testing.T) {
-       want := amqp.Errorf("x", "bad")
-       pairs := newPairs(t)
+       want := amqp.Error{Name: "x", Description: "bad"}
+       pairs := newPairs(t, 1, true)
        results := make(chan result) // Collect expected errors
 
        // Connection.Close() interrupts Send, Receive, Disposition.
        snd, rcv := pairs.senderReceiver()
-       go doReceive(rcv, results)
-       ack := snd.SendWaitable(amqp.NewMessage())
-       go doDisposition(ack, results)
-       snd, rcv = pairs.senderReceiver()
        go doSend(snd, results)
+       rcv.Receive()
+
        rcv, snd = pairs.receiverSender()
        go doReceive(rcv, results)
-       pairs.client.Close(want)
+
+       snd, rcv = pairs.senderReceiver()
+       ack := snd.SendWaitable(amqp.NewMessage())
+       go doDisposition(ack, results)
+
+       pairs.client.Connection().Close(want)
        for i := 0; i < 3; i++ {
                if r := <-results; want != r.err {
                        // TODO aconway 2015-10-06: Not propagating the correct 
error, seeing nil.

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/7486d5b8/proton-c/bindings/go/src/qpid.apache.org/electron/receiver.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/electron/receiver.go 
b/proton-c/bindings/go/src/qpid.apache.org/electron/receiver.go
index 22bdc7e..f2b7a52 100644
--- a/proton-c/bindings/go/src/qpid.apache.org/electron/receiver.go
+++ b/proton-c/bindings/go/src/qpid.apache.org/electron/receiver.go
@@ -29,7 +29,8 @@ import (
 // Receiver is a Link that receives messages.
 //
 type Receiver interface {
-       Link
+       Endpoint
+       LinkSettings
 
        // Receive blocks until a message is available or until the Receiver is 
closed
        // and has no more buffered messages.
@@ -64,108 +65,88 @@ type Receiver interface {
        Capacity() int
 }
 
-// Flow control policy for a receiver.
-type policy interface {
-       // Called at the start of Receive() to adjust credit before fetching a 
message.
-       Pre(*receiver)
-       // Called after Receive() has received a message from Buffer() before 
it returns.
-       // Non-nil error means no message was received because of an error.
-       Post(*receiver, error)
-}
-
-type prefetchPolicy struct{}
-
-func (p prefetchPolicy) Flow(r *receiver) {
-       r.engine().Inject(func() {
-               if r.Error() != nil {
-                       return
-               }
-               _, _, max := r.credit()
-               if max > 0 {
-                       r.eLink.Flow(max)
-               }
-       })
-}
-func (p prefetchPolicy) Pre(r *receiver) { p.Flow(r) }
-func (p prefetchPolicy) Post(r *receiver, err error) {
-       if err == nil {
-               p.Flow(r)
-       }
-}
-
-type noPrefetchPolicy struct{ waiting int }
-
-func (p noPrefetchPolicy) Flow(r *receiver) { // Not called in proton goroutine
-       r.engine().Inject(func() {
-               if r.Error() != nil {
-                       return
-               }
-               len, credit, max := r.credit()
-               add := p.waiting - (len + credit)
-               if add > max {
-                       add = max // Don't overflow
-               }
-               if add > 0 {
-                       r.eLink.Flow(add)
-               }
-       })
-}
-func (p noPrefetchPolicy) Pre(r *receiver) { p.waiting++; p.Flow(r) }
-func (p noPrefetchPolicy) Post(r *receiver, err error) {
-       p.waiting--
-       if err == nil {
-               p.Flow(r)
-       }
-}
-
 // Receiver implementation
 type receiver struct {
        link
-       buffer chan ReceivedMessage
-       policy policy
+       buffer  chan ReceivedMessage
+       callers int
 }
 
+func (r *receiver) Capacity() int  { return cap(r.buffer) }
+func (r *receiver) Prefetch() bool { return r.prefetch }
+
 // Call in proton goroutine
-func newReceiver(l link) *receiver {
-       r := &receiver{link: l}
+func newReceiver(ls linkSettings) *receiver {
+       r := &receiver{link: link{linkSettings: ls}}
+       r.endpoint.init(r.link.eLink.String())
        if r.capacity < 1 {
                r.capacity = 1
        }
-       if r.prefetch {
-               r.policy = &prefetchPolicy{}
-       } else {
-               r.policy = &noPrefetchPolicy{}
-       }
        r.buffer = make(chan ReceivedMessage, r.capacity)
        r.handler().addLink(r.eLink, r)
-       r.link.open()
+       r.link.eLink.Open()
+       if r.prefetch {
+               r.flow(r.maxFlow())
+       }
        return r
 }
 
-// call in proton goroutine.
-func (r *receiver) credit() (buffered, credit, max int) {
-       return len(r.buffer), r.eLink.Credit(), cap(r.buffer) - len(r.buffer)
+// Call in proton goroutine. Max additional credit we can request.
+func (r *receiver) maxFlow() int { return cap(r.buffer) - len(r.buffer) - 
r.eLink.Credit() }
+
+func (r *receiver) flow(credit int) {
+       if credit > 0 {
+               r.eLink.Flow(credit)
+       }
 }
 
-func (r *receiver) Capacity() int  { return cap(r.buffer) }
-func (r *receiver) Prefetch() bool { return r.prefetch }
+// Inject flow check per-caller call when prefetch is off.
+// Called with inc=1 at start of call, inc = -1 at end
+func (r *receiver) caller(inc int) {
+       r.engine().Inject(func() {
+               r.callers += inc
+               need := r.callers - (len(r.buffer) + r.eLink.Credit())
+               max := r.maxFlow()
+               if need > max {
+                       need = max
+               }
+               r.flow(need)
+       })
+}
 
+// Inject flow top-up if prefetch is enabled
+func (r *receiver) flowTopUp() {
+       if r.prefetch {
+               r.engine().Inject(func() { r.flow(r.maxFlow()) })
+       }
+}
+
+// Not called
 func (r *receiver) Receive() (rm ReceivedMessage, err error) {
        return r.ReceiveTimeout(Forever)
 }
 
-func (r *receiver) ReceiveTimeout(timeout time.Duration) (rm ReceivedMessage, 
err error) {
+func (r *receiver) ReceiveTimeout(timeout time.Duration) (ReceivedMessage, 
error) {
        assert(r.buffer != nil, "Receiver is not open: %s", r)
-       r.policy.Pre(r)
-       defer func() { r.policy.Post(r, err) }()
+       select { // Check for immediate availability
+       case rm := <-r.buffer:
+               r.flowTopUp()
+               return rm, nil
+       default:
+       }
+       if !r.prefetch { // Per-caller flow control
+               r.caller(+1)
+               defer r.caller(-1)
+       }
        rmi, err := timedReceive(r.buffer, timeout)
        switch err {
-       case Timeout:
-               return ReceivedMessage{}, Timeout
+       case nil:
+               r.flowTopUp()
+               return rmi.(ReceivedMessage), err
        case Closed:
                return ReceivedMessage{}, r.Error()
        default:
-               return rmi.(ReceivedMessage), nil
+               return ReceivedMessage{}, err
        }
 }
 
@@ -192,11 +173,11 @@ func (r *receiver) message(delivery proton.Delivery) {
        }
 }
 
-func (r *receiver) closed(err error) {
-       r.link.closed(err)
+func (r *receiver) closed(err error) error {
        if r.buffer != nil {
                close(r.buffer)
        }
+       return r.link.closed(err)
 }
 
 // ReceivedMessage contains an amqp.Message and allows the message to be 
acknowledged.
@@ -240,5 +221,5 @@ func (in *IncomingReceiver) SetPrefetch(prefetch bool) { 
in.prefetch = prefetch
 
 // Accept accepts an incoming receiver endpoint
 func (in *IncomingReceiver) Accept() Endpoint {
-       return in.accept(func() Endpoint { return newReceiver(in.link) })
+       return in.accept(func() Endpoint { return newReceiver(in.linkSettings) 
})
 }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/7486d5b8/proton-c/bindings/go/src/qpid.apache.org/electron/sender.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/electron/sender.go 
b/proton-c/bindings/go/src/qpid.apache.org/electron/sender.go
index 834eb75..2f0e965 100644
--- a/proton-c/bindings/go/src/qpid.apache.org/electron/sender.go
+++ b/proton-c/bindings/go/src/qpid.apache.org/electron/sender.go
@@ -38,7 +38,8 @@ import (
 // Send*Timeout methods will give up after the timeout and set Timeout as 
Outcome.Error.
 //
 type Sender interface {
-       Link
+       Endpoint
+       LinkSettings
 
        // SendSync sends a message and blocks until the message is 
acknowledged by the remote receiver.
        // Returns an Outcome, which may contain an error if the message could 
not be sent.
@@ -83,6 +84,12 @@ type Outcome struct {
        Value interface{}
 }
 
+func (o Outcome) send(ack chan<- Outcome) {
+       if ack != nil {
+               ack <- o
+       }
+}
+
 // SentStatus indicates the status of a sent message.
 type SentStatus int
 
@@ -144,44 +151,37 @@ type sender struct {
 func (s *sender) SendAsyncTimeout(m amqp.Message, ack chan<- Outcome, v 
interface{}, t time.Duration) {
        // wait for credit
        if _, err := timedReceive(s.credit, t); err != nil {
-               if err == Closed && s.Error != nil {
+               if err == Closed && s.Error() != nil {
                        err = s.Error()
                }
-               ack <- Outcome{Unsent, err, v}
+               Outcome{Unsent, err, v}.send(ack)
                return
        }
        // Send a message in handler goroutine
        err := s.engine().Inject(func() {
                if s.Error() != nil {
-                       if ack != nil {
-                               ack <- Outcome{Unsent, s.Error(), v}
-                       }
+                       Outcome{Unsent, s.Error(), v}.send(ack)
                        return
                }
-               if delivery, err := s.eLink.Send(m); err == nil {
-                       if ack != nil { // We must report an outcome
-                               if s.SndSettle() == SndSettled {
-                                       delivery.Settle() // Pre-settle if 
required
-                                       ack <- Outcome{Accepted, nil, v}
-                               } else {
-                                       s.handler().sentMessages[delivery] = 
sentMessage{ack, v}
-                               }
-                       } else { // ack == nil, can't report outcome
-                               if s.SndSettle() != SndUnsettled { // 
Pre-settle unless we are forced not to.
-                                       delivery.Settle()
-                               }
-                       }
-               } else { // err != nil
-                       if ack != nil {
-                               ack <- Outcome{Unsent, err, v}
+
+               delivery, err2 := s.eLink.Send(m)
+               switch {
+               case err2 != nil:
+                       Outcome{Unsent, err2, v}.send(ack)
+               case ack == nil || s.SndSettle() == SndSettled: // Pre-settled
+                       if s.SndSettle() != SndUnsettled { // Not forced to 
send unsettled by link policy
+                               delivery.Settle()
                        }
+                       Outcome{Accepted, nil, v}.send(ack) // Assume accepted
+               default:
+                       s.handler().sentMessages[delivery] = sentMessage{ack, 
v} // Register with handler
                }
                if s.eLink.Credit() > 0 { // Signal there is still credit
                        s.sendable()
                }
        })
-       if err != nil && ack != nil {
-               ack <- Outcome{Unsent, err, v}
+       if err != nil {
+               Outcome{Unsent, err, v}.send(ack)
        }
 }
 
@@ -237,15 +237,16 @@ func (s *sender) SendSync(m amqp.Message) Outcome {
 }
 
 // handler goroutine
-func (s *sender) closed(err error) {
-       s.link.closed(err)
+func (s *sender) closed(err error) error {
        close(s.credit)
+       return s.link.closed(err)
 }
 
-func newSender(l link) *sender {
-       s := &sender{link: l, credit: make(chan struct{}, 1)}
+func newSender(ls linkSettings) *sender {
+       s := &sender{link: link{linkSettings: ls}, credit: make(chan struct{}, 1)}
+       s.endpoint.init(s.link.eLink.String())
        s.handler().addLink(s.eLink, s)
-       s.link.open()
+       s.link.eLink.Open()
        return s
 }
 
@@ -263,7 +264,7 @@ type IncomingSender struct {
 
 // Accept accepts an incoming sender endpoint
 func (in *IncomingSender) Accept() Endpoint {
-       return in.accept(func() Endpoint { return newSender(in.link) })
+       return in.accept(func() Endpoint { return newSender(in.linkSettings) })
 }
 
 // Call in injected functions to check if the sender is valid.

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/7486d5b8/proton-c/bindings/go/src/qpid.apache.org/electron/session.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/electron/session.go b/proton-c/bindings/go/src/qpid.apache.org/electron/session.go
index 18d8bc8..1bbc52c 100644
--- a/proton-c/bindings/go/src/qpid.apache.org/electron/session.go
+++ b/proton-c/bindings/go/src/qpid.apache.org/electron/session.go
@@ -53,8 +53,8 @@ func newSession(c *connection, es proton.Session, setting ...SessionOption) *ses
        s := &session{
                connection: c,
                eSession:   es,
-               endpoint:   makeEndpoint(es.String()),
        }
+       s.endpoint.init(es.String())
        for _, set := range setting {
                set(s)
        }
@@ -81,7 +81,7 @@ func (s *session) Sender(setting ...LinkOption) (snd Sender, err error) {
                if s.Error() != nil {
                        return s.Error()
                }
-               l, err := localLink(s, true, setting...)
+               l, err := makeLocalLink(s, true, setting...)
                if err == nil {
                        snd = newSender(l)
                }
@@ -95,7 +95,7 @@ func (s *session) Receiver(setting ...LinkOption) (rcv Receiver, err error) {
                if s.Error() != nil {
                        return s.Error()
                }
-               l, err := localLink(s, false, setting...)
+               l, err := makeLocalLink(s, false, setting...)
                if err == nil {
                        rcv = newReceiver(l)
                }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/7486d5b8/proton-c/bindings/go/src/qpid.apache.org/proton/engine.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/proton/engine.go b/proton-c/bindings/go/src/qpid.apache.org/proton/engine.go
index 13d44b8..eecda7a 100644
--- a/proton-c/bindings/go/src/qpid.apache.org/proton/engine.go
+++ b/proton-c/bindings/go/src/qpid.apache.org/proton/engine.go
@@ -33,9 +33,9 @@ import "C"
 
 import (
        "fmt"
-       "io"
        "net"
        "sync"
+       "time"
        "unsafe"
 )
 
@@ -172,7 +172,7 @@ func (eng *Engine) String() string {
 }
 
 func (eng *Engine) Id() string {
-       return fmt.Sprintf("%eng", &eng)
+       return fmt.Sprintf("%p", eng)
 }
 
 func (eng *Engine) Error() error {
@@ -223,19 +223,35 @@ func (eng *Engine) InjectWait(f func() error) error {
 //
 func (eng *Engine) Server() { eng.transport.SetServer() }
 
-// Close the engine's connection, returns when the engine has exited.
+func (eng *Engine) disconnect() {
+       eng.transport.CloseHead()
+       eng.transport.CloseTail()
+       eng.conn.Close()
+       eng.dispatch()
+}
+
+// Close the engine's connection.
+// If err != nil pass it to the remote end as the close condition.
+// Returns when the remote end closes or disconnects.
 func (eng *Engine) Close(err error) {
-       eng.err.Set(err)
-       eng.Inject(func() {
-               CloseError(eng.connection, err)
-       })
+       eng.Inject(func() { CloseError(eng.connection, err) })
        <-eng.running
 }
 
-// Disconnect the engine's connection without and AMQP close, returns when the engine has exited.
+// CloseTimeout like Close but disconnect if the remote end doesn't close within timeout.
+func (eng *Engine) CloseTimeout(err error, timeout time.Duration) {
+       eng.Inject(func() { CloseError(eng.connection, err) })
+       select {
+       case <-eng.running:
+       case <-time.After(timeout):
+               eng.Disconnect(err)
+       }
+}
+
+// Disconnect the engine's connection immediately without an AMQP close.
+// Process any termination events before returning.
 func (eng *Engine) Disconnect(err error) {
-       eng.err.Set(err)
-       eng.conn.Close()
+       eng.Inject(func() { eng.transport.Condition().SetError(err); eng.disconnect() })
        <-eng.running
 }
 
@@ -283,7 +299,7 @@ func (eng *Engine) Run() error {
 
        wbuf := eng.write.buffer()[:0]
 
-       for eng.err.Get() == nil {
+       for !eng.transport.Closed() {
                if len(wbuf) == 0 {
                        eng.pop(&wbuf)
                }
@@ -305,12 +321,19 @@ func (eng *Engine) Run() error {
                                f()
                        }
                case err := <-readErr:
-                       eng.netError(err)
+                       eng.transport.Condition().SetError(err)
+                       eng.transport.CloseTail()
                case err := <-writeErr:
-                       eng.netError(err)
+                       eng.transport.Condition().SetError(err)
+                       eng.transport.CloseHead()
+               }
+               eng.dispatch()
+               if eng.connection.State().RemoteClosed() && eng.connection.State().LocalClosed() {
+                       eng.disconnect()
                }
-               eng.process()
        }
+       eng.err.Set(EndpointError(eng.connection))
+       eng.err.Set(eng.transport.Condition().Error())
        close(eng.write.buffers)
        eng.conn.Close() // Make sure connection is closed
        wait.Wait()
@@ -334,12 +357,6 @@ func (eng *Engine) Run() error {
        return eng.err.Get()
 }
 
-func (eng *Engine) netError(err error) {
-       eng.err.Set(err)
-       eng.transport.CloseHead()
-       eng.transport.CloseTail()
-}
-
 func minInt(a, b int) int {
        if a < b {
                return a
@@ -378,18 +395,13 @@ func (eng *Engine) push(buf []byte) {
        }
 }
 
-func (eng *Engine) handle(e Event) {
-       for _, h := range eng.handlers {
-               h.HandleEvent(e)
-       }
-       if e.Type() == ETransportClosed {
-               eng.err.Set(io.EOF)
-       }
-}
+func (eng *Engine) peek() *C.pn_event_t { return C.pn_collector_peek(eng.collector) }
 
-func (eng *Engine) process() {
-       for ce := C.pn_collector_peek(eng.collector); ce != nil; ce = C.pn_collector_peek(eng.collector) {
-               eng.handle(makeEvent(ce, eng))
+func (eng *Engine) dispatch() {
+       for ce := eng.peek(); ce != nil; ce = eng.peek() {
+               for _, h := range eng.handlers {
+                       h.HandleEvent(makeEvent(ce, eng))
+               }
                C.pn_collector_pop(eng.collector)
        }
 }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/7486d5b8/proton-c/bindings/go/src/qpid.apache.org/proton/wrappers.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/proton/wrappers.go b/proton-c/bindings/go/src/qpid.apache.org/proton/wrappers.go
index cd547ed..70611d3 100644
--- a/proton-c/bindings/go/src/qpid.apache.org/proton/wrappers.go
+++ b/proton-c/bindings/go/src/qpid.apache.org/proton/wrappers.go
@@ -334,7 +334,7 @@ func (c Condition) Error() error {
        if c.IsNil() || !c.IsSet() {
                return nil
        }
-       return amqp.Error{c.Name(), c.Description()}
+       return amqp.Error{Name: c.Name(), Description: c.Description()}
 }
 
 // Set a Go error into a condition.


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to