PROTON-1910: [go] move message encode/decode to handler thread

Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/4a9f3b98
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/4a9f3b98
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/4a9f3b98

Branch: refs/heads/master
Commit: 4a9f3b986693d61040c1db32f46355d04ce75499
Parents: 886d2b9
Author: Alan Conway <[email protected]>
Authored: Thu Oct 11 15:23:00 2018 -0400
Committer: Alan Conway <[email protected]>
Committed: Thu Oct 11 18:01:44 2018 -0400

----------------------------------------------------------------------
 go/src/qpid.apache.org/amqp/message.go        |   3 -
 go/src/qpid.apache.org/electron/connection.go |  10 +-
 go/src/qpid.apache.org/electron/handler.go    |  32 +++---
 go/src/qpid.apache.org/electron/link.go       |   2 +-
 go/src/qpid.apache.org/electron/receiver.go   |   7 +-
 go/src/qpid.apache.org/electron/sender.go     | 128 ++++++++++++---------
 6 files changed, 106 insertions(+), 76 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/4a9f3b98/go/src/qpid.apache.org/amqp/message.go
----------------------------------------------------------------------
diff --git a/go/src/qpid.apache.org/amqp/message.go 
b/go/src/qpid.apache.org/amqp/message.go
index 919904c..e96b331 100644
--- a/go/src/qpid.apache.org/amqp/message.go
+++ b/go/src/qpid.apache.org/amqp/message.go
@@ -325,9 +325,6 @@ func (m *message) SetApplicationProperties(x 
map[string]interface{}) {
 func (m *message) Marshal(v interface{}) { m.body = v }
 
 func (m *message) Unmarshal(v interface{}) {
-       // FIXME aconway 2018-09-28: this is inefficient, replace with a
-       // reflective conversion from the existing body value that respects
-       // the Unmarshal() rules.
        pnData := C.pn_data(2)
        marshal(m.body, pnData)
        unmarshal(v, pnData)

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/4a9f3b98/go/src/qpid.apache.org/electron/connection.go
----------------------------------------------------------------------
diff --git a/go/src/qpid.apache.org/electron/connection.go 
b/go/src/qpid.apache.org/electron/connection.go
index 48cba60..464c4df 100644
--- a/go/src/qpid.apache.org/electron/connection.go
+++ b/go/src/qpid.apache.org/electron/connection.go
@@ -30,6 +30,7 @@ import (
        "sync"
        "time"
 
+       "qpid.apache.org/amqp"
        "qpid.apache.org/proton"
 )
 
@@ -183,6 +184,7 @@ type connection struct {
        handler        *handler
        engine         *proton.Engine
        pConnection    proton.Connection
+       mc             amqp.MessageCodec
 
        defaultSession Session
 }
@@ -244,8 +246,12 @@ func (c *connection) run() {
 }
 
 func (c *connection) Close(err error) {
-       c.err.Set(err)
-       c.engine.Close(err)
+       c.closeOnce.Do(func() {
+               c.err.Set(err)
+               c.engine.Close(err)
+               c.mc.Close()
+       })
+
 }
 
 func (c *connection) Disconnect(err error) {

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/4a9f3b98/go/src/qpid.apache.org/electron/handler.go
----------------------------------------------------------------------
diff --git a/go/src/qpid.apache.org/electron/handler.go 
b/go/src/qpid.apache.org/electron/handler.go
index 899d6a9..753f9d3 100644
--- a/go/src/qpid.apache.org/electron/handler.go
+++ b/go/src/qpid.apache.org/electron/handler.go
@@ -27,19 +27,19 @@ import (
 // NOTE: methods in this file are called only in the proton goroutine unless 
otherwise indicated.
 
 type handler struct {
-       delegator    *proton.MessagingAdapter
-       connection   *connection
-       links        map[proton.Link]Endpoint
-       sentMessages map[proton.Delivery]sentMessage
-       sessions     map[proton.Session]*session
+       delegator  *proton.MessagingAdapter
+       connection *connection
+       links      map[proton.Link]Endpoint
+       sent       map[proton.Delivery]*sendable // Waiting for outcome
+       sessions   map[proton.Session]*session
 }
 
 func newHandler(c *connection) *handler {
        h := &handler{
-               connection:   c,
-               links:        make(map[proton.Link]Endpoint),
-               sentMessages: make(map[proton.Delivery]sentMessage),
-               sessions:     make(map[proton.Session]*session),
+               connection: c,
+               links:      make(map[proton.Link]Endpoint),
+               sent:       make(map[proton.Delivery]*sendable),
+               sessions:   make(map[proton.Session]*session),
        }
        h.delegator = proton.NewMessagingAdapter(h)
        // Disable auto features of MessagingAdapter, we do these ourselves.
@@ -65,15 +65,15 @@ func (h *handler) HandleMessagingEvent(t 
proton.MessagingEvent, e proton.Event)
                }
 
        case proton.MSettled:
-               if sm, ok := h.sentMessages[e.Delivery()]; ok {
+               if sm, ok := h.sent[e.Delivery()]; ok {
                        d := e.Delivery().Remote()
-                       sm.ack <- Outcome{sentStatus(d.Type()), 
d.Condition().Error(), sm.value}
-                       delete(h.sentMessages, e.Delivery())
+                       sm.ack <- Outcome{sentStatus(d.Type()), 
d.Condition().Error(), sm.v}
+                       delete(h.sent, e.Delivery())
                }
 
        case proton.MSendable:
                if s, ok := h.links[e.Link()].(*sender); ok {
-                       s.sendable()
+                       s.trySend()
                } else {
                        h.linkError(e.Link(), "no sender")
                }
@@ -182,10 +182,10 @@ func (h *handler) sessionClosed(ps proton.Session, err 
error) {
 
 func (h *handler) shutdown(err error) {
        err = h.connection.closed(err)
-       for _, sm := range h.sentMessages {
+       for _, sm := range h.sent {
                // Don't block but ensure outcome is sent eventually.
                if sm.ack != nil {
-                       o := Outcome{Unacknowledged, err, sm.value}
+                       o := Outcome{Unacknowledged, err, sm.v}
                        select {
                        case sm.ack <- o:
                        default:
@@ -193,7 +193,7 @@ func (h *handler) shutdown(err error) {
                        }
                }
        }
-       h.sentMessages = nil
+       h.sent = nil
        for _, l := range h.links {
                _ = l.closed(err)
        }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/4a9f3b98/go/src/qpid.apache.org/electron/link.go
----------------------------------------------------------------------
diff --git a/go/src/qpid.apache.org/electron/link.go 
b/go/src/qpid.apache.org/electron/link.go
index de8a995..dd974f5 100644
--- a/go/src/qpid.apache.org/electron/link.go
+++ b/go/src/qpid.apache.org/electron/link.go
@@ -146,7 +146,7 @@ const (
        // Messages are sent already settled
        SndSettled = SndSettleMode(proton.SndSettled)
        // Sender can send either unsettled or settled messages.
-       SendMixed = SndSettleMode(proton.SndMixed)
+       SndMixed = SndSettleMode(proton.SndMixed)
 )
 
 // RcvSettleMode defines when the receiving end of the link settles message 
delivery.

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/4a9f3b98/go/src/qpid.apache.org/electron/receiver.go
----------------------------------------------------------------------
diff --git a/go/src/qpid.apache.org/electron/receiver.go 
b/go/src/qpid.apache.org/electron/receiver.go
index 8e3b1a7..eba300b 100644
--- a/go/src/qpid.apache.org/electron/receiver.go
+++ b/go/src/qpid.apache.org/electron/receiver.go
@@ -162,7 +162,12 @@ func (r *receiver) message(delivery proton.Delivery) {
                return
        }
        if delivery.HasMessage() {
-               m, err := delivery.Message()
+               bytes, err := delivery.MessageBytes()
+               var m amqp.Message
+               if err == nil {
+                       m = amqp.NewMessage()
+                       err = r.session.connection.mc.Decode(m, bytes)
+               }
                if err != nil {
                        localClose(r.pLink, err)
                        return

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/4a9f3b98/go/src/qpid.apache.org/electron/sender.go
----------------------------------------------------------------------
diff --git a/go/src/qpid.apache.org/electron/sender.go 
b/go/src/qpid.apache.org/electron/sender.go
index f46fdc4..98e6c9b 100644
--- a/go/src/qpid.apache.org/electron/sender.go
+++ b/go/src/qpid.apache.org/electron/sender.go
@@ -24,9 +24,10 @@ import "C"
 
 import (
        "fmt"
+       "time"
+
        "qpid.apache.org/amqp"
        "qpid.apache.org/proton"
-       "time"
 )
 
 // Sender is a Link that sends messages.
@@ -148,54 +149,86 @@ func sentStatus(d uint64) SentStatus {
        }
 }
 
-// Sender implementation, held by handler.
+type sendable struct {
+       m    amqp.Message
+       ack  chan<- Outcome // Channel for acknowledgement of m
+       v    interface{}    // Correlation value
+       sent chan struct{}  // Closed when m is encoded and will be sent
+}
+
+func (sm *sendable) unsent(err error) {
+       Outcome{Unsent, err, sm.v}.send(sm.ack)
+}
+
 type sender struct {
        link
-       credit chan struct{} // Signal available credit.
+       sending []*sendable
 }
 
-func (s *sender) SendAsyncTimeout(m amqp.Message, ack chan<- Outcome, v 
interface{}, t time.Duration) {
-       // wait for credit
-       if _, err := timedReceive(s.credit, t); err != nil {
-               if err == Closed && s.Error() != nil {
-                       err = s.Error()
-               }
-               Outcome{Unsent, err, v}.send(ack)
+func newSender(ls linkSettings) *sender {
+       s := &sender{link: link{linkSettings: ls}}
+       s.endpoint.init(s.link.pLink.String())
+       s.handler().addLink(s.pLink, s)
+       s.link.pLink.Open()
+       return s
+}
+
+// Called in handler goroutine
+func (s *sender) startSend(sm *sendable) {
+       s.sending = append(s.sending, sm)
+       s.trySend()
+}
+
+// Called in handler goroutine
+func (s *sender) trySend() {
+       for s.pLink.Credit() > 0 && len(s.sending) > 0 {
+               sm := s.sending[0]
+               s.sending = s.sending[1:]
+               s.send(sm)
+       }
+}
+
+// Called in handler goroutine with credit > 0
+func (s *sender) send(sm *sendable) {
+       var err error
+       bytes, err := s.session.connection.mc.Encode(sm.m, nil)
+       close(sm.sent) // Safe to re-use sm.m now
+       if err != nil {
+               sm.unsent(err)
                return
        }
-       // Send a message in handler goroutine
-       err := s.engine().Inject(func() {
-               if s.Error() != nil {
-                       Outcome{Unsent, s.Error(), v}.send(ack)
-                       return
-               }
+       d, err := s.pLink.SendMessageBytes(bytes)
+       if err != nil {
+               sm.unsent(err)
+               return
+       }
+       if s.SndSettle() == SndSettled || (s.SndSettle() == SndMixed && sm.ack 
== nil) {
+               d.Settle()                                // Pre-settled
+               Outcome{Accepted, nil, sm.v}.send(sm.ack) // Assume accepted
+       } else {
+               // Register with handler to receive the remote outcome
+               s.handler().sent[d] = sm
+       }
+}
 
-               delivery, err2 := s.pLink.Send(m)
-               switch {
-               case err2 != nil:
-                       Outcome{Unsent, err2, v}.send(ack)
-               case ack == nil || s.SndSettle() == SndSettled: // Pre-settled
-                       if s.SndSettle() != SndUnsettled { // Not forced to 
send unsettled by link policy
-                               delivery.Settle()
-                       }
-                       Outcome{Accepted, nil, v}.send(ack) // Assume accepted
-               default:
-                       s.handler().sentMessages[delivery] = sentMessage{ack, 
v} // Register with handler
-               }
-               if s.pLink.Credit() > 0 { // Signal there is still credit
-                       s.sendable()
+func (s *sender) timeoutSend(sm *sendable) {
+       for i, sm2 := range s.sending {
+               if sm2 == sm {
+                       n := copy(s.sending[i:], s.sending[i+1:])
+                       s.sending = s.sending[:i+n] // delete
+                       close(sm.sent)
+                       return
                }
-       })
-       if err != nil {
-               Outcome{Unsent, err, v}.send(ack)
        }
 }
 
-// Set credit flag if not already set. Non-blocking, any goroutine
-func (s *sender) sendable() {
-       select { // Non-blocking
-       case s.credit <- struct{}{}:
-       default:
+func (s *sender) SendAsyncTimeout(m amqp.Message, ack chan<- Outcome, v 
interface{}, t time.Duration) {
+       sm := &sendable{m, ack, v, make(chan struct{})}
+       s.engine().Inject(func() { s.startSend(sm) })
+       select {
+       case <-sm.sent: // OK
+       case <-After(t): // Try to timeout sm
+               s.engine().Inject(func() { s.timeoutSend(sm) })
        }
 }
 
@@ -244,24 +277,13 @@ func (s *sender) SendSync(m amqp.Message) Outcome {
 
 // handler goroutine
 func (s *sender) closed(err error) error {
-       close(s.credit)
+       for _, sm := range s.sending {
+               close(sm.sent)
+       }
+       s.sending = nil
        return s.link.closed(err)
 }
 
-func newSender(ls linkSettings) *sender {
-       s := &sender{link: link{linkSettings: ls}, credit: make(chan struct{}, 
1)}
-       s.endpoint.init(s.link.pLink.String())
-       s.handler().addLink(s.pLink, s)
-       s.link.pLink.Open()
-       return s
-}
-
-// sentMessage records a sent message on the handler.
-type sentMessage struct {
-       ack   chan<- Outcome
-       value interface{}
-}
-
 // IncomingSender is sent on the Connection.Incoming() channel when there is
 // an incoming request to open a sender link.
 type IncomingSender struct {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to