PROTON-1910: [go] move message encode/decode to handler thread
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/4a9f3b98 Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/4a9f3b98 Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/4a9f3b98 Branch: refs/heads/master Commit: 4a9f3b986693d61040c1db32f46355d04ce75499 Parents: 886d2b9 Author: Alan Conway <[email protected]> Authored: Thu Oct 11 15:23:00 2018 -0400 Committer: Alan Conway <[email protected]> Committed: Thu Oct 11 18:01:44 2018 -0400 ---------------------------------------------------------------------- go/src/qpid.apache.org/amqp/message.go | 3 - go/src/qpid.apache.org/electron/connection.go | 10 +- go/src/qpid.apache.org/electron/handler.go | 32 +++--- go/src/qpid.apache.org/electron/link.go | 2 +- go/src/qpid.apache.org/electron/receiver.go | 7 +- go/src/qpid.apache.org/electron/sender.go | 128 ++++++++++++--------- 6 files changed, 106 insertions(+), 76 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/4a9f3b98/go/src/qpid.apache.org/amqp/message.go ---------------------------------------------------------------------- diff --git a/go/src/qpid.apache.org/amqp/message.go b/go/src/qpid.apache.org/amqp/message.go index 919904c..e96b331 100644 --- a/go/src/qpid.apache.org/amqp/message.go +++ b/go/src/qpid.apache.org/amqp/message.go @@ -325,9 +325,6 @@ func (m *message) SetApplicationProperties(x map[string]interface{}) { func (m *message) Marshal(v interface{}) { m.body = v } func (m *message) Unmarshal(v interface{}) { - // FIXME aconway 2018-09-28: this is inefficient, replace with a - // reflective conversion from the existing body value that respects - // the Unmarshal() rules. pnData := C.pn_data(2) marshal(m.body, pnData) unmarshal(v, pnData) http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/4a9f3b98/go/src/qpid.apache.org/electron/connection.go ---------------------------------------------------------------------- diff --git a/go/src/qpid.apache.org/electron/connection.go b/go/src/qpid.apache.org/electron/connection.go index 48cba60..464c4df 100644 --- a/go/src/qpid.apache.org/electron/connection.go +++ b/go/src/qpid.apache.org/electron/connection.go @@ -30,6 +30,7 @@ import ( "sync" "time" + "qpid.apache.org/amqp" "qpid.apache.org/proton" ) @@ -183,6 +184,7 @@ type connection struct { handler *handler engine *proton.Engine pConnection proton.Connection + mc amqp.MessageCodec defaultSession Session } @@ -244,8 +246,12 @@ func (c *connection) run() { } func (c *connection) Close(err error) { - c.err.Set(err) - c.engine.Close(err) + c.closeOnce.Do(func() { + c.err.Set(err) + c.engine.Close(err) + c.mc.Close() + }) + } func (c *connection) Disconnect(err error) { http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/4a9f3b98/go/src/qpid.apache.org/electron/handler.go ---------------------------------------------------------------------- diff --git a/go/src/qpid.apache.org/electron/handler.go b/go/src/qpid.apache.org/electron/handler.go index 899d6a9..753f9d3 100644 --- a/go/src/qpid.apache.org/electron/handler.go +++ b/go/src/qpid.apache.org/electron/handler.go @@ -27,19 +27,19 @@ import ( // NOTE: methods in this file are called only in the proton goroutine unless otherwise indicated. type handler struct { - delegator *proton.MessagingAdapter - connection *connection - links map[proton.Link]Endpoint - sentMessages map[proton.Delivery]sentMessage - sessions map[proton.Session]*session + delegator *proton.MessagingAdapter + connection *connection + links map[proton.Link]Endpoint + sent map[proton.Delivery]*sendable // Waiting for outcome + sessions map[proton.Session]*session } func newHandler(c *connection) *handler { h := &handler{ - connection: c, - links: make(map[proton.Link]Endpoint), - sentMessages: make(map[proton.Delivery]sentMessage), - sessions: make(map[proton.Session]*session), + connection: c, + links: make(map[proton.Link]Endpoint), + sent: make(map[proton.Delivery]*sendable), + sessions: make(map[proton.Session]*session), } h.delegator = proton.NewMessagingAdapter(h) // Disable auto features of MessagingAdapter, we do these ourselves. @@ -65,15 +65,15 @@ func (h *handler) HandleMessagingEvent(t proton.MessagingEvent, e proton.Event) } case proton.MSettled: - if sm, ok := h.sentMessages[e.Delivery()]; ok { + if sm, ok := h.sent[e.Delivery()]; ok { d := e.Delivery().Remote() - sm.ack <- Outcome{sentStatus(d.Type()), d.Condition().Error(), sm.value} - delete(h.sentMessages, e.Delivery()) + sm.ack <- Outcome{sentStatus(d.Type()), d.Condition().Error(), sm.v} + delete(h.sent, e.Delivery()) } case proton.MSendable: if s, ok := h.links[e.Link()].(*sender); ok { - s.sendable() + s.trySend() } else { h.linkError(e.Link(), "no sender") } @@ -182,10 +182,10 @@ func (h *handler) sessionClosed(ps proton.Session, err error) { func (h *handler) shutdown(err error) { err = h.connection.closed(err) - for _, sm := range h.sentMessages { + for _, sm := range h.sent { // Don't block but ensure outcome is sent eventually. if sm.ack != nil { - o := Outcome{Unacknowledged, err, sm.value} + o := Outcome{Unacknowledged, err, sm.v} select { case sm.ack <- o: default: @@ -193,7 +193,7 @@ func (h *handler) shutdown(err error) { } } } - h.sentMessages = nil + h.sent = nil for _, l := range h.links { _ = l.closed(err) } http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/4a9f3b98/go/src/qpid.apache.org/electron/link.go ---------------------------------------------------------------------- diff --git a/go/src/qpid.apache.org/electron/link.go b/go/src/qpid.apache.org/electron/link.go index de8a995..dd974f5 100644 --- a/go/src/qpid.apache.org/electron/link.go +++ b/go/src/qpid.apache.org/electron/link.go @@ -146,7 +146,7 @@ const ( // Messages are sent already settled SndSettled = SndSettleMode(proton.SndSettled) // Sender can send either unsettled or settled messages. - SendMixed = SndSettleMode(proton.SndMixed) + SndMixed = SndSettleMode(proton.SndMixed) ) // RcvSettleMode defines when the receiving end of the link settles message delivery. http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/4a9f3b98/go/src/qpid.apache.org/electron/receiver.go ---------------------------------------------------------------------- diff --git a/go/src/qpid.apache.org/electron/receiver.go b/go/src/qpid.apache.org/electron/receiver.go index 8e3b1a7..eba300b 100644 --- a/go/src/qpid.apache.org/electron/receiver.go +++ b/go/src/qpid.apache.org/electron/receiver.go @@ -162,7 +162,12 @@ func (r *receiver) message(delivery proton.Delivery) { return } if delivery.HasMessage() { - m, err := delivery.Message() + bytes, err := delivery.MessageBytes() + var m amqp.Message + if err == nil { + m = amqp.NewMessage() + err = r.session.connection.mc.Decode(m, bytes) + } if err != nil { localClose(r.pLink, err) return http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/4a9f3b98/go/src/qpid.apache.org/electron/sender.go ---------------------------------------------------------------------- diff --git a/go/src/qpid.apache.org/electron/sender.go b/go/src/qpid.apache.org/electron/sender.go index f46fdc4..98e6c9b 100644 --- a/go/src/qpid.apache.org/electron/sender.go +++ b/go/src/qpid.apache.org/electron/sender.go @@ -24,9 +24,10 @@ import "C" import ( "fmt" + "time" + "qpid.apache.org/amqp" "qpid.apache.org/proton" - "time" ) // Sender is a Link that sends messages. @@ -148,54 +149,86 @@ func sentStatus(d uint64) SentStatus { } } -// Sender implementation, held by handler. +type sendable struct { + m amqp.Message + ack chan<- Outcome // Channel for acknowledgement of m + v interface{} // Correlation value + sent chan struct{} // Closed when m is encoded and will be sent +} + +func (sm *sendable) unsent(err error) { + Outcome{Unsent, err, sm.v}.send(sm.ack) +} + type sender struct { link - credit chan struct{} // Signal available credit. + sending []*sendable } -func (s *sender) SendAsyncTimeout(m amqp.Message, ack chan<- Outcome, v interface{}, t time.Duration) { - // wait for credit - if _, err := timedReceive(s.credit, t); err != nil { - if err == Closed && s.Error() != nil { - err = s.Error() - } - Outcome{Unsent, err, v}.send(ack) +func newSender(ls linkSettings) *sender { + s := &sender{link: link{linkSettings: ls}} + s.endpoint.init(s.link.pLink.String()) + s.handler().addLink(s.pLink, s) + s.link.pLink.Open() + return s +} + +// Called in handler goroutine +func (s *sender) startSend(sm *sendable) { + s.sending = append(s.sending, sm) + s.trySend() +} + +// Called in handler goroutine +func (s *sender) trySend() { + for s.pLink.Credit() > 0 && len(s.sending) > 0 { + sm := s.sending[0] + s.sending = s.sending[1:] + s.send(sm) + } +} + +// Called in handler goroutine with credit > 0 +func (s *sender) send(sm *sendable) { + var err error + bytes, err := s.session.connection.mc.Encode(sm.m, nil) + close(sm.sent) // Safe to re-use sm.m now + if err != nil { + sm.unsent(err) return } - // Send a message in handler goroutine - err := s.engine().Inject(func() { - if s.Error() != nil { - Outcome{Unsent, s.Error(), v}.send(ack) - return - } + d, err := s.pLink.SendMessageBytes(bytes) + if err != nil { + sm.unsent(err) + return + } + if s.SndSettle() == SndSettled || (s.SndSettle() == SndMixed && sm.ack == nil) { + d.Settle() // Pre-settled + Outcome{Accepted, nil, sm.v}.send(sm.ack) // Assume accepted + } else { + // Register with handler to receive the remote outcome + s.handler().sent[d] = sm + } +} - delivery, err2 := s.pLink.Send(m) - switch { - case err2 != nil: - Outcome{Unsent, err2, v}.send(ack) - case ack == nil || s.SndSettle() == SndSettled: // Pre-settled - if s.SndSettle() != SndUnsettled { // Not forced to send unsettled by link policy - delivery.Settle() - } - Outcome{Accepted, nil, v}.send(ack) // Assume accepted - default: - s.handler().sentMessages[delivery] = sentMessage{ack, v} // Register with handler - } - if s.pLink.Credit() > 0 { // Signal there is still credit - s.sendable() +func (s *sender) timeoutSend(sm *sendable) { + for i, sm2 := range s.sending { + if sm2 == sm { + n := copy(s.sending[i:], s.sending[i+1:]) + s.sending = s.sending[:i+n] // delete + close(sm.sent) + return } - }) - if err != nil { - Outcome{Unsent, err, v}.send(ack) } } -// Set credit flag if not already set. Non-blocking, any goroutine -func (s *sender) sendable() { - select { // Non-blocking - case s.credit <- struct{}{}: - default: +func (s *sender) SendAsyncTimeout(m amqp.Message, ack chan<- Outcome, v interface{}, t time.Duration) { + sm := &sendable{m, ack, v, make(chan struct{})} + s.engine().Inject(func() { s.startSend(sm) }) + select { + case <-sm.sent: // OK + case <-After(t): // Try to timeout sm + s.engine().Inject(func() { s.timeoutSend(sm) }) } } @@ -244,24 +277,13 @@ func (s *sender) SendSync(m amqp.Message) Outcome { // handler goroutine func (s *sender) closed(err error) error { - close(s.credit) + for _, sm := range s.sending { + close(sm.sent) + } + s.sending = nil return s.link.closed(err) } -func newSender(ls linkSettings) *sender { - s := &sender{link: link{linkSettings: ls}, credit: make(chan struct{}, 1)} - s.endpoint.init(s.link.pLink.String()) - s.handler().addLink(s.pLink, s) - s.link.pLink.Open() - return s -} - -// sentMessage records a sent message on the handler. -type sentMessage struct { - ack chan<- Outcome - value interface{} -} - // IncomingSender is sent on the Connection.Incoming() channel when there is // an incoming request to open a sender link. type IncomingSender struct { --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
