http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9e788b2d/electron/link.go ---------------------------------------------------------------------- diff --cc electron/link.go index 4bef53b,0000000..91efa8e mode 100644,000000..100644 --- a/electron/link.go +++ b/electron/link.go @@@ -1,247 -1,0 +1,245 @@@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ + +package electron + +import ( - "qpid.apache.org/internal" ++ "fmt" + "qpid.apache.org/proton" +) + +// Link is the common interface for AMQP links. Sender and Receiver provide +// more methods for the sending or receiving end of a link respectively. +type Link interface { + Endpoint + + // Source address that messages are coming from. + Source() string + + // Target address that messages are going to. + Target() string + + // Name is a unique name for the link among links between the same + // containers in the same direction. By default generated automatically. + LinkName() string + + // IsSender is true if this is the sending end of the link. + IsSender() bool + + // IsReceiver is true if this is the receiving end of the link. + IsReceiver() bool + + // SndSettle defines when the sending end of the link settles message delivery. 
+ SndSettle() SndSettleMode + + // RcvSettle defines when the receiving end of the link settles message delivery. + RcvSettle() RcvSettleMode + + // Session containing the Link + Session() Session + + // Called in event loop on closed event. + closed(err error) + // Called to open a link (local or accepted incoming link) + open() +} + - // LinkSetting can be passed when creating a sender or receiver. - // See functions that return LinkSetting for details - type LinkSetting func(*link) ++// LinkOption can be passed when creating a sender or receiver link to set optional configuration. ++type LinkOption func(*link) + - // Source sets address that messages are coming from. - func Source(s string) LinkSetting { return func(l *link) { l.source = s } } ++// Source returns a LinkOption that sets address that messages are coming from. ++func Source(s string) LinkOption { return func(l *link) { l.source = s } } + - // Target sets address that messages are going to. - func Target(s string) LinkSetting { return func(l *link) { l.target = s } } ++// Target returns a LinkOption that sets address that messages are going to. ++func Target(s string) LinkOption { return func(l *link) { l.target = s } } + - // LinkName sets the link name. - func LinkName(s string) LinkSetting { return func(l *link) { l.target = s } } ++// LinkName returns a LinkOption that sets the link name. 
++func LinkName(s string) LinkOption { return func(l *link) { l.target = s } } + - // SndSettle sets the send settle mode - func SndSettle(m SndSettleMode) LinkSetting { return func(l *link) { l.sndSettle = m } } ++// SndSettle returns a LinkOption that sets the send settle mode ++func SndSettle(m SndSettleMode) LinkOption { return func(l *link) { l.sndSettle = m } } + - // RcvSettle sets the send settle mode - func RcvSettle(m RcvSettleMode) LinkSetting { return func(l *link) { l.rcvSettle = m } } ++// RcvSettle returns a LinkOption that sets the receive settle mode ++func RcvSettle(m RcvSettleMode) LinkOption { return func(l *link) { l.rcvSettle = m } } + - // SndSettleMode defines when the sending end of the link settles message delivery. ++// SndSettleMode defines when the sending end of the ++// link settles message delivery. +type SndSettleMode proton.SndSettleMode + - // Capacity sets the link capacity - func Capacity(n int) LinkSetting { return func(l *link) { l.capacity = n } } ++// Capacity returns a LinkOption that sets the link capacity ++func Capacity(n int) LinkOption { return func(l *link) { l.capacity = n } } + - // Prefetch sets a receivers pre-fetch flag. Not relevant for a sender. - func Prefetch(p bool) LinkSetting { return func(l *link) { l.prefetch = p } } ++// Prefetch returns a LinkOption that sets a receiver's pre-fetch flag. Not relevant for a sender. ++func Prefetch(p bool) LinkOption { return func(l *link) { l.prefetch = p } } + - // AtMostOnce sets "fire and forget" mode, messages are sent but no - // acknowledgment is received, messages can be lost if there is a network - // failure. Sets SndSettleMode=SendSettled and RcvSettleMode=RcvFirst - func AtMostOnce() LinkSetting { ++// AtMostOnce returns a LinkOption that sets "fire and forget" mode, messages ++// are sent but no acknowledgment is received, messages can be lost if there is ++// a network failure. 
Sets SndSettleMode=SendSettled and RcvSettleMode=RcvFirst ++func AtMostOnce() LinkOption { + return func(l *link) { + SndSettle(SndSettled)(l) + RcvSettle(RcvFirst)(l) + } +} + - // AtLeastOnce requests acknowledgment for every message, acknowledgment - // indicates the message was definitely received. In the event of a - // failure, unacknowledged messages can be re-sent but there is a chance - // that the message will be received twice in this case. - // Sets SndSettleMode=SndUnsettled and RcvSettleMode=RcvFirst - func AtLeastOnce() LinkSetting { ++// AtLeastOnce returns a LinkOption that requests acknowledgment for every ++// message, acknowledgment indicates the message was definitely received. In the ++// event of a failure, unacknowledged messages can be re-sent but there is a ++// chance that the message will be received twice in this case. Sets ++// SndSettleMode=SndUnsettled and RcvSettleMode=RcvFirst ++func AtLeastOnce() LinkOption { + return func(l *link) { + SndSettle(SndUnsettled)(l) + RcvSettle(RcvFirst)(l) + } +} + +const ( + // Messages are sent unsettled + SndUnsettled = SndSettleMode(proton.SndUnsettled) + // Messages are sent already settled + SndSettled = SndSettleMode(proton.SndSettled) + // Sender can send either unsettled or settled messages. + SendMixed = SndSettleMode(proton.SndMixed) +) + +// RcvSettleMode defines when the receiving end of the link settles message delivery. +type RcvSettleMode proton.RcvSettleMode + +const ( + // Receiver settles first. + RcvFirst = RcvSettleMode(proton.RcvFirst) + // Receiver waits for sender to settle before settling. + RcvSecond = RcvSettleMode(proton.RcvSecond) +) + +type link struct { + endpoint + + // Link settings. 
+ source string + target string + linkName string + isSender bool + sndSettle SndSettleMode + rcvSettle RcvSettleMode + capacity int + prefetch bool + + session *session + eLink proton.Link - done chan struct{} // Closed when link is closed +} + +func (l *link) Source() string { return l.source } +func (l *link) Target() string { return l.target } +func (l *link) LinkName() string { return l.linkName } +func (l *link) IsSender() bool { return l.isSender } +func (l *link) IsReceiver() bool { return !l.isSender } +func (l *link) SndSettle() SndSettleMode { return l.sndSettle } +func (l *link) RcvSettle() RcvSettleMode { return l.rcvSettle } +func (l *link) Session() Session { return l.session } +func (l *link) Connection() Connection { return l.session.Connection() } + +func (l *link) engine() *proton.Engine { return l.session.connection.engine } +func (l *link) handler() *handler { return l.session.connection.handler } + +// Set up link fields and open the proton.Link - func localLink(sn *session, isSender bool, setting ...LinkSetting) (link, error) { ++func localLink(sn *session, isSender bool, setting ...LinkOption) (link, error) { + l := link{ + session: sn, + isSender: isSender, + capacity: 1, + prefetch: false, - done: make(chan struct{}), + } + for _, set := range setting { + set(&l) + } + if l.linkName == "" { + l.linkName = l.session.connection.container.nextLinkName() + } + if l.IsSender() { + l.eLink = l.session.eSession.Sender(l.linkName) + } else { + l.eLink = l.session.eSession.Receiver(l.linkName) + } + if l.eLink.IsNil() { - l.err.Set(internal.Errorf("cannot create link %s", l)) ++ l.err.Set(fmt.Errorf("cannot create link %s", l)) + return l, l.err.Get() + } + l.eLink.Source().SetAddress(l.source) + l.eLink.Target().SetAddress(l.target) + l.eLink.SetSndSettleMode(proton.SndSettleMode(l.sndSettle)) + l.eLink.SetRcvSettleMode(proton.RcvSettleMode(l.rcvSettle)) - l.str = l.eLink.String() + l.eLink.Open() ++ l.endpoint = makeEndpoint(l.eLink.String()) + 
+ return l, nil +} + +type incomingLink struct { + incoming + link +} + +// Set up a link from an incoming proton.Link. +func makeIncomingLink(sn *session, eLink proton.Link) incomingLink { + l := incomingLink{ ++ incoming: makeIncoming(eLink), + link: link{ + session: sn, + isSender: eLink.IsSender(), + eLink: eLink, + source: eLink.RemoteSource().Address(), + target: eLink.RemoteTarget().Address(), + linkName: eLink.Name(), + sndSettle: SndSettleMode(eLink.RemoteSndSettleMode()), + rcvSettle: RcvSettleMode(eLink.RemoteRcvSettleMode()), + capacity: 1, + prefetch: false, - done: make(chan struct{}), ++ endpoint: makeEndpoint(eLink.String()), + }, + } - l.str = eLink.String() + return l +} + - // Called in proton goroutine on closed or disconnected - func (l *link) closed(err error) { - l.err.Set(err) - l.err.Set(Closed) // If no error set, mark as closed. - close(l.done) - } - +// Not part of Link interface but used by Sender and Receiver. +func (l *link) Credit() (credit int, err error) { + err = l.engine().InjectWait(func() error { ++ if l.Error() != nil { ++ return l.Error() ++ } + credit = l.eLink.Credit() + return nil + }) + return +} + +// Not part of Link interface but used by Sender and Receiver. +func (l *link) Capacity() int { return l.capacity } + +func (l *link) Close(err error) { - l.engine().Inject(func() { localClose(l.eLink, err) }) ++ l.engine().Inject(func() { ++ if l.Error() == nil { ++ localClose(l.eLink, err) ++ } ++ }) +} + +func (l *link) open() { + l.eLink.Open() +}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9e788b2d/electron/messaging_test.go ---------------------------------------------------------------------- diff --cc electron/messaging_test.go index 36b0c24,0000000..5af57e8 mode 100644,000000..100644 --- a/electron/messaging_test.go +++ b/electron/messaging_test.go @@@ -1,416 -1,0 +1,426 @@@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ + +package electron + +import ( + "fmt" + "net" + "path" + "qpid.apache.org/amqp" + "runtime" + "testing" + "time" +) + +func fatalIf(t *testing.T, err error) { + if err != nil { + _, file, line, ok := runtime.Caller(1) // annotate with location of caller. + if ok { + _, file = path.Split(file) + } + t.Fatalf("(from %s:%d) %v", file, line, err) + } +} + - // Start a server, return listening addr and channel for incoming Connection. - func newServer(t *testing.T, cont Container, accept func(Endpoint) error) (net.Addr, <-chan Connection) { ++// Start a server, return listening addr and channel for incoming Connections. 
++func newServer(t *testing.T, cont Container) (net.Addr, <-chan Connection) { + listener, err := net.Listen("tcp", "") + fatalIf(t, err) + addr := listener.Addr() + ch := make(chan Connection) + go func() { + conn, err := listener.Accept() - c, err := cont.Connection(conn) ++ c, err := cont.Connection(conn, Server(), AllowIncoming()) + fatalIf(t, err) - c.Server() - c.Listen(accept) - fatalIf(t, c.Open()) + ch <- c + }() + return addr, ch +} + - // Return open an client connection and session, return the session. ++// Open a client connection and session, return the session. +func newClient(t *testing.T, cont Container, addr net.Addr) Session { + conn, err := net.Dial(addr.Network(), addr.String()) + fatalIf(t, err) + c, err := cont.Connection(conn) + fatalIf(t, err) - c.Open() + sn, err := c.Session() + fatalIf(t, err) + return sn +} + +// Return client and server ends of the same connection. - func newClientServer(t *testing.T, accept func(Endpoint) error) (client Session, server Connection) { - addr, ch := newServer(t, NewContainer(""), accept) - client = newClient(t, NewContainer(""), addr) ++func newClientServer(t *testing.T) (client Session, server Connection) { ++ addr, ch := newServer(t, NewContainer("test-server")) ++ client = newClient(t, NewContainer("test-client"), addr) + return client, <-ch +} + +// Close client and server +func closeClientServer(client Session, server Connection) { + client.Connection().Close(nil) + server.Close(nil) +} + +// Send a message one way with a client sender and server receiver, verify ack. 
+func TestClientSendServerReceive(t *testing.T) { - timeout := time.Second * 2 + nLinks := 3 + nMessages := 3 + + rchan := make(chan Receiver, nLinks) - client, server := newClientServer(t, func(ep Endpoint) error { - if r, ok := ep.(Receiver); ok { - r.SetCapacity(1, false) - rchan <- r ++ client, server := newClientServer(t) ++ go func() { ++ for in := range server.Incoming() { ++ switch in := in.(type) { ++ case *IncomingReceiver: ++ in.SetCapacity(1) ++ in.SetPrefetch(false) ++ rchan <- in.Accept().(Receiver) ++ default: ++ in.Accept() ++ } + } - return nil - }) - - defer func() { - closeClientServer(client, server) + }() + ++ defer func() { closeClientServer(client, server) }() ++ + s := make([]Sender, nLinks) + for i := 0; i < nLinks; i++ { + var err error + s[i], err = client.Sender(Target(fmt.Sprintf("foo%d", i))) + if err != nil { + t.Fatal(err) + } + } + r := make([]Receiver, nLinks) + for i := 0; i < nLinks; i++ { + r[i] = <-rchan + } + + for i := 0; i < nLinks; i++ { + for j := 0; j < nMessages; j++ { - var sm SentMessage - + // Client send ++ ack := make(chan Outcome, 1) + sendDone := make(chan struct{}) + go func() { + defer close(sendDone) + m := amqp.NewMessageWith(fmt.Sprintf("foobar%v-%v", i, j)) + var err error - sm, err = s[i].Send(m) ++ s[i].SendAsync(m, ack, "testing") + if err != nil { + t.Fatal(err) + } + }() + + // Server receive + rm, err := r[i].Receive() + if err != nil { + t.Fatal(err) + } + if want, got := interface{}(fmt.Sprintf("foobar%v-%v", i, j)), rm.Message.Body(); want != got { + t.Errorf("%#v != %#v", want, got) + } + + // Should not be acknowledged on client yet + <-sendDone - if d, err := sm.DispositionTimeout(0); err != Timeout || NoDisposition != d { - t.Errorf("want [no-disposition/timeout] got [%v/%v]", d, err) ++ select { ++ case <-ack: ++ t.Errorf("unexpected ack") ++ default: + } - // Server ack - if err := rm.Acknowledge(Rejected); err != nil { ++ ++ // Server send ack ++ if err := rm.Reject(); err != nil { + 
t.Error(err) + } + // Client get ack. - if d, err := sm.DispositionTimeout(timeout); err != nil || Rejected != d { - t.Errorf("want [rejected/nil] got [%v/%v]", d, err) ++ if a := <-ack; a.Value != "testing" || a.Error != nil || a.Status != Rejected { ++ t.Error("unexpected ack: ", a.Status, a.Error, a.Value) + } + } + } +} + +func TestClientReceiver(t *testing.T) { + nMessages := 3 - client, server := newClientServer(t, func(ep Endpoint) error { - if s, ok := ep.(Sender); ok { - go func() { - for i := int32(0); i < int32(nMessages); i++ { - sm, err := s.Send(amqp.NewMessageWith(i)) - if err != nil { - t.Error(err) - return - } else { - sm.Disposition() // Sync send. ++ client, server := newClientServer(t) ++ go func() { ++ for in := range server.Incoming() { ++ switch in := in.(type) { ++ case *IncomingSender: ++ s := in.Accept().(Sender) ++ go func() { ++ for i := int32(0); i < int32(nMessages); i++ { ++ out := s.SendSync(amqp.NewMessageWith(i)) ++ if out.Error != nil { ++ t.Error(out.Error) ++ return ++ } + } - } - s.Close(nil) - }() ++ s.Close(nil) ++ }() ++ default: ++ in.Accept() ++ } + } - return nil - }) ++ }() + + r, err := client.Receiver(Source("foo")) + if err != nil { + t.Fatal(err) + } + for i := int32(0); i < int32(nMessages); i++ { + rm, err := r.Receive() + if err != nil { + if err != Closed { + t.Error(err) + } + break + } + if err := rm.Accept(); err != nil { + t.Error(err) + } + if b, ok := rm.Message.Body().(int32); !ok || b != i { + t.Errorf("want %v, true got %v, %v", i, b, ok) + } + } + server.Close(nil) + client.Connection().Close(nil) +} + +// Test timeout versions of waiting functions. 
+func TestTimeouts(t *testing.T) { + var err error + rchan := make(chan Receiver, 1) - client, server := newClientServer(t, func(ep Endpoint) error { - if r, ok := ep.(Receiver); ok { - r.SetCapacity(1, false) // Issue credit only on receive - rchan <- r ++ client, server := newClientServer(t) ++ go func() { ++ for i := range server.Incoming() { ++ switch i := i.(type) { ++ case *IncomingReceiver: ++ i.SetCapacity(1) ++ i.SetPrefetch(false) ++ rchan <- i.Accept().(Receiver) // Issue credit only on receive ++ default: ++ i.Accept() ++ } + } - return nil - }) ++ }() + defer func() { closeClientServer(client, server) }() + + // Open client sender + snd, err := client.Sender(Target("test")) + if err != nil { + t.Fatal(err) + } + rcv := <-rchan + + // Test send with timeout + short := time.Millisecond + long := time.Second + m := amqp.NewMessage() - if _, err = snd.SendTimeout(m, 0); err != Timeout { // No credit, expect timeout. ++ if err := snd.SendSyncTimeout(m, 0).Error; err != Timeout { // No credit, expect timeout. + t.Error("want Timeout got", err) + } - if _, err = snd.SendTimeout(m, short); err != Timeout { // No credit, expect timeout. ++ if err := snd.SendSyncTimeout(m, short).Error; err != Timeout { // No credit, expect timeout. + t.Error("want Timeout got", err) + } + // Test receive with timeout + if _, err = rcv.ReceiveTimeout(0); err != Timeout { // No credit, expect timeout. + t.Error("want Timeout got", err) + } + // Test receive with timeout + if _, err = rcv.ReceiveTimeout(short); err != Timeout { // No credit, expect timeout. 
+ t.Error("want Timeout got", err) + } + // There is now a credit on the link due to receive - sm, err := snd.SendTimeout(m, long) - if err != nil { - t.Fatal(err) - } ++ ack := make(chan Outcome) ++ snd.SendAsyncTimeout(m, ack, nil, short) + // Disposition should timeout - if _, err = sm.DispositionTimeout(long); err != Timeout { - t.Error("want Timeout got", err) - } - if _, err = sm.DispositionTimeout(short); err != Timeout { - t.Error("want Timeout got", err) ++ select { ++ case <-ack: ++ t.Errorf("want Timeout got %#v", ack) ++ case <-time.After(short): + } ++ + // Receive and accept + rm, err := rcv.ReceiveTimeout(long) + if err != nil { + t.Fatal(err) + } + rm.Accept() + // Sender get ack - d, err := sm.DispositionTimeout(long) - if err != nil || d != Accepted { - t.Errorf("want (rejected, nil) got (%v, %v)", d, err) ++ if a := <-ack; a.Status != Accepted || a.Error != nil { ++ t.Errorf("want (accepted, nil) got %#v", a) + } +} + +// clientServer that returns sender/receiver pairs at opposite ends of link. 
+type pairs struct { + t *testing.T + client Session + server Connection + rchan chan Receiver + schan chan Sender +} + +func newPairs(t *testing.T) *pairs { + p := &pairs{t: t, rchan: make(chan Receiver, 1), schan: make(chan Sender, 1)} - p.client, p.server = newClientServer(t, func(ep Endpoint) error { - switch ep := ep.(type) { - case Receiver: - ep.SetCapacity(1, false) - p.rchan <- ep - case Sender: - p.schan <- ep ++ p.client, p.server = newClientServer(t) ++ go func() { ++ for i := range p.server.Incoming() { ++ switch i := i.(type) { ++ case *IncomingReceiver: ++ i.SetCapacity(1) ++ i.SetPrefetch(false) ++ p.rchan <- i.Accept().(Receiver) ++ case *IncomingSender: ++ p.schan <- i.Accept().(Sender) ++ default: ++ i.Accept() ++ } + } - return nil - }) ++ }() + return p +} + +func (p *pairs) close() { + closeClientServer(p.client, p.server) +} + +func (p *pairs) senderReceiver() (Sender, Receiver) { + snd, err := p.client.Sender() + fatalIf(p.t, err) + rcv := <-p.rchan + return snd, rcv +} + +func (p *pairs) receiverSender() (Receiver, Sender) { + rcv, err := p.client.Receiver() + fatalIf(p.t, err) + snd := <-p.schan + return rcv, snd +} + +type result struct { + label string + err error +} + +func (r result) String() string { return fmt.Sprintf("%s(%s)", r.err, r.label) } + +func doSend(snd Sender, results chan result) { - _, err := snd.Send(amqp.NewMessage()) ++ err := snd.SendSync(amqp.NewMessage()).Error + results <- result{"send", err} +} + +func doReceive(rcv Receiver, results chan result) { + _, err := rcv.Receive() + results <- result{"receive", err} +} + - func doDisposition(sm SentMessage, results chan result) { - _, err := sm.Disposition() - results <- result{"disposition", err} ++func doDisposition(ack <-chan Outcome, results chan result) { ++ results <- result{"disposition", (<-ack).Error} +} + +// Test that closing Links interrupts blocked link functions. 
+func TestLinkCloseInterrupt(t *testing.T) { + want := amqp.Errorf("x", "all bad") + pairs := newPairs(t) + results := make(chan result) // Collect expected errors + + // Sender.Close() interrupts Send() + snd, rcv := pairs.senderReceiver() + go doSend(snd, results) + snd.Close(want) + if r := <-results; want != r.err { + t.Errorf("want %#v got %#v", want, r) + } + + // Remote Receiver.Close() interrupts Send() + snd, rcv = pairs.senderReceiver() + go doSend(snd, results) + rcv.Close(want) + if r := <-results; want != r.err { + t.Errorf("want %#v got %#v", want, r) + } + + // Receiver.Close() interrupts Receive() + snd, rcv = pairs.senderReceiver() + go doReceive(rcv, results) + rcv.Close(want) + if r := <-results; want != r.err { + t.Errorf("want %#v got %#v", want, r) + } + + // Remote Sender.Close() interrupts Receive() + snd, rcv = pairs.senderReceiver() + go doReceive(rcv, results) + snd.Close(want) + if r := <-results; want != r.err { + t.Errorf("want %#v got %#v", want, r) + } +} + +// Test closing the server end of a connection. +func TestConnectionCloseInterrupt1(t *testing.T) { + want := amqp.Errorf("x", "bad") + pairs := newPairs(t) + results := make(chan result) // Collect expected errors + + // Connection.Close() interrupts Send, Receive, Disposition. + snd, rcv := pairs.senderReceiver() + go doReceive(rcv, results) - sm, err := snd.Send(amqp.NewMessage()) - fatalIf(t, err) - go doDisposition(sm, results) ++ ack := snd.SendWaitable(amqp.NewMessage()) ++ go doDisposition(ack, results) + snd, rcv = pairs.senderReceiver() + go doSend(snd, results) + rcv, snd = pairs.receiverSender() + go doReceive(rcv, results) + pairs.server.Close(want) + for i := 0; i < 3; i++ { + if r := <-results; want != r.err { + // TODO aconway 2015-10-06: Not propagating the correct error, seeing nil and EOF. - t.Logf("want %v got %v", want, r) ++ t.Logf("want %v got %v", want, r.err) + } + } +} + +// Test closing the client end of the connection. 
+func TestConnectionCloseInterrupt2(t *testing.T) { + want := amqp.Errorf("x", "bad") + pairs := newPairs(t) + results := make(chan result) // Collect expected errors + + // Connection.Close() interrupts Send, Receive, Disposition. + snd, rcv := pairs.senderReceiver() + go doReceive(rcv, results) - sm, err := snd.Send(amqp.NewMessage()) - fatalIf(t, err) - go doDisposition(sm, results) ++ ack := snd.SendWaitable(amqp.NewMessage()) ++ go doDisposition(ack, results) + snd, rcv = pairs.senderReceiver() + go doSend(snd, results) + rcv, snd = pairs.receiverSender() + go doReceive(rcv, results) + pairs.client.Close(want) + for i := 0; i < 3; i++ { + if r := <-results; want != r.err { + // TODO aconway 2015-10-06: Not propagating the correct error, seeing nil. - t.Logf("want %v got %v", want, r) ++ t.Logf("want %v got %v", want, r.err) + } + } +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9e788b2d/electron/receiver.go ---------------------------------------------------------------------- diff --cc electron/receiver.go index 59ac018,0000000..22bdc7e mode 100644,000000..100644 --- a/electron/receiver.go +++ b/electron/receiver.go @@@ -1,238 -1,0 +1,244 @@@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. 
+*/ + +package electron + +import ( ++ "fmt" + "qpid.apache.org/amqp" - "qpid.apache.org/internal" + "qpid.apache.org/proton" + "time" +) + +// Receiver is a Link that receives messages. +// +type Receiver interface { + Link + + // Receive blocks until a message is available or until the Receiver is closed + // and has no more buffered messages. + Receive() (ReceivedMessage, error) + + // ReceiveTimeout is like Receive but gives up after timeout, see Timeout. + // + // Note that if Prefetch is false, after a Timeout the credit issued by + // Receive remains on the link. It will be used by the next call to Receive. + ReceiveTimeout(timeout time.Duration) (ReceivedMessage, error) + + // Prefetch==true means the Receiver will automatically issue credit to the + // remote sender to keep its buffer as full as possible, i.e. it will + // "pre-fetch" messages independently of the application calling + // Receive(). This gives good throughput for applications that handle a + // continuous stream of messages. Larger capacity may improve throughput, the + // optimal value depends on the characteristics of your application. + // + // Prefetch==false means the Receiver will only issue credit when you + // call Receive(), and will only issue enough credit to satisfy the calls + // actually made. This gives lower throughput but will not fetch any messages + // in advance. It is good for synchronous applications that need to evaluate + // each message before deciding whether to receive another. The + // request-response pattern is a typical example. If you make concurrent + // calls to Receive with pre-fetch disabled, you can improve performance by + // setting the capacity close to the expected number of concurrent calls. 
+ // + Prefetch() bool + + // Capacity is the size (number of messages) of the local message buffer + // These are messages received but not yet returned to the application by a call to Receive() + Capacity() int +} + +// Flow control policy for a receiver. +type policy interface { + // Called at the start of Receive() to adjust credit before fetching a message. + Pre(*receiver) + // Called after Receive() has received a message from Buffer() before it returns. + // Non-nil error means no message was received because of an error. + Post(*receiver, error) +} + +type prefetchPolicy struct{} + +func (p prefetchPolicy) Flow(r *receiver) { + r.engine().Inject(func() { ++ if r.Error() != nil { ++ return ++ } + _, _, max := r.credit() + if max > 0 { + r.eLink.Flow(max) + } + }) +} +func (p prefetchPolicy) Pre(r *receiver) { p.Flow(r) } +func (p prefetchPolicy) Post(r *receiver, err error) { + if err == nil { + p.Flow(r) + } +} + +type noPrefetchPolicy struct{ waiting int } + +func (p noPrefetchPolicy) Flow(r *receiver) { // Not called in proton goroutine + r.engine().Inject(func() { ++ if r.Error() != nil { ++ return ++ } + len, credit, max := r.credit() + add := p.waiting - (len + credit) + if add > max { + add = max // Don't overflow + } + if add > 0 { + r.eLink.Flow(add) + } + }) +} +func (p noPrefetchPolicy) Pre(r *receiver) { p.waiting++; p.Flow(r) } +func (p noPrefetchPolicy) Post(r *receiver, err error) { + p.waiting-- + if err == nil { + p.Flow(r) + } +} + +// Receiver implementation +type receiver struct { + link + buffer chan ReceivedMessage + policy policy +} + ++// Call in proton goroutine +func newReceiver(l link) *receiver { + r := &receiver{link: l} + if r.capacity < 1 { + r.capacity = 1 + } + if r.prefetch { + r.policy = &prefetchPolicy{} + } else { + r.policy = &noPrefetchPolicy{} + } + r.buffer = make(chan ReceivedMessage, r.capacity) + r.handler().addLink(r.eLink, r) + r.link.open() + return r +} + +// call in proton goroutine. 
+func (r *receiver) credit() (buffered, credit, max int) { + return len(r.buffer), r.eLink.Credit(), cap(r.buffer) - len(r.buffer) +} + +func (r *receiver) Capacity() int { return cap(r.buffer) } +func (r *receiver) Prefetch() bool { return r.prefetch } + +func (r *receiver) Receive() (rm ReceivedMessage, err error) { + return r.ReceiveTimeout(Forever) +} + +func (r *receiver) ReceiveTimeout(timeout time.Duration) (rm ReceivedMessage, err error) { - internal.Assert(r.buffer != nil, "Receiver is not open: %s", r) ++ assert(r.buffer != nil, "Receiver is not open: %s", r) + r.policy.Pre(r) + defer func() { r.policy.Post(r, err) }() + rmi, err := timedReceive(r.buffer, timeout) + switch err { + case Timeout: + return ReceivedMessage{}, Timeout + case Closed: + return ReceivedMessage{}, r.Error() + default: + return rmi.(ReceivedMessage), nil + } +} + +// Called in proton goroutine on MMessage event. +func (r *receiver) message(delivery proton.Delivery) { + if r.eLink.State().RemoteClosed() { + localClose(r.eLink, r.eLink.RemoteCondition().Error()) + return + } + if delivery.HasMessage() { + m, err := delivery.Message() + if err != nil { + localClose(r.eLink, err) + return + } - internal.Assert(m != nil) ++ assert(m != nil) + r.eLink.Advance() + if r.eLink.Credit() < 0 { - localClose(r.eLink, internal.Errorf("received message in excess of credit limit")) ++ localClose(r.eLink, fmt.Errorf("received message in excess of credit limit")) + } else { + // We never issue more credit than cap(buffer) so this will not block. + r.buffer <- ReceivedMessage{m, delivery, r} + } + } +} + +func (r *receiver) closed(err error) { + r.link.closed(err) + if r.buffer != nil { + close(r.buffer) + } +} + +// ReceivedMessage contains an amqp.Message and allows the message to be acknowledged. +type ReceivedMessage struct { + // Message is the received message. 
+ Message amqp.Message + + eDelivery proton.Delivery + receiver Receiver +} + - // Acknowledge a ReceivedMessage with the given disposition code. - func (rm *ReceivedMessage) Acknowledge(disposition Disposition) error { - return rm.receiver.(*receiver).engine().InjectWait(func() error { - // Settle doesn't return an error but if the receiver is broken the settlement won't happen. - rm.eDelivery.SettleAs(uint64(disposition)) - return rm.receiver.Error() ++// Acknowledge a ReceivedMessage with the given delivery status. ++func (rm *ReceivedMessage) acknowledge(status uint64) error { ++ return rm.receiver.(*receiver).engine().Inject(func() { ++ // Deliveries are valid as long as the connection is, unless settled. ++ rm.eDelivery.SettleAs(uint64(status)) + }) +} + - // Accept is short for Acknowledge(Accpeted) - func (rm *ReceivedMessage) Accept() error { return rm.Acknowledge(Accepted) } ++// Accept tells the sender that we take responsibility for processing the message. ++func (rm *ReceivedMessage) Accept() error { return rm.acknowledge(proton.Accepted) } ++ ++// Reject tells the sender we consider the message invalid and unusable. ++func (rm *ReceivedMessage) Reject() error { return rm.acknowledge(proton.Rejected) } + - // Reject is short for Acknowledge(Rejected) - func (rm *ReceivedMessage) Reject() error { return rm.Acknowledge(Rejected) } ++// Release tells the sender we will not process the message but some other ++// receiver might. ++func (rm *ReceivedMessage) Release() error { return rm.acknowledge(proton.Released) } + - // IncomingReceiver is passed to the accept() function given to Connection.Listen() - // when there is an incoming request for a receiver link. ++// IncomingReceiver is sent on the Connection.Incoming() channel when there is ++// an incoming request to open a receiver link. +type IncomingReceiver struct { + incomingLink +} + - // Link provides information about the incoming link. 
- func (i *IncomingReceiver) Link() Link { return i } ++// SetCapacity sets the capacity of the incoming receiver, call before Accept() ++func (in *IncomingReceiver) SetCapacity(capacity int) { in.capacity = capacity } + - // AcceptReceiver sets Capacity and Prefetch of the accepted Receiver. - func (i *IncomingReceiver) AcceptReceiver(capacity int, prefetch bool) Receiver { - i.capacity = capacity - i.prefetch = prefetch - return i.Accept().(Receiver) - } ++// SetPrefetch sets the pre-fetch mode of the incoming receiver, call before Accept() ++func (in *IncomingReceiver) SetPrefetch(prefetch bool) { in.prefetch = prefetch } + - func (i *IncomingReceiver) Accept() Endpoint { - i.accepted = true - return newReceiver(i.link) ++// Accept accepts an incoming receiver endpoint ++func (in *IncomingReceiver) Accept() Endpoint { ++ return in.accept(func() Endpoint { return newReceiver(in.link) }) +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9e788b2d/electron/sender.go ---------------------------------------------------------------------- diff --cc electron/sender.go index 68cfbb3,0000000..573e9da mode 100644,000000..100644 --- a/electron/sender.go +++ b/electron/sender.go @@@ -1,315 -1,0 +1,274 @@@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. 
+*/ + +package electron + +// #include <proton/disposition.h> +import "C" + +import ( + "qpid.apache.org/amqp" - "qpid.apache.org/internal" + "qpid.apache.org/proton" - "reflect" + "time" +) + +// Sender is a Link that sends messages. ++// ++// The result of sending a message is provided by an Outcome value. ++// ++// A sender can buffer messages up to the credit limit provided by the remote receiver. ++// Send* methods will block if the buffer is full until there is space. ++// Send*Timeout methods will give up after the timeout and set Timeout as Outcome.Error. ++// +type Sender interface { + Link + - // Send a message without waiting for acknowledgement. Returns a SentMessage. - // use SentMessage.Disposition() to wait for acknowledgement and get the - // disposition code. - // - // If the send buffer is full, send blocks until there is space in the buffer. - Send(m amqp.Message) (sm SentMessage, err error) ++ // SendSync sends a message and blocks until the message is acknowledged by the remote receiver. ++ // Returns an Outcome, which may contain an error if the message could not be sent. ++ SendSync(m amqp.Message) Outcome ++ ++ // SendWaitable puts a message in the send buffer and returns a channel that ++ // you can use to wait for the Outcome of just that message. The channel is ++ // buffered so you can receive from it whenever you want without blocking anything. ++ SendWaitable(m amqp.Message) <-chan Outcome ++ ++ // SendForget buffers a message for sending and returns, with no notification of the outcome. ++ SendForget(m amqp.Message) + - // SendTimeout is like send but only waits up to timeout for buffer space. ++ // SendAsync puts a message in the send buffer and returns immediately. An ++ // Outcome with Value = value will be sent to the ack channel when the remote ++ // receiver has acknowledged the message or if there is an error. + // - // Returns Timeout error if the timeout expires and the message has not been sent. 
- SendTimeout(m amqp.Message, timeout time.Duration) (sm SentMessage, err error) ++ // You can use the same ack channel for many calls to SendAsync(), possibly on ++ // many Senders. The channel will receive the outcomes in the order they ++ // become available. The channel should be buffered and/or served by dedicated ++ // goroutines to avoid blocking the connection. ++ // ++ // If ack == nil no Outcome is sent. ++ SendAsync(m amqp.Message, ack chan<- Outcome, value interface{}) + - // Send a message and forget it, there will be no acknowledgement. - // If the send buffer is full, send blocks until there is space in the buffer. - SendForget(m amqp.Message) error ++ SendAsyncTimeout(m amqp.Message, ack chan<- Outcome, value interface{}, timeout time.Duration) + - // SendForgetTimeout is like send but only waits up to timeout for buffer space. - // Returns Timeout error if the timeout expires and the message has not been sent. - SendForgetTimeout(m amqp.Message, timeout time.Duration) error ++ SendWaitableTimeout(m amqp.Message, timeout time.Duration) <-chan Outcome + - // Credit indicates how many messages the receiving end of the link can accept. - // - // On a Sender credit can be negative, meaning that messages in excess of the - // receiver's credit limit have been buffered locally till credit is available. - Credit() (int, error) - } ++ SendForgetTimeout(m amqp.Message, timeout time.Duration) + - type sendMessage struct { - m amqp.Message - sm SentMessage ++ SendSyncTimeout(m amqp.Message, timeout time.Duration) Outcome +} + - type sender struct { - link - credit chan struct{} // Signal available credit. ++// Outcome provides information about the outcome of sending a message. ++type Outcome struct { ++ // Status of the message: was it sent, how was it acknowledged. ++ Status SentStatus ++ // Error is a local error if Status is Unsent or Unacknowledged, a remote error otherwise. 
++ Error error ++ // Value provided by the application in SendAsync() ++ Value interface{} +} + - // Disposition indicates the outcome of a settled message delivery. - type Disposition uint64 ++// SentStatus indicates the status of a sent message. ++type SentStatus int + +const ( - // No disposition available: pre-settled, not yet acknowledged or an error occurred - NoDisposition Disposition = 0 ++ // Message was never sent ++ Unsent SentStatus = iota ++ // Message was sent but never acknowledged. It may or may not have been received. ++ Unacknowledged ++ // Message was sent pre-settled, no remote outcome is available. ++ Presettled + // Message was accepted by the receiver - Accepted = proton.Accepted ++ Accepted + // Message was rejected as invalid by the receiver - Rejected = proton.Rejected - // Message was not processed by the receiver but may be processed by some other receiver. - Released = proton.Released ++ Rejected ++ // Message was not processed by the receiver but may be valid for a different receiver ++ Released ++ // Receiver responded with an unrecognized status. ++ Unknown +) + - // String human readable name for a Disposition. - func (d Disposition) String() string { - switch d { - case NoDisposition: - return "no-disposition" ++// String human readable name for SentStatus. 
++func (s SentStatus) String() string { ++ switch s { ++ case Unsent: ++ return "unsent" ++ case Unacknowledged: ++ return "unacknowledged" + case Accepted: + return "accepted" + case Rejected: + return "rejected" + case Released: + return "released" - default: ++ case Unknown: + return "unknown" ++ default: ++ return "invalid" + } +} + - func (s *sender) Send(m amqp.Message) (SentMessage, error) { - return s.SendTimeout(m, Forever) - } - - func (s *sender) SendTimeout(m amqp.Message, timeout time.Duration) (SentMessage, error) { - var sm SentMessage - if s.sndSettle == SndSettled { - sm = nil - } else { - sm = newSentMessage(s.session.connection) ++// Convert proton delivery state code to SentStatus value ++func sentStatus(d uint64) SentStatus { ++ switch d { ++ case proton.Accepted: ++ return Accepted ++ case proton.Rejected: ++ return Rejected ++ case proton.Released, proton.Modified: ++ return Released ++ default: ++ return Unknown + } - return s.sendInternal(sendMessage{m, sm}, timeout) - } - - func (s *sender) SendForget(m amqp.Message) error { - return s.SendForgetTimeout(m, Forever) +} + - func (s *sender) SendForgetTimeout(m amqp.Message, timeout time.Duration) error { - snd := sendMessage{m, nil} - _, err := s.sendInternal(snd, timeout) - return err ++// Sender implementation, held by handler. ++type sender struct { ++ link ++ credit chan struct{} // Signal available credit. 
+} + - func (s *sender) sendInternal(snd sendMessage, timeout time.Duration) (SentMessage, error) { - if _, err := timedReceive(s.credit, timeout); err != nil { // Wait for credit - if err == Closed { ++func (s *sender) SendAsyncTimeout(m amqp.Message, ack chan<- Outcome, v interface{}, t time.Duration) { ++ // wait for credit ++ if _, err := timedReceive(s.credit, t); err != nil { ++ if err == Closed && s.Error != nil { + err = s.Error() - internal.Assert(err != nil) + } - return nil, err ++ ack <- Outcome{Unsent, err, v} ++ return + } - if err := s.engine().Inject(func() { s.doSend(snd) }); err != nil { - return nil, err - } - return snd.sm, nil - } - - // Send a message. Handler goroutine - func (s *sender) doSend(snd sendMessage) { - delivery, err := s.eLink.Send(snd.m) - switch sm := snd.sm.(type) { - case nil: - delivery.Settle() - case *sentMessage: - sm.delivery = delivery - if err != nil { - sm.settled(err) - } else { - s.handler().sentMessages[delivery] = sm ++ // Send a message in handler goroutine ++ err := s.engine().Inject(func() { ++ if s.Error() != nil { ++ if ack != nil { ++ ack <- Outcome{Unsent, s.Error(), v} ++ } ++ return + } - default: - internal.Assert(false, "bad SentMessage type %T", snd.sm) - } - if s.eLink.Credit() > 0 { - s.sendable() // Signal credit. ++ if delivery, err := s.eLink.Send(m); err == nil { ++ if ack != nil { // We must report an outcome ++ if s.SndSettle() == SndSettled { ++ delivery.Settle() // Pre-settle if required ++ ack <- Outcome{Presettled, nil, v} ++ } else { ++ s.handler().sentMessages[delivery] = sentMessage{ack, v} ++ } ++ } else { // ack == nil, can't report outcome ++ if s.SndSettle() != SndUnsettled { // Pre-settle unless we are forced not to. 
++ delivery.Settle() ++ } ++ } ++ } else { // err != nil ++ if ack != nil { ++ ack <- Outcome{Unsent, err, v} ++ } ++ } ++ if s.eLink.Credit() > 0 { // Signal there is still credit ++ s.sendable() ++ } ++ }) ++ if err != nil && ack != nil { ++ ack <- Outcome{Unsent, err, v} + } +} + - // Signal the sender has credit. Any goroutine. ++// Set credit flag if not already set. Non-blocking, any goroutine +func (s *sender) sendable() { + select { // Non-blocking - case s.credit <- struct{}{}: // Set the flag if not already set. ++ case s.credit <- struct{}{}: + default: + } +} + - func (s *sender) closed(err error) { - s.link.closed(err) - close(s.credit) ++func (s *sender) SendWaitableTimeout(m amqp.Message, t time.Duration) <-chan Outcome { ++ out := make(chan Outcome, 1) ++ s.SendAsyncTimeout(m, out, nil, t) ++ return out +} + - func newSender(l link) *sender { - s := &sender{link: l, credit: make(chan struct{}, 1)} - s.handler().addLink(s.eLink, s) - s.link.open() - return s - } - - // SentMessage represents a previously sent message. It allows you to wait for acknowledgement. - type SentMessage interface { - - // Disposition blocks till the message is acknowledged and returns the - // disposition state. - // - // NoDisposition with Error() != nil means the Connection was closed before - // the message was acknowledged. - // - // NoDisposition with Error() == nil means the message was pre-settled or - // Forget() was called. - Disposition() (Disposition, error) - - // DispositionTimeout is like Disposition but gives up after timeout, see Timeout. - DispositionTimeout(time.Duration) (Disposition, error) - - // Forget interrupts any call to Disposition on this SentMessage and tells the - // peer we are no longer interested in the disposition of this message. - Forget() - - // Error returns the error that closed the disposition, or nil if there was no error. - // If the disposition closed because the connection closed, it will return Closed. 
- Error() error - - // Value is an optional value you wish to associate with the SentMessage. It - // can be the message itself or some form of identifier. - Value() interface{} - SetValue(interface{}) - } - - // SentMessageSet is a concurrent-safe set of sent messages that can be checked - // to get the next completed sent message - type SentMessageSet struct { - cases []reflect.SelectCase - sm []SentMessage - done chan SentMessage - } - - func (s *SentMessageSet) Add(sm SentMessage) { - s.sm = append(s.sm, sm) - s.cases = append(s.cases, reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(sm.(*sentMessage).done)}) ++func (s *sender) SendForgetTimeout(m amqp.Message, t time.Duration) { ++ s.SendAsyncTimeout(m, nil, nil, t) +} + - // Wait waits up to timeout and returns the next SentMessage that has a valid dispositionb - // or an error. - func (s *SentMessageSet) Wait(sm SentMessage, timeout time.Duration) (SentMessage, error) { - s.cases = s.cases[:len(s.sm)] // Remove previous timeout cases - if timeout == 0 { // Non-blocking - s.cases = append(s.cases, reflect.SelectCase{Dir: reflect.SelectDefault}) - } else { - s.cases = append(s.cases, - reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(After(timeout))}) ++func (s *sender) SendSyncTimeout(m amqp.Message, t time.Duration) Outcome { ++ deadline := time.Now().Add(t) ++ ack := s.SendWaitableTimeout(m, t) ++ t = deadline.Sub(time.Now()) // Adjust for time already spent. ++ if t < 0 { ++ t = 0 + } - chosen, _, _ := reflect.Select(s.cases) - if chosen > len(s.sm) { - return nil, Timeout ++ if out, err := timedReceive(ack, t); err == nil { ++ return out.(Outcome) + } else { - sm := s.sm[chosen] - s.sm = append(s.sm[:chosen], s.sm[chosen+1:]...) 
- return sm, nil ++ if err == Closed && s.Error() != nil { ++ err = s.Error() ++ } ++ return Outcome{Unacknowledged, err, nil} + } +} + - // SentMessage implementation - type sentMessage struct { - connection *connection - done chan struct{} - delivery proton.Delivery - disposition Disposition - err error - value interface{} ++func (s *sender) SendAsync(m amqp.Message, ack chan<- Outcome, v interface{}) { ++ s.SendAsyncTimeout(m, ack, v, Forever) +} + - func newSentMessage(c *connection) *sentMessage { - return &sentMessage{connection: c, done: make(chan struct{})} ++func (s *sender) SendWaitable(m amqp.Message) <-chan Outcome { ++ return s.SendWaitableTimeout(m, Forever) +} + - func (sm *sentMessage) SetValue(v interface{}) { sm.value = v } - func (sm *sentMessage) Value() interface{} { return sm.value } - func (sm *sentMessage) Disposition() (Disposition, error) { - <-sm.done - return sm.disposition, sm.err ++func (s *sender) SendForget(m amqp.Message) { ++ s.SendForgetTimeout(m, Forever) +} + - func (sm *sentMessage) DispositionTimeout(timeout time.Duration) (Disposition, error) { - if _, err := timedReceive(sm.done, timeout); err == Timeout { - return sm.disposition, Timeout - } else { - return sm.disposition, sm.err - } ++func (s *sender) SendSync(m amqp.Message) Outcome { ++ return <-s.SendWaitable(m) +} + - func (sm *sentMessage) Forget() { - sm.connection.engine.Inject(func() { - sm.delivery.Settle() - delete(sm.connection.handler.sentMessages, sm.delivery) - }) - sm.finish() ++// handler goroutine ++func (s *sender) closed(err error) { ++ s.link.closed(err) ++ close(s.credit) +} + - func (sm *sentMessage) settled(err error) { - if sm.delivery.Settled() { - sm.disposition = Disposition(sm.delivery.Remote().Type()) - } - sm.err = err - sm.finish() ++func newSender(l link) *sender { ++ s := &sender{link: l, credit: make(chan struct{}, 1)} ++ s.handler().addLink(s.eLink, s) ++ s.link.open() ++ return s +} + - func (sm *sentMessage) finish() { - select { - case 
<-sm.done: // No-op if already closed - default: - close(sm.done) - } ++// sentMessage records a sent message on the handler. ++type sentMessage struct { ++ ack chan<- Outcome ++ value interface{} +} + - func (sm *sentMessage) Error() error { return sm.err } - - // IncomingSender is passed to the accept() function given to Connection.Listen() - // when there is an incoming request for a sender link. ++// IncomingSender is sent on the Connection.Incoming() channel when there is ++// an incoming request to open a sender link. +type IncomingSender struct { + incomingLink +} + - // Link provides information about the incoming link. - func (i *IncomingSender) Link() Link { return i } - - func (i *IncomingSender) AcceptSender() Sender { return i.Accept().(Sender) } ++// Accept accepts an incoming sender endpoint ++func (in *IncomingSender) Accept() Endpoint { ++ return in.accept(func() Endpoint { return newSender(in.link) }) ++} + - func (i *IncomingSender) Accept() Endpoint { - i.accepted = true - return newSender(i.link) ++// Call in injected functions to check if the sender is valid. ++func (s *sender) valid() bool { ++ s2, ok := s.handler().links[s.eLink].(*sender) ++ return ok && s2 == s +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9e788b2d/electron/session.go ---------------------------------------------------------------------- diff --cc electron/session.go index 3531da6,0000000..18d8bc8 mode 100644,000000..100644 --- a/electron/session.go +++ b/electron/session.go @@@ -1,125 -1,0 +1,128 @@@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. 
You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ + +package electron + +import ( + "qpid.apache.org/proton" +) + +// Session is an AMQP session, it contains Senders and Receivers. +type Session interface { + Endpoint + - // Sender opens a new sender. v can be a string, which is used as the Target - // address, or a SenderSettings struct containing more details settings. - Sender(...LinkSetting) (Sender, error) - - // Receiver opens a new Receiver. v can be a string, which is used as the - // Source address, or a ReceiverSettings struct containing more details - // settings. - Receiver(...LinkSetting) (Receiver, error) ++ // Sender opens a new sender. ++ Sender(...LinkOption) (Sender, error) ++ ++ // Receiver opens a new Receiver. ++ Receiver(...LinkOption) (Receiver, error) +} + +type session struct { + endpoint + eSession proton.Session + connection *connection + capacity uint +} + - // SessionSetting can be passed when creating a sender or receiver. - // See functions that return SessionSetting for details - type SessionSetting func(*session) ++// SessionOption can be passed when creating a Session ++type SessionOption func(*session) + - // IncomingCapacity sets the size (in bytes) of the sessions incoming data buffer.. - func IncomingCapacity(cap uint) SessionSetting { return func(s *session) { s.capacity = cap } } ++// IncomingCapacity returns a Session Option that sets the size (in bytes) of ++// the sessions incoming data buffer.. 
++func IncomingCapacity(cap uint) SessionOption { return func(s *session) { s.capacity = cap } } + +// in proton goroutine - func newSession(c *connection, es proton.Session, setting ...SessionSetting) *session { ++func newSession(c *connection, es proton.Session, setting ...SessionOption) *session { + s := &session{ + connection: c, + eSession: es, - endpoint: endpoint{str: es.String()}, ++ endpoint: makeEndpoint(es.String()), + } + for _, set := range setting { + set(s) + } + c.handler.sessions[s.eSession] = s + s.eSession.SetIncomingCapacity(s.capacity) + s.eSession.Open() + return s +} + +func (s *session) Connection() Connection { return s.connection } +func (s *session) eEndpoint() proton.Endpoint { return s.eSession } +func (s *session) engine() *proton.Engine { return s.connection.engine } ++ +func (s *session) Close(err error) { - s.engine().Inject(func() { localClose(s.eSession, err) }) ++ s.engine().Inject(func() { ++ if s.Error() == nil { ++ localClose(s.eSession, err) ++ } ++ }) +} + - func (s *session) SetCapacity(bytes uint) { s.capacity = bytes } - - func (s *session) Sender(setting ...LinkSetting) (snd Sender, err error) { ++func (s *session) Sender(setting ...LinkOption) (snd Sender, err error) { + err = s.engine().InjectWait(func() error { ++ if s.Error() != nil { ++ return s.Error() ++ } + l, err := localLink(s, true, setting...) + if err == nil { + snd = newSender(l) + } + return err + }) + return +} + - func (s *session) Receiver(setting ...LinkSetting) (rcv Receiver, err error) { ++func (s *session) Receiver(setting ...LinkOption) (rcv Receiver, err error) { + err = s.engine().InjectWait(func() error { ++ if s.Error() != nil { ++ return s.Error() ++ } + l, err := localLink(s, false, setting...) + if err == nil { + rcv = newReceiver(l) + } + return err + }) + return +} + - // Called from handler on closed. 
- func (s *session) closed(err error) { - s.err.Set(err) - s.err.Set(Closed) - } - - // IncomingSession is passed to the accept() function given to Connection.Listen() - // when there is an incoming session request. ++// IncomingSender is sent on the Connection.Incoming() channel when there is an ++// incoming request to open a session. +type IncomingSession struct { + incoming + h *handler + pSession proton.Session + capacity uint +} + - // AcceptCapacity sets the session buffer capacity of an incoming session in bytes. - func (i *IncomingSession) AcceptSession(bytes uint) Session { - i.capacity = bytes - return i.Accept().(Session) ++func newIncomingSession(h *handler, ps proton.Session) *IncomingSession { ++ return &IncomingSession{incoming: makeIncoming(ps), h: h, pSession: ps} +} + - func (i *IncomingSession) Accept() Endpoint { - i.accepted = true - return newSession(i.h.connection, i.pSession, IncomingCapacity(i.capacity)) ++// SetCapacity sets the session buffer capacity of an incoming session in bytes. ++func (in *IncomingSession) SetCapacity(bytes uint) { in.capacity = bytes } ++ ++// Accept an incoming session endpoint. ++func (in *IncomingSession) Accept() Endpoint { ++ return in.accept(func() Endpoint { ++ return newSession(in.h.connection, in.pSession, IncomingCapacity(in.capacity)) ++ }) +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9e788b2d/electron/time.go ---------------------------------------------------------------------- diff --cc electron/time.go index 3407b82,0000000..51bfbc5 mode 100644,000000..100644 --- a/electron/time.go +++ b/electron/time.go @@@ -1,82 -1,0 +1,83 @@@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. 
The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ + +package electron + +import ( - "qpid.apache.org/internal" ++ "fmt" ++ "math" + "reflect" + "time" +) + +// Timeout is the error returned if an operation does not complete on time. +// +// Methods named *Timeout in this package take time.Duration timeout parameter. +// +// If timeout > 0 and there is no result available before the timeout, they +// return a zero or nil value and Timeout as an error. +// +// If timeout == 0 they will return a result if one is immediatley available or +// nil/zero and Timeout as an error if not. +// +// If timeout == Forever the function will return only when there is a result or +// some non-timeout error occurs. +// - var Timeout = internal.Errorf("timeout") ++var Timeout = fmt.Errorf("timeout") + +// Forever can be used as a timeout parameter to indicate wait forever. - const Forever time.Duration = -1 ++const Forever time.Duration = math.MaxInt64 + +// timedReceive receives on channel (which can be a chan of any type), waiting +// up to timeout. +// +// timeout==0 means do a non-blocking receive attempt. timeout < 0 means block +// forever. Other values mean block up to the timeout. +// +// Returns error Timeout on timeout, Closed on channel close. 
+func timedReceive(channel interface{}, timeout time.Duration) (interface{}, error) { + cases := []reflect.SelectCase{ + reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(channel)}, + } + if timeout == 0 { // Non-blocking + cases = append(cases, reflect.SelectCase{Dir: reflect.SelectDefault}) + } else { // Block up to timeout + cases = append(cases, + reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(After(timeout))}) + } + chosen, value, ok := reflect.Select(cases) + switch { - case !ok: - return nil, Closed - case chosen == 0: ++ case chosen == 0 && ok: + return value.Interface(), nil ++ case chosen == 0 && !ok: ++ return nil, Closed + default: + return nil, Timeout + } +} + +// After is like time.After but returns a nil channel if timeout == Forever +// since selecting on a nil channel will never return. +func After(timeout time.Duration) <-chan time.Time { + if timeout == Forever { + return nil + } else { + return time.After(timeout) + } +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9e788b2d/proton/engine.go ---------------------------------------------------------------------- diff --cc proton/engine.go index 2cebb49,0000000..2e67ef7 mode 100644,000000..100644 --- a/proton/engine.go +++ b/proton/engine.go @@@ -1,402 -1,0 +1,403 @@@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. 
See the License for the +specific language governing permissions and limitations +under the License. +*/ + +package proton + +// #include <proton/connection.h> +// #include <proton/event.h> +// #include <proton/handlers.h> +// #include <proton/session.h> +// #include <proton/transport.h> +// #include <memory.h> +// #include <stdlib.h> +// +// PN_HANDLE(REMOTE_ADDR) +import "C" + +import ( + "fmt" + "io" + "net" - "qpid.apache.org/internal" + "sync" + "unsafe" +) + +// Injecter allows functions to be "injected" into the event-processing loop, to +// be called in the same goroutine as event handlers. +type Injecter interface { + // Inject a function into the engine goroutine. + // + // f() will be called in the same goroutine as event handlers, so it can safely + // use values belonging to event handlers without synchronization. f() should + // not block, no further events or injected functions can be processed until + // f() returns. + // + // Returns a non-nil error if the function could not be injected and will + // never be called. Otherwise the function will eventually be called. + // + // Note that proton values (Link, Session, Connection etc.) that existed when + // Inject(f) was called may have become invalid by the time f() is executed. + // Handlers should handle keep track of Closed events to ensure proton values + // are not used after they become invalid. One technique is to have map from + // proton values to application values. Check that the map has the correct + // proton/application value pair at the start of the injected function and + // delete the value from the map when handling a Closed event. + Inject(f func()) error + + // InjectWait is like Inject but does not return till f() has completed. + // If f() cannot be injected it returns the error from Inject(), otherwise + // it returns the error from f() + InjectWait(f func() error) error +} + +// bufferChan manages a pair of ping-pong buffers to pass bytes through a channel. 
+type bufferChan struct { + buffers chan []byte + buf1, buf2 []byte +} + +func newBufferChan(size int) *bufferChan { + return &bufferChan{make(chan []byte), make([]byte, size), make([]byte, size)} +} + +func (b *bufferChan) buffer() []byte { + b.buf1, b.buf2 = b.buf2, b.buf1 // Alternate buffers. + return b.buf1[:cap(b.buf1)] +} + +// Engine reads from a net.Conn, decodes AMQP events and calls the appropriate +// Handler functions sequentially in a single goroutine. Actions taken by +// Handler functions (such as sending messages) are encoded and written to the +// net.Conn. You can create multiple Engines to handle multiple connections +// concurrently. +// +// You implement the EventHandler and/or MessagingHandler interfaces and provide +// those values to NewEngine(). Their HandleEvent method will be called in the +// event-handling goroutine. +// +// Handlers can pass values from an event (Connections, Links, Deliveries etc.) to +// other goroutines, store them, or use them as map indexes. Effectively they are +// just pointers. Other goroutines cannot call their methods directly but they can +// can create a function closure to call such methods and pass it to Engine.Inject() +// to have it evaluated in the engine goroutine. +// +// You are responsible for ensuring you don't use an event value after it is +// invalid. The handler methods will tell you when a value is no longer valid. For +// example after a LinkClosed event, that link is no longer valid. If you do +// Link.Close() yourself (in a handler or injected function) the link remains valid +// until the corresponing LinkClosed event is received by the handler. +// +// Engine.Close() will take care of cleaning up any remaining values when you are +// done with the Engine. All values associated with a engine become invalid when you +// call Engine.Close() +// +// The qpid.apache.org/proton/concurrent package will do all this for you, so it +// may be a better choice for some applications. 
+// +type Engine struct { + // Error is set on exit from Run() if there was an error. - err internal.ErrorHolder ++ err ErrorHolder + inject chan func() + + conn net.Conn + connection Connection + transport Transport + collector *C.pn_collector_t + read *bufferChan // Read buffers channel. + write *bufferChan // Write buffers channel. + handlers []EventHandler // Handlers for proton events. + running chan struct{} // This channel will be closed when the goroutines are done. + closeOnce sync.Once +} + +const bufferSize = 4096 + - // Map of Connection to *Engine - var engines = internal.MakeSafeMap() - +// NewEngine initializes a engine with a connection and handlers. To start it running: +// eng := NewEngine(...) +// go run eng.Run() +// The goroutine will exit when the engine is closed or disconnected. +// You can check for errors on Engine.Error. +// +func NewEngine(conn net.Conn, handlers ...EventHandler) (*Engine, error) { + // Save the connection ID for Connection.String() + eng := &Engine{ + inject: make(chan func()), + conn: conn, + transport: Transport{C.pn_transport()}, + connection: Connection{C.pn_connection()}, + collector: C.pn_collector(), + handlers: handlers, + read: newBufferChan(bufferSize), + write: newBufferChan(bufferSize), + running: make(chan struct{}), + } + if eng.transport.IsNil() || eng.connection.IsNil() || eng.collector == nil { - return nil, internal.Errorf("failed to allocate engine") ++ return nil, fmt.Errorf("failed to allocate engine") + } + + // TODO aconway 2015-06-25: connection settings for user, password, container etc. + // before transport.Bind() Set up connection before Engine, allow Engine or Reactor + // to run connection. + + // Unique container-id by default. 
- eng.connection.SetContainer(internal.UUID4().String()) ++ eng.connection.SetContainer(UUID4().String()) + pnErr := eng.transport.Bind(eng.connection) + if pnErr != 0 { - return nil, internal.Errorf("cannot setup engine: %s", internal.PnErrorCode(pnErr)) ++ return nil, fmt.Errorf("cannot setup engine: %s", PnErrorCode(pnErr)) + } + C.pn_connection_collect(eng.connection.pn, eng.collector) + eng.connection.Open() - connectionContexts.Put(eng.connection, connectionContext{eng.String()}) + return eng, nil +} + +func (eng *Engine) String() string { + return fmt.Sprintf("%s-%s", eng.conn.LocalAddr(), eng.conn.RemoteAddr()) +} + +func (eng *Engine) Id() string { + return fmt.Sprintf("%eng", &eng) +} + +func (eng *Engine) Error() error { + return eng.err.Get() +} + +// Inject a function into the Engine's event loop. +// +// f() will be called in the same event-processing goroutine that calls Handler +// methods. f() can safely call methods on values that belong to this engine +// (Sessions, Links etc) +// +// The injected function has no parameters or return values. It is normally a +// closure and can use channels to communicate with the injecting goroutine if +// necessary. +// +// Returns a non-nil error if the engine is closed before the function could be +// injected. +func (eng *Engine) Inject(f func()) error { + select { + case eng.inject <- f: + return nil + case <-eng.running: + return eng.Error() + } +} + +// InjectWait is like Inject but does not return till f() has completed or the +// engine is closed, and returns an error value from f() +func (eng *Engine) InjectWait(f func() error) error { + done := make(chan error) + defer close(done) + err := eng.Inject(func() { done <- f() }) + if err != nil { + return err + } + select { + case <-eng.running: + return eng.Error() + case err := <-done: + return err + } +} + +// Server puts the Engine in server mode, meaning it will auto-detect security settings on +// the incoming connection such as use of SASL and SSL. 
+// Must be called before Run() +// +func (eng *Engine) Server() { eng.transport.SetServer() } + +// Close the engine's connection, returns when the engine has exited. +func (eng *Engine) Close(err error) { + eng.err.Set(err) + eng.Inject(func() { + CloseError(eng.connection, err) + }) + <-eng.running +} + +// Disconnect the engine's connection without an AMQP close, returns when the engine has exited. +func (eng *Engine) Disconnect(err error) { + eng.err.Set(err) + eng.conn.Close() + <-eng.running +} + +// Run the engine. Engine.Run() will exit when the engine is closed or +// disconnected. You can check for errors after exit with Engine.Error(). +// +func (eng *Engine) Run() error { + wait := sync.WaitGroup{} + wait.Add(2) // Read and write goroutines + + readErr := make(chan error, 1) // Don't block + go func() { // Read goroutine + defer wait.Done() + for { + rbuf := eng.read.buffer() + n, err := eng.conn.Read(rbuf) + if n > 0 { + eng.read.buffers <- rbuf[:n] + } + if err != nil { + readErr <- err + close(readErr) + close(eng.read.buffers) + return + } + } + }() + + writeErr := make(chan error, 1) // Don't block + go func() { // Write goroutine + defer wait.Done() + for { + wbuf, ok := <-eng.write.buffers + if !ok { + return + } + _, err := eng.conn.Write(wbuf) + if err != nil { + writeErr <- err + close(writeErr) + return + } + } + }() + + wbuf := eng.write.buffer()[:0] + + for eng.err.Get() == nil { + if len(wbuf) == 0 { + eng.pop(&wbuf) + } + // Don't set wchan unless there is something to write. 
+ var wchan chan []byte + if len(wbuf) > 0 { + wchan = eng.write.buffers + } + + select { + case buf, ok := <-eng.read.buffers: // Read a buffer + if ok { + eng.push(buf) + } + case wchan <- wbuf: // Write a buffer + wbuf = eng.write.buffer()[:0] + case f, ok := <-eng.inject: // Function injected from another goroutine + if ok { + f() + } + case err := <-readErr: + eng.netError(err) + case err := <-writeErr: + eng.netError(err) + } + eng.process() + } + close(eng.write.buffers) + eng.conn.Close() // Make sure connection is closed + wait.Wait() + close(eng.running) // Signal goroutines have exited and Error is set. + - connectionContexts.Delete(eng.connection) ++ // Execute any injected functions for side effects on application data structures. ++ inject := eng.inject ++ eng.inject = nil // Further calls to Inject() will return an error. ++ for f := range inject { ++ f() ++ } ++ + if !eng.connection.IsNil() { + eng.connection.Free() + } + if !eng.transport.IsNil() { + eng.transport.Free() + } + if eng.collector != nil { + C.pn_collector_free(eng.collector) + } + for _, h := range eng.handlers { + switch h := h.(type) { + case cHandler: + C.pn_handler_free(h.pn) + } + } + return eng.err.Get() +} + +func (eng *Engine) netError(err error) { + eng.err.Set(err) + eng.transport.CloseHead() + eng.transport.CloseTail() +} + +func minInt(a, b int) int { + if a < b { + return a + } else { + return b + } +} + +func (eng *Engine) pop(buf *[]byte) { + pending := int(eng.transport.Pending()) + switch { + case pending == int(C.PN_EOS): + *buf = (*buf)[:] + return + case pending < 0: - panic(internal.Errorf("%s", internal.PnErrorCode(pending))) ++ panic(fmt.Errorf("%s", PnErrorCode(pending))) + } + size := minInt(pending, cap(*buf)) + *buf = (*buf)[:size] + if size == 0 { + return + } + C.memcpy(unsafe.Pointer(&(*buf)[0]), eng.transport.Head(), C.size_t(size)) - internal.Assert(size > 0) ++ assert(size > 0) + eng.transport.Pop(uint(size)) +} + +func (eng *Engine) push(buf []byte) { 
+ buf2 := buf + for len(buf2) > 0 { + n := eng.transport.Push(buf2) + if n <= 0 { - panic(internal.Errorf("error in transport: %s", internal.PnErrorCode(n))) ++ panic(fmt.Errorf("error in transport: %s", PnErrorCode(n))) + } + buf2 = buf2[n:] + } +} + +func (eng *Engine) handle(e Event) { + for _, h := range eng.handlers { + h.HandleEvent(e) + } + if e.Type() == ETransportClosed { + eng.err.Set(io.EOF) + } +} + +func (eng *Engine) process() { + for ce := C.pn_collector_peek(eng.collector); ce != nil; ce = C.pn_collector_peek(eng.collector) { + eng.handle(makeEvent(ce, eng)) + C.pn_collector_pop(eng.collector) + } +} + +func (eng *Engine) Connection() Connection { return eng.connection } http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9e788b2d/proton/error.go ---------------------------------------------------------------------- diff --cc proton/error.go index f9cc948,0000000..80d9680 mode 100644,000000..100644 --- a/proton/error.go +++ b/proton/error.go @@@ -1,96 -1,0 +1,96 @@@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ + ++// Internal implementation details - ignore. 
+package proton + ++// #cgo LDFLAGS: -lqpid-proton +// #include <proton/error.h> +// #include <proton/codec.h> +import "C" + +import ( + "fmt" - "reflect" - "runtime" ++ "sync" ++ "sync/atomic" +) + - var pnErrorNames = map[int]string{ - C.PN_EOS: "end of data", - C.PN_ERR: "error", - C.PN_OVERFLOW: "overflow", - C.PN_UNDERFLOW: "underflow", - C.PN_STATE_ERR: "bad state", - C.PN_ARG_ERR: "invalid argument", - C.PN_TIMEOUT: "timeout", - C.PN_INTR: "interrupted", - C.PN_INPROGRESS: "in progress", - } ++type PnErrorCode int + - func pnErrorName(code int) string { - name := pnErrorNames[code] - if name != "" { - return name - } else { - return "unknown error code" ++func (e PnErrorCode) String() string { ++ switch e { ++ case C.PN_EOS: ++ return "end-of-data" ++ case C.PN_ERR: ++ return "error" ++ case C.PN_OVERFLOW: ++ return "overflow" ++ case C.PN_UNDERFLOW: ++ return "underflow" ++ case C.PN_STATE_ERR: ++ return "bad-state" ++ case C.PN_ARG_ERR: ++ return "invalid-argument" ++ case C.PN_TIMEOUT: ++ return "timeout" ++ case C.PN_INTR: ++ return "interrupted" ++ case C.PN_INPROGRESS: ++ return "in-progress" ++ default: ++ return fmt.Sprintf("unknown-error(%d)", e) + } +} + - type BadUnmarshal struct { - AMQPType C.pn_type_t - GoType reflect.Type ++func PnError(e *C.pn_error_t) error { ++ if e == nil || C.pn_error_code(e) == 0 { ++ return nil ++ } ++ return fmt.Errorf("%s: %s", PnErrorCode(C.pn_error_code(e)), C.GoString(C.pn_error_text(e))) +} + - func newBadUnmarshal(pnType C.pn_type_t, v interface{}) *BadUnmarshal { - return &BadUnmarshal{pnType, reflect.TypeOf(v)} ++// ErrorHolder is a goroutine-safe error holder that keeps the first error that is set. 
++type ErrorHolder struct { ++ once sync.Once ++ value atomic.Value +} + - func (e BadUnmarshal) Error() string { - if e.GoType.Kind() != reflect.Ptr { - return fmt.Sprintf("proton: cannot unmarshal to type %s, not a pointer", e.GoType) - } else { - return fmt.Sprintf("proton: cannot unmarshal AMQP %s to %s", getPnType(e.AMQPType), e.GoType) ++// Set the error if not already set; a nil err is ignored. ++func (e *ErrorHolder) Set(err error) { ++ if err != nil { ++ e.once.Do(func() { e.value.Store(err) }) + } +} + - // errorf creates an error with a formatted message - func errorf(format string, a ...interface{}) error { - return fmt.Errorf("proton: %s", fmt.Sprintf(format, a...)) ++// Get the error. ++func (e *ErrorHolder) Get() (err error) { ++ err, _ = e.value.Load().(error) ++ return +} + - func pnDataError(data *C.pn_data_t) string { - err := C.pn_data_error(data) - if err != nil && int(C.pn_error_code(err)) != 0 { - return C.GoString(C.pn_error_text(err)) - } - return "" - } - - // doRecover is called to recover from internal panics - func doRecover(err *error) { - r := recover() - switch r := r.(type) { - case nil: - return - case runtime.Error: - panic(r) - case error: - *err = r - default: - panic(r) ++// assert panics if condition is false with optional formatted message ++func assert(condition bool, format ...interface{}) { ++ if !condition { ++ if len(format) > 0 { ++ panic(fmt.Errorf(format[0].(string), format[1:]...)) ++ } else { ++ panic(fmt.Errorf("assertion failed")) ++ } + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
