http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/bd3fb337/proton-c/bindings/go/src/qpid.apache.org/proton/concurrent/link.go ---------------------------------------------------------------------- diff --git a/proton-c/bindings/go/src/qpid.apache.org/proton/concurrent/link.go b/proton-c/bindings/go/src/qpid.apache.org/proton/concurrent/link.go new file mode 100644 index 0000000..cf4b8aa --- /dev/null +++ b/proton-c/bindings/go/src/qpid.apache.org/proton/concurrent/link.go @@ -0,0 +1,232 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ + +package concurrent + +import ( + "qpid.apache.org/proton" + "qpid.apache.org/proton/internal" +) + +type LinkSettings struct { + // Source address that messages come from. + Source string + // Target address that messages are going to. + Target string + + // Unique (per container) name of the link. + // Leave blank to have the container generate a unique name automatically. + Name string + + // SndSettleMode defines when the sending end of the link settles message delivery. + // Can set via AtMostOnce or AtLeastOnce. + SndSettleMode SndSettleMode + + // RcvSettleMode defines when the receiving end of the link settles message delivery. 
+ RcvSettleMode RcvSettleMode +} + +// AtMostOnce sets "fire and forget" mode, messages are sent but no +// acknowledgment is received, messages can be lost if there is a network +// failure. Sets SndSettleMode=SndSettled and RcvSettleMode=RcvFirst +func (s *LinkSettings) AtMostOnce() { + s.SndSettleMode = SndSettled + s.RcvSettleMode = RcvFirst +} + +// AtLeastOnce requests acknowledgment for every message, acknowledgment +// indicates the message was definitely received. In the event of a +// failure, unacknowledged messages can be re-sent but there is a chance +// that the message will be received twice in this case. +// Sets SndSettleMode=SndUnsettled and RcvSettleMode=RcvFirst +func (s *LinkSettings) AtLeastOnce() { + s.SndSettleMode = SndUnsettled + s.RcvSettleMode = RcvFirst +} + +// Link is the common interface for Sender and Receiver links. +type Link interface { + Endpoint + + // Settings for this link. + Settings() LinkSettings + + IsSender() bool + IsReceiver() bool + + IsOpen() bool + + // Credit indicates how many messages the receiving end of the link can accept. + // + // A Receiver adds credit automatically when it can accept more messages. + // + // On a Sender credit can be negative, meaning that messages in excess of the + // receiver's credit limit have been buffered locally till credit is available. + Credit() (int, error) + + // Called in event loop on closed event. + closed(err error) +} + +// SndSettleMode defines when the sending end of the link settles message delivery. +// Can set via AtMostOnce or AtLeastOnce. +type SndSettleMode proton.SndSettleMode + +const ( + // Messages are sent unsettled + SndUnsettled = SndSettleMode(proton.SndUnsettled) + // Messages are sent already settled + SndSettled = SndSettleMode(proton.SndSettled) + // Sender can send either unsettled or settled messages. + // NOTE(review): named SendMixed, unlike the Snd-prefixed constants above. + SendMixed = SndSettleMode(proton.SndMixed) +) + +// RcvSettleMode defines when the receiving end of the link settles message delivery. 
+type RcvSettleMode proton.RcvSettleMode + +const ( + // Receiver settles first. + RcvFirst = RcvSettleMode(proton.RcvFirst) + // Receiver waits for sender to settle before settling. + RcvSecond = RcvSettleMode(proton.RcvSecond) +) + +// Implement Link interface, for embedding in sender and receiver. +// +// Link creation: there are two ways to create a link. +// +// Session.Sender() and Session.Receiver() create a "local" link which has +// the session and isSender fields set. The user can set properties like Name, +// Target and Source. makeLocalLink() creates the eLink and sets the properties +// on the eLink. +// +// An "incoming" link is created by the connection, with session, isSender, name, +// source, target all set from the incoming eLink, these properties cannot be +// changed by the user. There may be other properties (e.g. Receiver.SetCapacity()) +// that can be set by the user before Open(). +// +type link struct { + endpoint + + settings LinkSettings + session *session + eLink proton.Link + isOpen bool + isSender bool +} + +func (l *link) Settings() LinkSettings { return l.settings } +func (l *link) IsSender() bool { return l.isSender } +func (l *link) IsReceiver() bool { return !l.isSender } +func (l *link) IsOpen() bool { return l.isOpen } +func (l *link) Session() Session { return l.session } +func (l *link) Connection() Connection { return l.session.Connection() } + +func (l *link) engine() *proton.Engine { return l.session.connection.engine } +func (l *link) handler() *handler { return l.session.connection.handler } + +// makeLocalLink creates a local link associated with a session. 
+// Call in proton goroutine +func makeLocalLink(sn *session, isSender bool, settings LinkSettings) (link, error) { + var l link + l.session = sn + l.settings = settings + l.isSender = isSender + if l.settings.Name == "" { + l.settings.Name = l.session.connection.container.nextLinkName() + } + if l.IsSender() { + l.eLink = l.session.eSession.Sender(l.settings.Name) + } else { + l.eLink = l.session.eSession.Receiver(l.settings.Name) + } + if l.eLink.IsNil() { + return l, l.setError(internal.Errorf("cannot create link %s", l)) + } + l.setSettings() + return l, nil +} + +// Set local end of the link to match LinkSettings. +func (l *link) setSettings() { + l.eLink.Source().SetAddress(l.settings.Source) + l.eLink.Target().SetAddress(l.settings.Target) + l.eLink.SetSndSettleMode(proton.SndSettleMode(l.settings.SndSettleMode)) + l.eLink.SetRcvSettleMode(proton.RcvSettleMode(l.settings.RcvSettleMode)) + l.str = l.eLink.String() +} + +// makeIncomingLink creates a link whose settings are taken from the remote end +// of an incoming proton.Link. +// Call in proton goroutine +func makeIncomingLink(sn *session, eLink proton.Link) link { + l := link{ + session: sn, + isSender: eLink.IsSender(), + eLink: eLink, + settings: LinkSettings{ + Source: eLink.RemoteSource().Address(), + Target: eLink.RemoteTarget().Address(), + Name: eLink.Name(), + SndSettleMode: SndSettleMode(eLink.RemoteSndSettleMode()), + RcvSettleMode: RcvSettleMode(eLink.RemoteRcvSettleMode()), + }, + } + l.setSettings() + return l +} + +// setPanicIfOpen panics if the link is already open. +func (l *link) setPanicIfOpen() { + if l.IsOpen() { + panic(internal.Errorf("link is already open %s", l)) + } +} + +// open a link, local or incoming. 
Call in proton goroutine +func (l *link) open() error { + if l.Error() != nil { + return l.Error() + } + l.eLink.Open() + l.isOpen = true + return nil +} + +// Called in proton goroutine +func (l *link) closed(err error) { + l.setError(err) + if l.eLink.State().RemoteActive() { + if l.Error() != nil { + l.eLink.Condition().SetError(l.Error()) + } + l.eLink.Close() + } + l.setError(Closed) // If no error set, mark as closed. +} + +func (l *link) Credit() (credit int, err error) { + err = l.engine().InjectWait(func() error { + credit = l.eLink.Credit() + return nil + }) + return +} + +func (l *link) Close(err error) { + l.engine().Inject(func() { localClose(l.eLink, err) }) +}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/bd3fb337/proton-c/bindings/go/src/qpid.apache.org/proton/concurrent/messaging_test.go ---------------------------------------------------------------------- diff --git a/proton-c/bindings/go/src/qpid.apache.org/proton/concurrent/messaging_test.go b/proton-c/bindings/go/src/qpid.apache.org/proton/concurrent/messaging_test.go new file mode 100644 index 0000000..aa806d7 --- /dev/null +++ b/proton-c/bindings/go/src/qpid.apache.org/proton/concurrent/messaging_test.go @@ -0,0 +1,205 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ + +package concurrent + +import ( + "fmt" + "net" + "qpid.apache.org/proton/amqp" + "testing" + "time" +) + +// panicIf panics with err if err is not nil. +func panicIf(err error) { + if err != nil { + panic(err) + } +} + +// Start a server, return listening addr and channel for incoming Connection. +// The server accepts a single connection, opens it and sends it on the channel. +func newServer(cont Container) (net.Addr, <-chan Connection) { + listener, err := net.Listen("tcp", "") + panicIf(err) + addr := listener.Addr() + ch := make(chan Connection) + go func() { + conn, err := listener.Accept() + panicIf(err) // Don't hand a failed accept to NewConnection. + c, err := cont.NewConnection(conn) + panicIf(err) + c.Server() + c.Listen() + panicIf(c.Open()) + ch <- c + }() + return addr, ch +} + +// Open a client connection and session, return the session. 
+func newClient(cont Container, addr net.Addr) Session { + conn, err := net.Dial(addr.Network(), addr.String()) + panicIf(err) + c, err := cont.NewConnection(conn) + panicIf(err) + c.Open() + sn, err := c.NewSession() + panicIf(err) + panicIf(sn.Open()) + return sn +} + +// Return client and server ends of the same connection. +func newClientServer() (client Session, server Connection) { + addr, ch := newServer(NewContainer("")) + client = newClient(NewContainer(""), addr) + return client, <-ch +} + +// Close client and server +func closeClientServer(client Session, server Connection) { + client.Connection().Close(nil) + server.Close(nil) +} + +// Send a message one way with a client sender and server receiver, verify ack. +func TestClientSendServerReceive(t *testing.T) { + var err error + client, server := newClientServer() + defer func() { + closeClientServer(client, server) + }() + + timeout := time.Second * 2 + nLinks := 3 + nMessages := 3 + + s := make([]Sender, nLinks) + for i := 0; i < nLinks; i++ { + s[i], err = client.Sender(fmt.Sprintf("foo%d", i)) + if err != nil { + t.Fatal(err) + } + } + + // Server accept session and receivers + ep, err := server.Accept() + ep.Open() // Accept incoming session + r := make([]Receiver, nLinks) + for i := 0; i < nLinks; i++ { // Accept incoming receivers + ep, err = server.Accept() + r[i] = ep.(Receiver) + err = r[i].Open() + if err != nil { + t.Fatal(err) + } + } + + for i := 0; i < nLinks; i++ { + for j := 0; j < nMessages; j++ { + // Client send + m := amqp.NewMessageWith(fmt.Sprintf("foobar%v-%v", i, j)) + sm, err := s[i].Send(m) + if err != nil { + t.Fatal(err) + } + + // Server recieve + rm, err := r[i].Receive() + if err != nil { + t.Fatal(err) + } + if want, got := interface{}(fmt.Sprintf("foobar%v-%v", i, j)), rm.Message.Body(); want != got { + t.Errorf("%#v != %#v", want, got) + } + + // Should not be acknowledged on client yet + if d, err := sm.DispositionTimeout(0); err != Timeout || NoDisposition != d { + 
t.Errorf("want [no-disposition/timeout] got [%v/%v]", d, err) + } + // Server ack + if err := rm.Acknowledge(Rejected); err != nil { + t.Error(err) + } + // Client get ack. + if d, err := sm.DispositionTimeout(timeout); err != nil || Rejected != d { + t.Errorf("want [rejected/nil] got [%v/%v]", d, err) + } + } + } +} + +func TestClientReceiver(t *testing.T) { + client, server := newClientServer() + nMessages := 3 + + done := make(chan struct{}) + go func() { // Server sends + defer close(done) + for { + ep, err := server.Accept() + switch { + case err == Closed: + return + case err == nil: + break + default: + t.Error(err) + return + } + ep.Open() + if s, ok := ep.(Sender); ok { + go func() { + for i := int32(0); i < int32(nMessages); i++ { + sm, err := s.Send(amqp.NewMessageWith(i)) + if err != nil { + t.Error(err) + return + } else { + sm.Disposition() // Sync send. + } + } + s.Close(nil) + }() + } + } + }() + + r, err := client.Receiver("foo") + if err != nil { + t.Fatal(err) + } + for i := int32(0); i < int32(nMessages); i++ { + rm, err := r.Receive() + if err != nil { + if err != Closed { + t.Error(err) + } + break + } + if err := rm.Accept(); err != nil { + t.Error(err) + } + if b, ok := rm.Message.Body().(int32); !ok || b != i { + t.Errorf("want %v, true got %v, %v", i, b, ok) + } + } + server.Close(nil) + <-done + client.Connection().Close(nil) +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/bd3fb337/proton-c/bindings/go/src/qpid.apache.org/proton/concurrent/receiver.go ---------------------------------------------------------------------- diff --git a/proton-c/bindings/go/src/qpid.apache.org/proton/concurrent/receiver.go b/proton-c/bindings/go/src/qpid.apache.org/proton/concurrent/receiver.go new file mode 100644 index 0000000..ad033a6 --- /dev/null +++ b/proton-c/bindings/go/src/qpid.apache.org/proton/concurrent/receiver.go @@ -0,0 +1,242 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. 
See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ + +package concurrent + +import ( + "qpid.apache.org/proton" + "qpid.apache.org/proton/amqp" + "qpid.apache.org/proton/internal" + "time" +) + +type ReceiverSettings struct { + LinkSettings + + // Capacity is the number of messages that the receiver can buffer locally. + // If unset (zero) it will be set to 1. + Capacity int + + // Prefetch==true means the Receiver will automatically issue credit to the + // remote sender to keep its buffer as full as possible, i.e. it will + // "pre-fetch" messages independently of the application calling + // Receive(). This gives good throughput for applications that handle a + // continuous stream of messages. Larger capacity may improve throughput, the + // optimal value depends on the characteristics of your application. + // + // Prefetch==false means the Receiver will issue only issue credit when you + // call Receive(), and will only issue enough credit to satisfy the calls + // actually made. This gives lower throughput but will not fetch any messages + // in advance. It is good for synchronous applications that need to evaluate + // each message before deciding whether to receive another. The + // request-response pattern is a typical example. 
If you make concurrent + // calls to Receive with pre-fetch disabled, you can improve performance by + // setting the capacity close to the expected number of concurrent calls. + // + Prefetch bool +} + +// Receiver is a Link that receives messages. +// +type Receiver interface { + Link + + // SetCapacity sets the Capacity and Prefetch (see ReceiverSettings). It may + // be called before Open() on an accepted receiver, it cannot be changed once + // the receiver is Open(); calling it on an open receiver panics. + SetCapacity(capacity int, prefetch bool) + + // Receive blocks until a message is available or until the Receiver is closed + // and has no more buffered messages. + Receive() (ReceivedMessage, error) + + // ReceiveTimeout is like Receive but gives up after timeout, see Timeout. + ReceiveTimeout(timeout time.Duration) (ReceivedMessage, error) +} + +// Flow control policy for a receiver. +type policy interface { + // Called at the start of Receive() to adjust credit before fetching a message. + Pre(*receiver) + // Called after Receive() has received a message from Buffer() before it returns. + // Non-nil error means no message was received because of an error. 
+ Post(*receiver, error) +} + +// prefetchPolicy keeps the receiver's buffer as full as possible. +type prefetchPolicy struct{} + +// Flow tops up link credit so that buffered messages plus outstanding credit +// equal the buffer capacity. Not called in proton goroutine. +func (p prefetchPolicy) Flow(r *receiver) { + r.engine().Inject(func() { + buffered, credit, max := r.credit() + // Flow() adds to the existing credit, so only issue the shortfall. + if add := max - (buffered + credit); add > 0 { + r.eLink.Flow(add) + } + }) +} +func (p prefetchPolicy) Pre(r *receiver) { p.Flow(r) } +func (p prefetchPolicy) Post(r *receiver, err error) { + if err == nil { + p.Flow(r) + } +} + +// noPrefetchPolicy issues only enough credit to satisfy the Receive() calls in +// progress. waiting counts those calls; it is read and written only by functions +// injected into the proton goroutine, so it needs no lock. +type noPrefetchPolicy struct{ waiting int } + +// adjust adds delta to the waiting count and, if flow is true, issues any credit +// the waiting calls still need. Not called in proton goroutine. +func (p *noPrefetchPolicy) adjust(r *receiver, delta int, flow bool) { + r.engine().Inject(func() { + p.waiting += delta + if !flow { + return + } + buffered, credit, max := r.credit() + add := p.waiting - (buffered + credit) + if room := max - (buffered + credit); add > room { + add = room // Don't overflow the buffer + } + if add > 0 { + r.eLink.Flow(add) + } + }) +} + +// Pointer receivers: with value receivers the waiting count was updated on a +// copy and lost when the method returned. +func (p *noPrefetchPolicy) Flow(r *receiver) { p.adjust(r, 0, true) } +func (p *noPrefetchPolicy) Pre(r *receiver) { p.adjust(r, 1, true) } +func (p *noPrefetchPolicy) Post(r *receiver, err error) { p.adjust(r, -1, err == nil) } + +// Receiver implementation +type receiver struct { + link + // Set by SetCapacity() + capacity int + prefetch bool + + // Set in Open() + buffer chan ReceivedMessage + policy policy +} + +func newReceiver(l link) Receiver { return &receiver{link: l} } + +// SetCapacity records the buffer capacity (minimum 1) and prefetch flag used by +// Open(). Panics if the receiver is already open. +func (r *receiver) SetCapacity(capacity int, prefetch bool) { + r.setPanicIfOpen() + if capacity < 1 { + capacity = 1 + } + r.capacity = capacity + r.prefetch = prefetch +} + +// Accept and open an incoming receiver. 
+func (r *receiver) Open() error { + if r.capacity == 0 { + r.SetCapacity(1, false) + } + if r.prefetch { + r.policy = &prefetchPolicy{} + } else { + r.policy = &noPrefetchPolicy{} + } + err := r.engine().InjectWait(func() error { + err := r.open() + if err == nil { + r.buffer = make(chan ReceivedMessage, r.capacity) + r.handler().addLink(r.eLink, r) + } + return err + }) + return r.setError(err) +} + +// call in proton goroutine +func (r *receiver) credit() (buffered, credit, capacity int) { + return len(r.buffer), r.eLink.Credit(), cap(r.buffer) +} + +func (r *receiver) Capacity() int { return cap(r.buffer) } + +func (r *receiver) Receive() (rm ReceivedMessage, err error) { + return r.ReceiveTimeout(Forever) +} + +func (r *receiver) ReceiveTimeout(timeout time.Duration) (rm ReceivedMessage, err error) { + internal.Assert(r.buffer != nil, "Receiver is not open: %s", r) + r.policy.Pre(r) + defer func() { r.policy.Post(r, err) }() + rmi, ok, timedout := timedReceive(r.buffer, timeout) + switch { + case timedout: + return ReceivedMessage{}, Timeout + case !ok: + return ReceivedMessage{}, r.Error() + default: + return rmi.(ReceivedMessage), nil + } +} + +// Called in proton goroutine +func (r *receiver) handleDelivery(delivery proton.Delivery) { + // FIXME aconway 2015-09-24: how can this happen if we are remote closed? + if r.eLink.State().RemoteClosed() { + localClose(r.eLink, r.eLink.RemoteCondition().Error()) + return + } + if delivery.HasMessage() { + m, err := delivery.Message() + if err != nil { + localClose(r.eLink, err) + return + } + internal.Assert(m != nil) + r.eLink.Advance() + if r.eLink.Credit() < 0 { + localClose(r.eLink, internal.Errorf("received message in excess of credit limit")) + } else { + // We never issue more credit than cap(buffer) so this will not block. 
+ r.buffer <- ReceivedMessage{m, delivery, r} + } + } +} + +func (r *receiver) closed(err error) { + r.closeError(err) + close(r.buffer) +} + +// ReceivedMessage contains an amqp.Message and allows the message to be acknowledged. +type ReceivedMessage struct { + // Message is the received message. + Message amqp.Message + + eDelivery proton.Delivery + receiver Receiver +} + +// Acknowledge a ReceivedMessage with the given disposition code. +func (rm *ReceivedMessage) Acknowledge(disposition Disposition) error { + return rm.receiver.(*receiver).engine().InjectWait(func() error { + // Settle doesn't return an error but if the receiver is broken the settlement won't happen. + rm.eDelivery.SettleAs(uint64(disposition)) + return rm.receiver.Error() + }) +} + +// Accept is short for Acknowledge(Accpeted) +func (rm *ReceivedMessage) Accept() error { return rm.Acknowledge(Accepted) } + +// Reject is short for Acknowledge(Rejected) +func (rm *ReceivedMessage) Reject() error { return rm.Acknowledge(Rejected) } http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/bd3fb337/proton-c/bindings/go/src/qpid.apache.org/proton/concurrent/sender.go ---------------------------------------------------------------------- diff --git a/proton-c/bindings/go/src/qpid.apache.org/proton/concurrent/sender.go b/proton-c/bindings/go/src/qpid.apache.org/proton/concurrent/sender.go new file mode 100644 index 0000000..7a65a24 --- /dev/null +++ b/proton-c/bindings/go/src/qpid.apache.org/proton/concurrent/sender.go @@ -0,0 +1,190 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. 
You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ + +package concurrent + +// #include <proton/disposition.h> +import "C" + +import ( + "qpid.apache.org/proton" + "qpid.apache.org/proton/amqp" + "qpid.apache.org/proton/internal" + "time" +) + +type SenderSettings struct { + LinkSettings +} + +// Sender is a Link that sends messages. +type Sender interface { + Link + + // Send a message asynchronously, return a SentMessage to identify it. + // + // Returns nil if the link is in Unreliable mode and no acknowledgement + // will be received. + // + // See Credit() for note on buffering. + // + // Use SentMessage.Disposition() to wait for acknowledgement. + Send(m amqp.Message) (sm SentMessage, err error) +} + +type sender struct{ link } + +func newSender(l link) Sender { return &sender{l} } + +// Open the Sender, must be called before calling Send(). +func (s *sender) Open() error { + err := s.engine().InjectWait(func() error { + err := s.open() + if err == nil { + s.handler().addLink(s.eLink, s) + } + return err + }) + return s.setError(err) +} + +// Disposition indicates the outcome of a settled message delivery. +type Disposition uint64 + +const ( + // No disposition available, not yet acknowledged or an error occurred + NoDisposition Disposition = 0 + // Message was accepted by the receiver + Accepted = proton.Accepted + // Message was rejected as invalid by the receiver + Rejected = proton.Rejected + // Message was not processed by the receiver but may be processed by some other receiver. + Released = proton.Released +) + +// String human readable name for a Disposition. 
+func (d Disposition) String() string { + switch d { + case NoDisposition: + return "no-disposition" + case Accepted: + return "accepted" + case Rejected: + return "rejected" + case Released: + return "released" + default: + return "unknown" + } +} + +func (s *sender) Send(m amqp.Message) (SentMessage, error) { + internal.Assert(s.IsOpen(), "sender is not open: %s", s) + if err := s.Error(); err != nil { + return nil, err + } + var sm SentMessage + err := s.engine().InjectWait(func() error { + eDelivery, err := s.eLink.Send(m) + if err == nil { + if s.eLink.SndSettleMode() == proton.SndSettled { + eDelivery.Settle() + } else { + sm = newSentMessage(s.session.connection, eDelivery) + s.session.connection.handler.sentMessages[eDelivery] = sm.(*sentMessage) + } + } + return err + }) + return sm, err +} + +func (s *sender) closed(err error) { + s.closeError(err) +} + +// SentMessage represents a previously sent message. It allows you to wait for acknowledgement. +type SentMessage interface { + // Disposition blocks till the message is acknowledged and returns the + // disposition state. NoDisposition means the Connection or the SentMessage + // was closed before the message was acknowledged. + Disposition() (Disposition, error) + + // DispositionTimeout is like Disposition but gives up after timeout, see Timeout. + DispositionTimeout(time.Duration) (Disposition, error) + + // Forget interrupts any call to Disposition on this SentMessage and tells the + // peer we are no longer interested in the disposition of this message. + Forget() + + // Error returns the error that closed the disposition, or nil if there was no error. + // If the disposition closed because the connection closed, it will return Closed. 
+ Error() error +} + +type sentMessage struct { + connection *connection + eDelivery proton.Delivery + done chan struct{} + disposition Disposition + err error +} + +func newSentMessage(c *connection, d proton.Delivery) *sentMessage { + return &sentMessage{c, d, make(chan struct{}), NoDisposition, nil} +} + +func (sm *sentMessage) Disposition() (Disposition, error) { + <-sm.done + return sm.disposition, sm.err +} + +func (sm *sentMessage) DispositionTimeout(timeout time.Duration) (Disposition, error) { + if _, _, timedout := timedReceive(sm.done, timeout); timedout { + return sm.disposition, Timeout + } else { + return sm.disposition, sm.err + } +} + +func (sm *sentMessage) Forget() { + sm.connection.engine.Inject(func() { + sm.eDelivery.Settle() + delete(sm.connection.handler.sentMessages, sm.eDelivery) + }) + sm.finish() +} + +func (sm *sentMessage) settled(err error) { + if sm.eDelivery.Settled() { + sm.disposition = Disposition(sm.eDelivery.Remote().Type()) + } + sm.err = err + sm.finish() +} + +func (sm *sentMessage) finish() { + select { + case <-sm.done: // No-op if already closed + default: + close(sm.done) + } +} + +func (sm *sentMessage) Error() error { return sm.err } http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/bd3fb337/proton-c/bindings/go/src/qpid.apache.org/proton/concurrent/session.go ---------------------------------------------------------------------- diff --git a/proton-c/bindings/go/src/qpid.apache.org/proton/concurrent/session.go b/proton-c/bindings/go/src/qpid.apache.org/proton/concurrent/session.go new file mode 100644 index 0000000..ba09690 --- /dev/null +++ b/proton-c/bindings/go/src/qpid.apache.org/proton/concurrent/session.go @@ -0,0 +1,115 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. 
The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ + +package concurrent + +import ( + "qpid.apache.org/proton" + "qpid.apache.org/proton/internal" +) + +// Session is an AMQP session, it contains Senders and Receivers. +// +type Session interface { + Endpoint + Connection() Connection + + // Sender opens a new sender. v can be a string, which is used as the Target + // address, or a SenderSettings struct containing more detailed settings. + Sender(v interface{}) (Sender, error) + + // Receiver opens a new Receiver. v can be a string, which is used as the + // Source address, or a ReceiverSettings struct containing more detailed + // settings. + Receiver(v interface{}) (Receiver, error) +} + +type session struct { + endpoint + eSession proton.Session + connection *connection +} + +// in proton goroutine +func newSession(c *connection, es proton.Session) *session { + return &session{ + connection: c, + eSession: es, + endpoint: endpoint{str: es.String()}, + } +} + +func (s *session) Connection() Connection { return s.connection } +func (s *session) eEndpoint() proton.Endpoint { return s.eSession } +func (s *session) engine() *proton.Engine { return s.connection.engine } +func (s *session) Open() error { s.engine().Inject(s.eSession.Open); return nil } +func (s *session) Close(err error) { + s.engine().Inject(func() { localClose(s.eSession, err) }) +} + +// Sender creates and opens a link sending to the target. +// The returned Sender is already open, there is no need to call snd.Open(). 
+func (s *session) Sender(v interface{}) (snd Sender, err error) { + var settings LinkSettings + switch v := v.(type) { + case string: + settings.Target = v + case SenderSettings: + settings = v.LinkSettings + default: + internal.Assert(false, "NewSender() want string or SenderSettings, got %T", v) + } + err = s.engine().InjectWait(func() error { + l, err := makeLocalLink(s, true, settings) + snd = newSender(l) + return err + }) + if err == nil { + err = snd.Open() + } + return +} + +// Receiver creates and opens a receiving link. v is a string used as the Source +// address, or a ReceiverSettings. Capacity and Prefetch are applied only if the +// link was created without error. +func (s *session) Receiver(v interface{}) (rcv Receiver, err error) { + var settings ReceiverSettings + switch v := v.(type) { + case string: + settings.Source = v + case ReceiverSettings: + settings = v + default: + internal.Assert(false, "NewReceiver() want string or ReceiverSettings, got %T", v) + } + err = s.engine().InjectWait(func() error { + l, err := makeLocalLink(s, false, settings.LinkSettings) + rcv = newReceiver(l) + return err + }) + if err == nil { // rcv may be nil if InjectWait failed before running the function. + rcv.SetCapacity(settings.Capacity, settings.Prefetch) + err = rcv.Open() + } + return +} + +// Called from handler on closed. +func (s *session) closed(err error) { + s.closeError(err) +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/bd3fb337/proton-c/bindings/go/src/qpid.apache.org/proton/concurrent/time.go ---------------------------------------------------------------------- diff --git a/proton-c/bindings/go/src/qpid.apache.org/proton/concurrent/time.go b/proton-c/bindings/go/src/qpid.apache.org/proton/concurrent/time.go new file mode 100644 index 0000000..e9093d3 --- /dev/null +++ b/proton-c/bindings/go/src/qpid.apache.org/proton/concurrent/time.go @@ -0,0 +1,71 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. 
The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ + +package concurrent + +import ( + "qpid.apache.org/proton/internal" + "reflect" + "time" +) + +// Timeout is the error returned if an operation does not complete on time. +// +// Methods named *Timeout in this package take a time.Duration timeout parameter. +// +// If timeout > 0 and there is no result available before the timeout, they +// return a zero or nil value and Timeout as an error. +// +// If timeout == 0 they will return a result if one is immediately available or +// nil/zero and Timeout as an error if not. +// +// If timeout == Forever the function will return only when there is a result or +// some non-timeout error occurs. +// +var Timeout = internal.Errorf("timeout") + +// Forever can be used as a timeout parameter to indicate wait forever. +const Forever time.Duration = -1 + +// timedReceive receives on channel (which can be a chan of any type), waiting +// up to timeout. +// +// timeout==0 means do a non-blocking receive attempt. timeout < 0 means block +// forever. Other values mean block up to the timeout. 
//
// Returns the received value and the channel's "ok" flag (false if the channel
// was closed) with timedout == false, or (nil, false, true) if nothing could
// be received in time.
//
// A positive timeout is backed by a timer that is stopped as soon as the call
// returns, so a receive that completes early does not leave the timer pending
// for the rest of the timeout.
func timedReceive(channel interface{}, timeout time.Duration) (value interface{}, ok bool, timedout bool) {
	// Case 0 is always the receive on the caller's channel.
	cases := []reflect.SelectCase{
		{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(channel)},
	}
	switch {
	case timeout == 0: // Non-blocking receive
		cases = append(cases, reflect.SelectCase{Dir: reflect.SelectDefault})
	case timeout < 0: // Block forever, nothing to add
	default: // Block up to timeout
		timer := time.NewTimer(timeout)
		defer timer.Stop()
		cases = append(cases,
			reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(timer.C)})
	}
	chosen, recv, recvOk := reflect.Select(cases)
	if chosen != 0 {
		// The default case or the timer fired before the channel was ready.
		return nil, false, true
	}
	return recv.Interface(), recvOk, false
}
+*/ + +/* +Package proton provides a Go binding for the Qpid proton AMQP library. +AMQP is an open standard for inter-operable message exchange, see <http://www.amqp.org/> + +Proton is an event-driven, concurrent-unsafe AMQP protocol library that allows +you to send and receive messages using the standard AMQP messaging protocol. + +For most tasks, consider using package `qpid.apache.org/proton/concurrent`. It +provides a concurrent-safe API that is easier and more natural to use in Go. + +The raw proton API is event-driven and not concurrent-safe. You implement a +MessagingHandler event handler to react to AMQP protocol events. You must ensure +that all events are handled in a single goroutine or that you serialize all +uses of the proton objects associated with a single connection using a lock. +You can use channels to communicate between application goroutines and the +event-handling goroutine, see type Event for more detail. + +Package `qpid.apache.org/proton/concurrent` does all this for you and presents +a simple concurrent-safe interface. + +*/ +package proton + +// #cgo LDFLAGS: -lqpid-proton +import "C" + +// This file is just for the package comment. http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/bd3fb337/proton-c/bindings/go/src/qpid.apache.org/proton/engine.go ---------------------------------------------------------------------- diff --git a/proton-c/bindings/go/src/qpid.apache.org/proton/engine.go b/proton-c/bindings/go/src/qpid.apache.org/proton/engine.go new file mode 100644 index 0000000..3096280 --- /dev/null +++ b/proton-c/bindings/go/src/qpid.apache.org/proton/engine.go @@ -0,0 +1,402 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. 
The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ + +package proton + +// #include <proton/connection.h> +// #include <proton/event.h> +// #include <proton/handlers.h> +// #include <proton/session.h> +// #include <proton/transport.h> +// #include <memory.h> +// #include <stdlib.h> +// +// PN_HANDLE(REMOTE_ADDR) +import "C" + +import ( + "fmt" + "io" + "net" + "qpid.apache.org/proton/internal" + "sync" + "unsafe" +) + +// Injecter allows functions to be "injected" into an event-processing loop. +type Injecter interface { + // Inject a function into an event-loop concurrency context. + // + // f() will be called in the same concurrency context as event handlers, so it + // can safely use values that can be used in that context. If f blocks it + // will block the event loop so be careful calling blocking functions in f. + // + // Returns a non-nil error if the function could not be injected. + Inject(f func()) error + + // InjectWait is like Inject but does not return till f() has completed. + // If f() cannot be injected it returns the error from Inject(), otherwise + // it returns the error from f(). + InjectWait(f func() error) error +} + +// bufferChan manages a pair of ping-pong buffers to pass bytes through a channel. 
+type bufferChan struct { + buffers chan []byte + buf1, buf2 []byte +} + +func newBufferChan(size int) *bufferChan { + return &bufferChan{make(chan []byte), make([]byte, size), make([]byte, size)} +} + +func (b *bufferChan) buffer() []byte { + b.buf1, b.buf2 = b.buf2, b.buf1 // Alternate buffers. + return b.buf1[:cap(b.buf1)] +} + +// Engine reads from a net.Conn, decodes AMQP events and calls the appropriate +// Handler functions in a single event-loop goroutine. Actions taken by Handler +// functions (such as sending messages) are encoded and written to the +// net.Conn. Create a engine with NewEngine() +// +// The proton protocol engine is single threaded (per connection). The Engine runs +// proton in the goroutine that calls Engine.Run() and creates goroutines to feed +// data to/from a net.Conn. You can create multiple Engines to handle multiple +// connections concurrently. +// +// Methods on proton values defined in this package (Sessions, Links etc.) can +// only be called in the goroutine that executes the corresponding +// Engine.Run(). You implement the EventHandler or MessagingHandler interfaces +// and provide those values to NewEngine(). Their HandleEvent method will be +// called in the Engine goroutine, in typical event-driven style. +// +// Handlers can pass values from an event (Connections, Links, Deliveries etc.) to +// other goroutines, store them, or use them as map indexes. Effectively they are +// just pointers. Other goroutines cannot call their methods directly but they can +// can create a function closure to call such methods and pass it to Engine.Inject() +// to have it evaluated in the engine goroutine. +// +// You are responsible for ensuring you don't use an event value after it is +// invalid. The handler methods will tell you when a value is no longer valid. For +// example after a LinkClosed event, that link is no longer valid. 
If you do +// Link.Close() yourself (in a handler or injected function) the link remains valid +// until the corresponing LinkClosed event is received by the handler. +// +// Engine.Close() will take care of cleaning up any remaining values when you are +// done with the Engine. All values associated with a engine become invalid when you +// call Engine.Close() +// +// The qpid.apache.org/proton/concurrent package will do all this for you, so it +// may be a better choice for some applications. +// +type Engine struct { + // Error is set on exit from Run() if there was an error. + err internal.FirstError + inject chan func() + + conn net.Conn + connection Connection + transport Transport + collector *C.pn_collector_t + read *bufferChan // Read buffers channel. + write *bufferChan // Write buffers channel. + handlers []EventHandler // Handlers for proton events. + running chan struct{} // This channel will be closed when the goroutines are done. + closeOnce sync.Once +} + +const bufferSize = 4096 + +// Map of Connection to *Engine +var engines = internal.MakeSafeMap() + +// NewEngine initializes a engine with a connection and handlers. To start it running: +// p := NewEngine(...) +// go run p.Run() +// The goroutine will exit when the engine is closed or disconnected. +// You can check for errors on Engine.Error. +// +func NewEngine(conn net.Conn, handlers ...EventHandler) (*Engine, error) { + // Save the connection ID for Connection.String() + p := &Engine{ + inject: make(chan func()), + conn: conn, + transport: Transport{C.pn_transport()}, + connection: Connection{C.pn_connection()}, + collector: C.pn_collector(), + handlers: handlers, + read: newBufferChan(bufferSize), + write: newBufferChan(bufferSize), + running: make(chan struct{}), + } + if p.transport.IsNil() || p.connection.IsNil() || p.collector == nil { + return nil, internal.Errorf("failed to allocate engine") + } + + // TODO aconway 2015-06-25: connection settings for user, password, container etc. 
+ // before transport.Bind() Set up connection before Engine, allow Engine or Reactor + // to run connection. + + // Unique container-id by default. + p.connection.SetContainer(internal.UUID4().String()) + pnErr := p.transport.Bind(p.connection) + if pnErr != 0 { + return nil, internal.Errorf("cannot setup engine: %s", internal.PnErrorCode(pnErr)) + } + C.pn_connection_collect(p.connection.pn, p.collector) + p.connection.Open() + connectionContexts.Put(p.connection, connectionContext{p, p.String()}) + return p, nil +} + +func (p *Engine) String() string { + return fmt.Sprintf("%s-%s", p.conn.LocalAddr(), p.conn.RemoteAddr()) +} + +func (p *Engine) Id() string { + return fmt.Sprintf("%p", &p) +} + +func (p *Engine) Error() error { + return p.err.Get() +} + +// Inject a function into the Engine's event loop. +// +// f() will be called in the same event-processing goroutine that calls Handler +// methods. f() can safely call methods on values that belong to this engine +// (Sessions, Links etc) +// +// The injected function has no parameters or return values. It is normally a +// closure and can use channels to communicate with the injecting goroutine if +// necessary. +// +// Returns a non-nil error if the engine is closed before the function could be +// injected. +func (p *Engine) Inject(f func()) error { + select { + case p.inject <- f: + return nil + case <-p.running: + return p.Error() + } +} + +// InjectWait is like Inject but does not return till f() has completed or the +// engine is closed, and returns an error value from f() +func (p *Engine) InjectWait(f func() error) error { + done := make(chan error) + defer close(done) + err := p.Inject(func() { done <- f() }) + if err != nil { + return err + } + select { + case <-p.running: + return p.Error() + case err := <-done: + return err + } +} + +// Server puts the Engine in server mode, meaning it will auto-detect security settings on +// the incoming connnection such as use of SASL and SSL. 
+// Must be called before Run() +// +func (p *Engine) Server() { p.transport.SetServer() } + +// Close the engine's connection, returns when the engine has exited. +func (p *Engine) Close(err error) { + p.Inject(func() { + if err != nil { + p.connection.Condition().SetError(err) + } + p.connection.Close() + }) + <-p.running +} + +// Disconnect the engine's connection without and AMQP close, returns when the engine has exited. +func (p *Engine) Disconnect(err error) { + if err != nil { + p.err.Set(err) + } + p.conn.Close() + <-p.running +} + +// Run the engine. Normally called in a goroutine as: go engine.Run() +// Engine.Run() will exit when the engine is closed or disconnected. +// You can check for errors after exit with Engine.Error(). +// +func (p *Engine) Run() { + // Signal errors from the read/write goroutines. Don't block if we don't + // read all the errors, we only care about the first. + error := make(chan error, 2) + wait := sync.WaitGroup{} + wait.Add(2) + + go func() { // Read goroutine + defer wait.Done() + for { + rbuf := p.read.buffer() + n, err := p.conn.Read(rbuf) + if n > 0 { + p.read.buffers <- rbuf[:n] + } else if err != nil { + close(p.read.buffers) + error <- err + return + } + } + }() + + go func() { // Write goroutine + defer wait.Done() + for { + wbuf, ok := <-p.write.buffers + if !ok { + return + } + _, err := p.conn.Write(wbuf) + if err != nil { + error <- err + return + } + } + }() + + wbuf := p.write.buffer()[:0] +loop: + for { + if len(wbuf) == 0 { + p.pop(&wbuf) + } + // Don't set wchan unless there is something to write. 
+ var wchan chan []byte + if len(wbuf) > 0 { + wchan = p.write.buffers + } + + select { + case buf := <-p.read.buffers: // Read a buffer + p.push(buf) + case wchan <- wbuf: // Write a buffer + wbuf = p.write.buffer()[:0] + case f := <-p.inject: // Function injected from another goroutine + f() + case err := <-error: // Network read or write error + p.conn.Close() // Make sure both sides are closed + p.err.Set(err) + p.transport.CloseHead() + p.transport.CloseTail() + } + p.process() + if p.err.Get() != nil { + break loop + } + } + close(p.write.buffers) + p.conn.Close() + wait.Wait() + connectionContexts.Delete(p.connection) + if !p.connection.IsNil() { + p.connection.Free() + } + if !p.transport.IsNil() { + p.transport.Free() + } + if p.collector != nil { + C.pn_collector_free(p.collector) + } + for _, h := range p.handlers { + switch h := h.(type) { + case cHandler: + C.pn_handler_free(h.pn) + } + } + close(p.running) // Signal goroutines have exited and Error is set. +} + +func minInt(a, b int) int { + if a < b { + return a + } else { + return b + } +} + +func (p *Engine) pop(buf *[]byte) { + pending := int(p.transport.Pending()) + switch { + case pending == int(C.PN_EOS): + *buf = (*buf)[:] + return + case pending < 0: + panic(internal.Errorf("%s", internal.PnErrorCode(pending))) + } + size := minInt(pending, cap(*buf)) + *buf = (*buf)[:size] + if size == 0 { + return + } + C.memcpy(unsafe.Pointer(&(*buf)[0]), p.transport.Head(), C.size_t(size)) + internal.Assert(size > 0) + p.transport.Pop(uint(size)) +} + +func (p *Engine) push(buf []byte) { + buf2 := buf + for len(buf2) > 0 { + n := p.transport.Push(buf2) + if n <= 0 { + panic(internal.Errorf("error in transport: %s", internal.PnErrorCode(n))) + } + buf2 = buf2[n:] + } +} + +func (p *Engine) handle(e Event) (more bool) { + for _, h := range p.handlers { + h.HandleEvent(e) + } + if e.Type() == ETransportClosed { + p.err.Set(e.Connection().RemoteCondition().Error()) + 
p.err.Set(e.Connection().Transport().Condition().Error()) + if p.err.Get() == nil { + p.err.Set(io.EOF) + } + return false + } + return true +} + +func (p *Engine) process() (more bool) { + for ce := C.pn_collector_peek(p.collector); ce != nil; ce = C.pn_collector_peek(p.collector) { + e := makeEvent(ce) + if !p.handle(e) { + return false + } + C.pn_collector_pop(p.collector) + } + return true +} + +func (p *Engine) Connection() Connection { return p.connection } http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/bd3fb337/proton-c/bindings/go/src/qpid.apache.org/proton/go/amqp/doc.go ---------------------------------------------------------------------- diff --git a/proton-c/bindings/go/src/qpid.apache.org/proton/go/amqp/doc.go b/proton-c/bindings/go/src/qpid.apache.org/proton/go/amqp/doc.go deleted file mode 100644 index 7c00aa0..0000000 --- a/proton-c/bindings/go/src/qpid.apache.org/proton/go/amqp/doc.go +++ /dev/null @@ -1,40 +0,0 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. -*/ - -/* -Package amqp encodes and decodes AMQP messages and data as Go types. - -It follows the standard 'encoding' libraries pattern. The mapping between AMQP -and Go types is described in the documentation of the Marshal and Unmarshal -functions. 
- -The sub-packages 'event' and 'messaging' provide two alternative ways to write -AMQP clients and servers. 'messaging' is easier for general purpose use. 'event' -gives complete low-level control of the underlying proton C engine. - -AMQP is an open standard for inter-operable message exchange, see <http://www.amqp.org/> -*/ -package amqp - -// #cgo LDFLAGS: -lqpid-proton -import "C" - -// This file is just for the package comment. - -// FIXME aconway 2015-04-28: need to re-organize the package, it's not very intuitive. http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/bd3fb337/proton-c/bindings/go/src/qpid.apache.org/proton/go/amqp/interop ---------------------------------------------------------------------- diff --git a/proton-c/bindings/go/src/qpid.apache.org/proton/go/amqp/interop b/proton-c/bindings/go/src/qpid.apache.org/proton/go/amqp/interop deleted file mode 120000 index cc3641d..0000000 --- a/proton-c/bindings/go/src/qpid.apache.org/proton/go/amqp/interop +++ /dev/null @@ -1 +0,0 @@ -../../../../../../../../tests/interop \ No newline at end of file http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/bd3fb337/proton-c/bindings/go/src/qpid.apache.org/proton/go/amqp/interop_test.go ---------------------------------------------------------------------- diff --git a/proton-c/bindings/go/src/qpid.apache.org/proton/go/amqp/interop_test.go b/proton-c/bindings/go/src/qpid.apache.org/proton/go/amqp/interop_test.go deleted file mode 100644 index 11049f7..0000000 --- a/proton-c/bindings/go/src/qpid.apache.org/proton/go/amqp/interop_test.go +++ /dev/null @@ -1,308 +0,0 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. 
You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. -*/ - -// Test that conversion of Go type to/from AMQP is compatible with other -// bindings. -// -package amqp - -import ( - "bytes" - "fmt" - "io" - "io/ioutil" - "os" - "reflect" - "strings" - "testing" -) - -func assertEqual(want interface{}, got interface{}) { - if !reflect.DeepEqual(want, got) { - panic(fmt.Errorf("%#v != %#v", want, got)) - } -} - -func assertNil(err interface{}) { - if err != nil { - panic(err) - } -} - -func getReader(name string) (r io.Reader) { - r, err := os.Open("interop/" + name + ".amqp") - if err != nil { - panic(fmt.Errorf("Can't open %#v: %v", name, err)) - } - return -} - -func remaining(d *Decoder) string { - remainder, _ := ioutil.ReadAll(io.MultiReader(d.Buffered(), d.reader)) - return string(remainder) -} - -// assertDecode: want is the expected value, gotPtr is a pointer to a -// instance of the same type for Decode. 
-func assertDecode(d *Decoder, want interface{}, gotPtr interface{}) { - - assertNil(d.Decode(gotPtr)) - - got := reflect.ValueOf(gotPtr).Elem().Interface() - assertEqual(want, got) - - // Try round trip encoding - bytes, err := Marshal(want, nil) - assertNil(err) - n, err := Unmarshal(bytes, gotPtr) - assertNil(err) - assertEqual(n, len(bytes)) - got = reflect.ValueOf(gotPtr).Elem().Interface() - assertEqual(want, got) -} - -func TestUnmarshal(t *testing.T) { - bytes, err := ioutil.ReadAll(getReader("strings")) - if err != nil { - t.Error(err) - } - for _, want := range []string{"abc\000defg", "abcdefg", "abcdefg", "", "", ""} { - var got string - n, err := Unmarshal(bytes, &got) - if err != nil { - t.Error(err) - } - if want != got { - t.Errorf("%#v != %#v", want, got) - } - bytes = bytes[n:] - } -} - -func TestPrimitivesExact(t *testing.T) { - d := NewDecoder(getReader("primitives")) - // Decoding into exact types - var b bool - assertDecode(d, true, &b) - assertDecode(d, false, &b) - var u8 uint8 - assertDecode(d, uint8(42), &u8) - var u16 uint16 - assertDecode(d, uint16(42), &u16) - var i16 int16 - assertDecode(d, int16(-42), &i16) - var u32 uint32 - assertDecode(d, uint32(12345), &u32) - var i32 int32 - assertDecode(d, int32(-12345), &i32) - var u64 uint64 - assertDecode(d, uint64(12345), &u64) - var i64 int64 - assertDecode(d, int64(-12345), &i64) - var f32 float32 - assertDecode(d, float32(0.125), &f32) - var f64 float64 - assertDecode(d, float64(0.125), &f64) -} - -func TestPrimitivesCompatible(t *testing.T) { - d := NewDecoder(getReader("primitives")) - // Decoding into compatible types - var b bool - var i int - var u uint - var f float64 - assertDecode(d, true, &b) - assertDecode(d, false, &b) - assertDecode(d, uint(42), &u) - assertDecode(d, uint(42), &u) - assertDecode(d, -42, &i) - assertDecode(d, uint(12345), &u) - assertDecode(d, -12345, &i) - assertDecode(d, uint(12345), &u) - assertDecode(d, -12345, &i) - assertDecode(d, 0.125, &f) - 
assertDecode(d, 0.125, &f) -} - -// assertDecodeValue: want is the expected value, decode into a reflect.Value -func assertDecodeInterface(d *Decoder, want interface{}) { - - var got, got2 interface{} - assertNil(d.Decode(&got)) - - assertEqual(want, got) - - // Try round trip encoding - bytes, err := Marshal(got, nil) - assertNil(err) - n, err := Unmarshal(bytes, &got2) - assertNil(err) - assertEqual(n, len(bytes)) - assertEqual(want, got2) -} - -func TestPrimitivesInterface(t *testing.T) { - d := NewDecoder(getReader("primitives")) - assertDecodeInterface(d, true) - assertDecodeInterface(d, false) - assertDecodeInterface(d, uint8(42)) - assertDecodeInterface(d, uint16(42)) - assertDecodeInterface(d, int16(-42)) - assertDecodeInterface(d, uint32(12345)) - assertDecodeInterface(d, int32(-12345)) - assertDecodeInterface(d, uint64(12345)) - assertDecodeInterface(d, int64(-12345)) - assertDecodeInterface(d, float32(0.125)) - assertDecodeInterface(d, float64(0.125)) -} - -func TestStrings(t *testing.T) { - d := NewDecoder(getReader("strings")) - // Test decoding as plain Go strings - for _, want := range []string{"abc\000defg", "abcdefg", "abcdefg", "", "", ""} { - var got string - assertDecode(d, want, &got) - } - remains := remaining(d) - if remains != "" { - t.Errorf("leftover: %s", remains) - } - - // Test decoding as specific string types - d = NewDecoder(getReader("strings")) - var bytes []byte - var str, sym string - assertDecode(d, []byte("abc\000defg"), &bytes) - assertDecode(d, "abcdefg", &str) - assertDecode(d, "abcdefg", &sym) - assertDecode(d, make([]byte, 0), &bytes) - assertDecode(d, "", &str) - assertDecode(d, "", &sym) - remains = remaining(d) - if remains != "" { - t.Fatalf("leftover: %s", remains) - } - - // Test some error handling - d = NewDecoder(getReader("strings")) - var s string - err := d.Decode(s) - if err == nil { - t.Fatal("Expected error") - } - if !strings.Contains(err.Error(), "not a pointer") { - t.Error(err) - } - var i int - err = 
d.Decode(&i) - if !strings.Contains(err.Error(), "cannot unmarshal") { - t.Error(err) - } - _, err = Unmarshal([]byte{}, nil) - if !strings.Contains(err.Error(), "not enough data") { - t.Error(err) - } - _, err = Unmarshal([]byte("foobar"), nil) - if !strings.Contains(err.Error(), "invalid-argument") { - t.Error(err) - } -} - -func TestEncodeDecode(t *testing.T) { - type data struct { - s string - i int - u8 uint8 - b bool - f float32 - v interface{} - } - - in := data{"foo", 42, 9, true, 1.234, "thing"} - - buf := bytes.Buffer{} - e := NewEncoder(&buf) - assertNil(e.Encode(in.s)) - assertNil(e.Encode(in.i)) - assertNil(e.Encode(in.u8)) - assertNil(e.Encode(in.b)) - assertNil(e.Encode(in.f)) - assertNil(e.Encode(in.v)) - - var out data - d := NewDecoder(&buf) - assertNil(d.Decode(&out.s)) - assertNil(d.Decode(&out.i)) - assertNil(d.Decode(&out.u8)) - assertNil(d.Decode(&out.b)) - assertNil(d.Decode(&out.f)) - assertNil(d.Decode(&out.v)) - - assertEqual(in, out) -} - -func TestMap(t *testing.T) { - d := NewDecoder(getReader("maps")) - - // Generic map - var m Map - assertDecode(d, Map{"one": int32(1), "two": int32(2), "three": int32(3)}, &m) - - // Interface as map - var i interface{} - assertDecode(d, Map{int32(1): "one", int32(2): "two", int32(3): "three"}, &i) - - d = NewDecoder(getReader("maps")) - // Specific typed map - var m2 map[string]int - assertDecode(d, map[string]int{"one": 1, "two": 2, "three": 3}, &m2) - - // Round trip a nested map - m = Map{int64(1): "one", "two": int32(2), true: Map{uint8(1): true, uint8(2): false}} - bytes, err := Marshal(m, nil) - assertNil(err) - _, err = Unmarshal(bytes, &i) - assertNil(err) - assertEqual(m, i) -} - -func TestList(t *testing.T) { - d := NewDecoder(getReader("lists")) - var l List - assertDecode(d, List{int32(32), "foo", true}, &l) - assertDecode(d, List{}, &l) -} - -func FIXMETestMessage(t *testing.T) { - // FIXME aconway 2015-04-09: integrate Message encoding under marshal/unmarshal API. 
- bytes, err := ioutil.ReadAll(getReader("message")) - assertNil(err) - m, err := DecodeMessage(bytes) - assertNil(err) - fmt.Printf("%+v\n", m) - assertEqual(m.Body(), "hello") - - bytes2 := make([]byte, len(bytes)) - bytes2, err = m.Encode(bytes2) - assertNil(err) - assertEqual(bytes, bytes2) -} - -// FIXME aconway 2015-03-13: finish the full interop test http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/bd3fb337/proton-c/bindings/go/src/qpid.apache.org/proton/go/amqp/marshal.go ---------------------------------------------------------------------- diff --git a/proton-c/bindings/go/src/qpid.apache.org/proton/go/amqp/marshal.go b/proton-c/bindings/go/src/qpid.apache.org/proton/go/amqp/marshal.go deleted file mode 100644 index e5c2945..0000000 --- a/proton-c/bindings/go/src/qpid.apache.org/proton/go/amqp/marshal.go +++ /dev/null @@ -1,238 +0,0 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. 
-*/ - -package amqp - -// #include <proton/codec.h> -import "C" - -import ( - "io" - "qpid.apache.org/proton/go/internal" - "reflect" - "unsafe" -) - -func dataError(prefix string, data *C.pn_data_t) error { - err := internal.PnError(unsafe.Pointer(C.pn_data_error(data))) - if err != nil { - err = internal.Errorf("%s: %s", prefix, err.(internal.Error)) - } - return err -} - -/* -Marshal encodes a Go value as AMQP data in buffer. -If buffer is nil, or is not large enough, a new buffer is created. - -Returns the buffer used for encoding with len() adjusted to the actual size of data. - -Go types are encoded as follows - - +-------------------------------------+--------------------------------------------+ - |Go type |AMQP type | - +-------------------------------------+--------------------------------------------+ - |bool |bool | - +-------------------------------------+--------------------------------------------+ - |int8, int16, int32, int64 (int) |byte, short, int, long (int or long) | - +-------------------------------------+--------------------------------------------+ - |uint8, uint16, uint32, uint64 (uint) |ubyte, ushort, uint, ulong (uint or ulong) | - +-------------------------------------+--------------------------------------------+ - |float32, float64 |float, double. 
| - +-------------------------------------+--------------------------------------------+ - |string |string | - +-------------------------------------+--------------------------------------------+ - |[]byte, Binary |binary | - +-------------------------------------+--------------------------------------------+ - |Symbol |symbol | - +-------------------------------------+--------------------------------------------+ - |interface{} |the contained type | - +-------------------------------------+--------------------------------------------+ - |nil |null | - +-------------------------------------+--------------------------------------------+ - |map[K]T |map with K and T converted as above | - +-------------------------------------+--------------------------------------------+ - |Map |map, may have mixed types for keys, values | - +-------------------------------------+--------------------------------------------+ - |[]T |list with T converted as above | - +-------------------------------------+--------------------------------------------+ - |List |list, may have mixed types values | - +-------------------------------------+--------------------------------------------+ - -TODO Go types: array, slice, struct - -Go types that cannot be marshaled: complex64/128, uintptr, function, interface, channel -*/ -func Marshal(v interface{}, buffer []byte) (outbuf []byte, err error) { - defer internal.DoRecover(&err) - data := C.pn_data(0) - defer C.pn_data_free(data) - put(data, v) - encode := func(buf []byte) ([]byte, error) { - n := int(C.pn_data_encode(data, cPtr(buf), cLen(buf))) - switch { - case n == int(C.PN_OVERFLOW): - return buf, overflow - case n < 0: - return buf, dataError("marshal error", data) - default: - return buf[:n], nil - } - } - return encodeGrow(buffer, encode) -} - -const minEncode = 256 - -// overflow is returned when an encoding function can't fit data in the buffer. 
-var overflow = internal.Errorf("buffer too small") - -// encodeFn encodes into buffer[0:len(buffer)]. -// Returns buffer with length adjusted for data encoded. -// If buffer too small, returns overflow as error. -type encodeFn func(buffer []byte) ([]byte, error) - -// encodeGrow calls encode() into buffer, if it returns overflow grows the buffer. -// Returns the final buffer. -func encodeGrow(buffer []byte, encode encodeFn) ([]byte, error) { - if buffer == nil || len(buffer) == 0 { - buffer = make([]byte, minEncode) - } - var err error - for buffer, err = encode(buffer); err == overflow; buffer, err = encode(buffer) { - buffer = make([]byte, 2*len(buffer)) - } - return buffer, err -} - -func put(data *C.pn_data_t, v interface{}) { - switch v := v.(type) { - case nil: - C.pn_data_put_null(data) - case bool: - C.pn_data_put_bool(data, C.bool(v)) - case int8: - C.pn_data_put_byte(data, C.int8_t(v)) - case int16: - C.pn_data_put_short(data, C.int16_t(v)) - case int32: - C.pn_data_put_int(data, C.int32_t(v)) - case int64: - C.pn_data_put_long(data, C.int64_t(v)) - case int: - if unsafe.Sizeof(0) == 8 { - C.pn_data_put_long(data, C.int64_t(v)) - } else { - C.pn_data_put_int(data, C.int32_t(v)) - } - case uint8: - C.pn_data_put_ubyte(data, C.uint8_t(v)) - case uint16: - C.pn_data_put_ushort(data, C.uint16_t(v)) - case uint32: - C.pn_data_put_uint(data, C.uint32_t(v)) - case uint64: - C.pn_data_put_ulong(data, C.uint64_t(v)) - case uint: - if unsafe.Sizeof(0) == 8 { - C.pn_data_put_ulong(data, C.uint64_t(v)) - } else { - C.pn_data_put_uint(data, C.uint32_t(v)) - } - case float32: - C.pn_data_put_float(data, C.float(v)) - case float64: - C.pn_data_put_double(data, C.double(v)) - case string: - C.pn_data_put_string(data, pnBytes([]byte(v))) - case []byte: - C.pn_data_put_binary(data, pnBytes(v)) - case Binary: - C.pn_data_put_binary(data, pnBytes([]byte(v))) - case Symbol: - C.pn_data_put_symbol(data, pnBytes([]byte(v))) - case Map: // Special map type - 
C.pn_data_put_map(data) - C.pn_data_enter(data) - for key, val := range v { - put(data, key) - put(data, val) - } - C.pn_data_exit(data) - default: - switch reflect.TypeOf(v).Kind() { - case reflect.Map: - putMap(data, v) - case reflect.Slice: - putList(data, v) - default: - panic(internal.Errorf("cannot marshal %s to AMQP", reflect.TypeOf(v))) - } - } - err := dataError("marshal", data) - if err != nil { - panic(err) - } - return -} - -func putMap(data *C.pn_data_t, v interface{}) { - mapValue := reflect.ValueOf(v) - C.pn_data_put_map(data) - C.pn_data_enter(data) - for _, key := range mapValue.MapKeys() { - put(data, key.Interface()) - put(data, mapValue.MapIndex(key).Interface()) - } - C.pn_data_exit(data) -} - -func putList(data *C.pn_data_t, v interface{}) { - listValue := reflect.ValueOf(v) - C.pn_data_put_list(data) - C.pn_data_enter(data) - for i := 0; i < listValue.Len(); i++ { - put(data, listValue.Index(i).Interface()) - } - C.pn_data_exit(data) -} - -// Encoder encodes AMQP values to an io.Writer -type Encoder struct { - writer io.Writer - buffer []byte -} - -// New encoder returns a new encoder that writes to w. 
-func NewEncoder(w io.Writer) *Encoder { - return &Encoder{w, make([]byte, minEncode)} -} - -func (e *Encoder) Encode(v interface{}) (err error) { - e.buffer, err = Marshal(v, e.buffer) - if err == nil { - e.writer.Write(e.buffer) - } - return err -} - -func replace(data *C.pn_data_t, v interface{}) { - C.pn_data_clear(data) - put(data, v) -} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/bd3fb337/proton-c/bindings/go/src/qpid.apache.org/proton/go/amqp/message.go ---------------------------------------------------------------------- diff --git a/proton-c/bindings/go/src/qpid.apache.org/proton/go/amqp/message.go b/proton-c/bindings/go/src/qpid.apache.org/proton/go/amqp/message.go deleted file mode 100644 index 87093f5..0000000 --- a/proton-c/bindings/go/src/qpid.apache.org/proton/go/amqp/message.go +++ /dev/null @@ -1,342 +0,0 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. -*/ - -package amqp - -// #include <proton/types.h> -// #include <proton/message.h> -// #include <proton/codec.h> -import "C" - -import ( - "qpid.apache.org/proton/go/internal" - "time" - "unsafe" -) - -// FIXME aconway 2015-04-28: Do we need the interface or can we just export the struct? - -// Message is the interface to an AMQP message. 
-// Instances of this interface contain a pointer to the underlying struct. -type Message interface { - /** - * Inferred indicates how the message content - * is encoded into AMQP sections. If inferred is true then binary and - * list values in the body of the message will be encoded as AMQP DATA - * and AMQP SEQUENCE sections, respectively. If inferred is false, - * then all values in the body of the message will be encoded as AMQP - * VALUE sections regardless of their type. - */ - Inferred() bool - SetInferred(bool) - - /** - * Durable indicates that any parties taking responsibility - * for the message must durably store the content. - */ - Durable() bool - SetDurable(bool) - - /** - * Priority impacts ordering guarantees. Within a - * given ordered context, higher priority messages may jump ahead of - * lower priority messages. - */ - Priority() uint8 - SetPriority(uint8) - - /** - * TTL or Time To Live, a message it may be dropped after this duration - */ - TTL() time.Duration - SetTTL(time.Duration) - - /** - * FirstAcquirer indicates - * that the recipient of the message is the first recipient to acquire - * the message, i.e. there have been no failed delivery attempts to - * other acquirers. Note that this does not mean the message has not - * been delivered to, but not acquired, by other recipients. - */ - FirstAcquirer() bool - SetFirstAcquirer(bool) - - /** - * DeliveryCount tracks how many attempts have been made to - * delivery a message. - */ - DeliveryCount() uint32 - SetDeliveryCount(uint32) - - /** - * MessageId provides a unique identifier for a message. - * it can be an a string, an unsigned long, a uuid or a - * binary value. - */ - MessageId() interface{} - SetMessageId(interface{}) - - UserId() string - SetUserId(string) - - Address() string - SetAddress(string) - - Subject() string - SetSubject(string) - - ReplyTo() string - SetReplyTo(string) - - /** - * CorrelationId is set on correlated request and response messages. 
It can be an a string, an unsigned long, a uuid or a - * binary value. - */ - CorrelationId() interface{} - SetCorrelationId(interface{}) - - ContentType() string - SetContentType(string) - - ContentEncoding() string - SetContentEncoding(string) - - // ExpiryTime indicates an absoulte time when the message may be dropped. - // A Zero time (i.e. t.isZero() == true) indicates a message never expires. - ExpiryTime() time.Time - SetExpiryTime(time.Time) - - CreationTime() time.Time - SetCreationTime(time.Time) - - GroupId() string - SetGroupId(string) - - GroupSequence() int32 - SetGroupSequence(int32) - - ReplyToGroupId() string - SetReplyToGroupId(string) - - /** - * Instructions can be used to access or modify AMQP delivery instructions. - */ - Instructions() *map[string]interface{} - - /** - * Annotations can be used to access or modify AMQP annotations. - */ - Annotations() *map[string]interface{} - - /** - * Properties can be used to access or modify the application properties of a message. - */ - Properties() *map[string]interface{} - - /** - * Body of the message can be any AMQP encodable type. - */ - Body() interface{} - SetBody(interface{}) - - // Encode encodes the message as AMQP data. If buffer is non-nil and is large enough - // the message is encoded into it, otherwise a new buffer is created. - // Returns the buffer containing the message. - Encode(buffer []byte) ([]byte, error) -} - -// NewMessage creates a new message instance. The returned interface contains a pointer. -func NewMessage() Message { - pn := C.pn_message() // Pick up default setting from C message. - defer C.pn_message_free(pn) - return goMessage(pn) -} - -// Message implementation copies all message data into Go space so it can be proprely -// memory managed. 
-// -type message struct { - inferred, durable, firstAcquirer bool - priority uint8 - ttl time.Duration - deliveryCount uint32 - messageId interface{} - userId, address, subject, replyTo string - contentType, contentEncoding string - groupId, replyToGroupId string - creationTime, expiryTime time.Time - groupSequence int32 - correlationId interface{} - instructions, annotations, properties map[string]interface{} - body interface{} -} - -func (m *message) Inferred() bool { return m.inferred } -func (m *message) SetInferred(b bool) { m.inferred = b } -func (m *message) Durable() bool { return m.durable } -func (m *message) SetDurable(b bool) { m.durable = b } -func (m *message) Priority() uint8 { return m.priority } -func (m *message) SetPriority(b uint8) { m.priority = b } -func (m *message) TTL() time.Duration { return m.ttl } -func (m *message) SetTTL(d time.Duration) { m.ttl = d } -func (m *message) FirstAcquirer() bool { return m.firstAcquirer } -func (m *message) SetFirstAcquirer(b bool) { m.firstAcquirer = b } -func (m *message) DeliveryCount() uint32 { return m.deliveryCount } -func (m *message) SetDeliveryCount(c uint32) { m.deliveryCount = c } -func (m *message) MessageId() interface{} { return m.messageId } -func (m *message) SetMessageId(id interface{}) { m.messageId = id } -func (m *message) UserId() string { return m.userId } -func (m *message) SetUserId(s string) { m.userId = s } -func (m *message) Address() string { return m.address } -func (m *message) SetAddress(s string) { m.address = s } -func (m *message) Subject() string { return m.subject } -func (m *message) SetSubject(s string) { m.subject = s } -func (m *message) ReplyTo() string { return m.replyTo } -func (m *message) SetReplyTo(s string) { m.replyTo = s } -func (m *message) CorrelationId() interface{} { return m.correlationId } -func (m *message) SetCorrelationId(c interface{}) { m.correlationId = c } -func (m *message) ContentType() string { return m.contentType } -func (m *message) 
SetContentType(s string) { m.contentType = s } -func (m *message) ContentEncoding() string { return m.contentEncoding } -func (m *message) SetContentEncoding(s string) { m.contentEncoding = s } -func (m *message) ExpiryTime() time.Time { return m.expiryTime } -func (m *message) SetExpiryTime(t time.Time) { m.expiryTime = t } -func (m *message) CreationTime() time.Time { return m.creationTime } -func (m *message) SetCreationTime(t time.Time) { m.creationTime = t } -func (m *message) GroupId() string { return m.groupId } -func (m *message) SetGroupId(s string) { m.groupId = s } -func (m *message) GroupSequence() int32 { return m.groupSequence } -func (m *message) SetGroupSequence(s int32) { m.groupSequence = s } -func (m *message) ReplyToGroupId() string { return m.replyToGroupId } -func (m *message) SetReplyToGroupId(s string) { m.replyToGroupId = s } -func (m *message) Instructions() *map[string]interface{} { return &m.instructions } -func (m *message) Annotations() *map[string]interface{} { return &m.annotations } -func (m *message) Properties() *map[string]interface{} { return &m.properties } -func (m *message) Body() interface{} { return m.body } -func (m *message) SetBody(b interface{}) { m.body = b } - -// rewindGet rewinds and then gets the value from a data object. 
-func rewindGet(data *C.pn_data_t, v interface{}) { - if data != nil && C.pn_data_size(data) > 0 { - C.pn_data_rewind(data) - C.pn_data_next(data) - get(data, v) - } -} - -// goMessage populates a Go message from a pn_message_t -func goMessage(pn *C.pn_message_t) *message { - m := &message{ - inferred: bool(C.pn_message_is_inferred(pn)), - durable: bool(C.pn_message_is_durable(pn)), - priority: uint8(C.pn_message_get_priority(pn)), - ttl: time.Duration(C.pn_message_get_ttl(pn)) * time.Millisecond, - firstAcquirer: bool(C.pn_message_is_first_acquirer(pn)), - deliveryCount: uint32(C.pn_message_get_delivery_count(pn)), - userId: goString(C.pn_message_get_user_id(pn)), - address: C.GoString(C.pn_message_get_address(pn)), - subject: C.GoString(C.pn_message_get_subject(pn)), - replyTo: C.GoString(C.pn_message_get_reply_to(pn)), - contentType: C.GoString(C.pn_message_get_content_type(pn)), - contentEncoding: C.GoString(C.pn_message_get_content_encoding(pn)), - expiryTime: time.Unix(0, int64(time.Millisecond*time.Duration(C.pn_message_get_expiry_time(pn)))), - creationTime: time.Unix(0, int64(time.Millisecond)*int64(C.pn_message_get_creation_time(pn))), - groupId: C.GoString(C.pn_message_get_group_id(pn)), - groupSequence: int32(C.pn_message_get_group_sequence(pn)), - replyToGroupId: C.GoString(C.pn_message_get_reply_to_group_id(pn)), - messageId: nil, - correlationId: nil, - instructions: make(map[string]interface{}), - annotations: make(map[string]interface{}), - properties: make(map[string]interface{}), - } - rewindGet(C.pn_message_id(pn), &m.messageId) - rewindGet(C.pn_message_correlation_id(pn), &m.correlationId) - rewindGet(C.pn_message_instructions(pn), &m.instructions) - rewindGet(C.pn_message_annotations(pn), &m.annotations) - rewindGet(C.pn_message_properties(pn), &m.properties) - rewindGet(C.pn_message_body(pn), &m.body) - return m -} - -// pnMessage populates a pn_message_t from a Go message. 
-func (m *message) pnMessage() *C.pn_message_t { - pn := C.pn_message() - C.pn_message_set_inferred(pn, C.bool(m.Inferred())) - C.pn_message_set_durable(pn, C.bool(m.Durable())) - C.pn_message_set_priority(pn, C.uint8_t(m.priority)) - C.pn_message_set_ttl(pn, C.pn_millis_t(m.TTL()/time.Millisecond)) - C.pn_message_set_first_acquirer(pn, C.bool(m.FirstAcquirer())) - C.pn_message_set_delivery_count(pn, C.uint32_t(m.deliveryCount)) - replace(C.pn_message_id(pn), m.MessageId()) - C.pn_message_set_user_id(pn, pnBytes([]byte(m.UserId()))) - C.pn_message_set_address(pn, C.CString(m.Address())) - C.pn_message_set_subject(pn, C.CString(m.Subject())) - C.pn_message_set_reply_to(pn, C.CString(m.ReplyTo())) - replace(C.pn_message_correlation_id(pn), m.CorrelationId()) - C.pn_message_set_content_type(pn, C.CString(m.ContentType())) - C.pn_message_set_content_encoding(pn, C.CString(m.ContentEncoding())) - C.pn_message_set_expiry_time(pn, pnTime(m.ExpiryTime())) - C.pn_message_set_creation_time(pn, pnTime(m.CreationTime())) - C.pn_message_set_group_id(pn, C.CString(m.GroupId())) - C.pn_message_set_group_sequence(pn, C.pn_sequence_t(m.GroupSequence())) - C.pn_message_set_reply_to_group_id(pn, C.CString(m.ReplyToGroupId())) - replace(C.pn_message_instructions(pn), *m.Instructions()) - replace(C.pn_message_annotations(pn), *m.Annotations()) - replace(C.pn_message_properties(pn), *m.Properties()) - replace(C.pn_message_body(pn), m.Body()) - return pn -} - -// FIXME aconway 2015-04-08: Move message encode/decode under Marshal/Unmarshal interfaces. 
- -// DecodeMessage decodes bytes as a message -func DecodeMessage(data []byte) (Message, error) { - pnMsg := C.pn_message() - defer C.pn_message_free(pnMsg) - if len(data) == 0 { - return nil, internal.Errorf("empty buffer for decode") - } - if C.pn_message_decode(pnMsg, cPtr(data), cLen(data)) < 0 { - return nil, internal.Errorf("decoding message: %s", - internal.PnError(unsafe.Pointer(C.pn_message_error(pnMsg)))) - } - return goMessage(pnMsg), nil -} - -// Encode the message into bufffer. -// If buffer is nil or len(buffer) is not sufficient to encode the message a larger -// buffer will be returned. -func (m *message) Encode(buffer []byte) ([]byte, error) { - pn := m.pnMessage() - defer C.pn_message_free(pn) - encode := func(buf []byte) ([]byte, error) { - len := cLen(buf) - result := C.pn_message_encode(pn, cPtr(buf), &len) - switch { - case result == C.PN_OVERFLOW: - return buf, overflow - case result < 0: - return buf, internal.Errorf("cannot encode message: %s", internal.PnErrorCode(result)) - default: - return buf[:len], nil - } - } - return encodeGrow(buffer, encode) -} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/bd3fb337/proton-c/bindings/go/src/qpid.apache.org/proton/go/amqp/message_test.go ---------------------------------------------------------------------- diff --git a/proton-c/bindings/go/src/qpid.apache.org/proton/go/amqp/message_test.go b/proton-c/bindings/go/src/qpid.apache.org/proton/go/amqp/message_test.go deleted file mode 100644 index 46e26de..0000000 --- a/proton-c/bindings/go/src/qpid.apache.org/proton/go/amqp/message_test.go +++ /dev/null @@ -1,90 +0,0 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. 
You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. -*/ - -package amqp - -import ( - "reflect" - "testing" - "time" -) - -func roundTrip(t *testing.T, m Message) { - buffer, err := m.Encode(nil) - if err != nil { - t.Fatalf("Encode failed: %v", err) - } - m2, err := DecodeMessage(buffer) - if err != nil { - t.Fatalf("Decode failed: %v", err) - } - if !reflect.DeepEqual(m, m2) { - t.Errorf("Message mismatch got\n%#v\nwant\n%#v", m, m2) - } -} - -func TestDefaultMessageRoundTrip(t *testing.T) { - m := NewMessage() - // Check defaults - assertEqual(m.Inferred(), false) - assertEqual(m.Durable(), false) - assertEqual(m.Priority(), uint8(4)) - assertEqual(m.TTL(), time.Duration(0)) - assertEqual(m.UserId(), "") - assertEqual(m.Address(), "") - assertEqual(m.Subject(), "") - assertEqual(m.ReplyTo(), "") - assertEqual(m.ContentType(), "") - assertEqual(m.ContentEncoding(), "") - assertEqual(m.GroupId(), "") - assertEqual(m.GroupSequence(), int32(0)) - assertEqual(m.ReplyToGroupId(), "") - assertEqual(m.MessageId(), nil) - assertEqual(m.CorrelationId(), nil) - assertEqual(*m.Instructions(), map[string]interface{}{}) - assertEqual(*m.Annotations(), map[string]interface{}{}) - assertEqual(*m.Properties(), map[string]interface{}{}) - assertEqual(m.Body(), nil) - - roundTrip(t, m) -} - -func TestMessageRoundTrip(t *testing.T) { - m := NewMessage() - m.SetInferred(false) - m.SetDurable(true) - m.SetPriority(42) - m.SetTTL(0) - m.SetUserId("user") - m.SetAddress("address") - m.SetSubject("subject") - m.SetReplyTo("replyto") - m.SetContentType("content") - m.SetContentEncoding("encoding") - m.SetGroupId("group") - m.SetGroupSequence(42) 
- m.SetReplyToGroupId("replytogroup") - m.SetMessageId("id") - m.SetCorrelationId("correlation") - *m.Instructions() = map[string]interface{}{"instructions": "foo"} - *m.Annotations() = map[string]interface{}{"annotations": "foo"} - *m.Properties() = map[string]interface{}{"int": int32(32), "bool": true, "string": "foo"} - m.SetBody("hello") - roundTrip(t, m) -} --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
