http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/7bc3e0e5/proton-c/bindings/go/messaging/messaging.go ---------------------------------------------------------------------- diff --git a/proton-c/bindings/go/messaging/messaging.go b/proton-c/bindings/go/messaging/messaging.go new file mode 100644 index 0000000..e653de2 --- /dev/null +++ b/proton-c/bindings/go/messaging/messaging.go @@ -0,0 +1,250 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ + +package messaging + +// #include <proton/disposition.h> +import "C" + +import ( + "net" + "qpid.apache.org/proton/go/amqp" + "qpid.apache.org/proton/go/event" +) + +// Connection is a connection to a remote AMQP endpoint. +// +// You can set exported fields to configure the connection before calling +// Connection.Open() +// +type Connection struct { + // Server = true means a the connection will do automatic protocol detection. + Server bool + + // FIXME aconway 2015-04-17: Other parameters to set up SSL, SASL etc. + + handler *handler + pump *event.Pump + session Session +} + +// Make an AMQP connection over a net.Conn connection. +// +// Use Connection.Close() to close the Connection, this will also close conn. +// Using conn.Close() directly will cause an abrupt disconnect rather than an +// orderly AMQP close. +// +func (c *Connection) Open(conn net.Conn) (err error) { + c.handler = newHandler(c) + c.pump, err = event.NewPump(conn, + event.NewMessagingDelegator(c.handler), + ) + if err != nil { + return err + } + if c.Server { + c.pump.Server() + } + go c.pump.Run() + return nil +} + +// Connect opens a default client connection. It is a shortcut for +// c := &Connection +// c.Open() +// +func Connect(conn net.Conn) (*Connection, error) { + c := &Connection{} + err := c.Open(conn) + return c, err +} + +// Close the connection. +// +// Connections must be closed to clean up resources and stop associated goroutines. +func (c *Connection) Close() error { return c.pump.Close() } + +// DefaultSession returns a default session for the connection. +// +// It is created on the first call to DefaultSession() and returned from all subsequent calls. +// Use Session() for more control over creating sessions. +// +func (c *Connection) DefaultSession() (s Session, err error) { + if c.session.e.IsNil() { + c.session, err = c.Session() + } + return c.session, err +} + +type sessionErr struct { + s event.Session + err error +} + +// Session creates a new session. +func (c *Connection) Session() (Session, error) { + connection := c.pump.Connection() + result := make(chan sessionErr) + c.pump.Inject <- func() { + s, err := connection.Session() + if err == nil { + s.Open() + } + result <- sessionErr{s, err} + } + se := <-result + return Session{se.s, c.pump}, se.err +} + +// FIXME aconway 2015-04-27: set sender name, options etc. + +// Sender creates a Sender that will send messages to the address addr. +func (c *Connection) Sender(addr string) (s Sender, err error) { + session, err := c.DefaultSession() + if err != nil { + return Sender{}, err + } + result := make(chan Sender) + c.pump.Inject <- func() { + link := session.e.Sender(linkNames.Next()) + if link.IsNil() { + err = session.e.Error() + } else { + link.Target().SetAddress(addr) + // FIXME aconway 2015-04-27: link options? + link.Open() + } + result <- Sender{Link{c, link}} + } + return <-result, err +} + +// Receiver returns a receiver that will receive messages sent to address addr. +func (c *Connection) Receiver(addr string) (r Receiver, err error) { + // FIXME aconway 2015-04-29: move code to session, in link.go? + session, err := c.DefaultSession() + if err != nil { + return Receiver{}, err + } + result := make(chan Receiver) + c.pump.Inject <- func() { + link := session.e.Receiver(linkNames.Next()) + if link.IsNil() { + err = session.e.Error() + } else { + link.Source().SetAddress(addr) + // FIXME aconway 2015-04-27: link options? + link.Open() + } + // FIXME aconway 2015-04-29: hack to avoid blocking, need proper buffering linked to flow control + rchan := make(chan amqp.Message, 1000) + c.handler.receivers[link] = rchan + result <- Receiver{Link{c, link}, rchan} + } + return <-result, err +} + +// FIXME aconway 2015-04-29: counter per session. +var linkNames amqp.UidCounter + +// Session is an AMQP session, it contains Senders and Receivers. +// Every Connection has a DefaultSession, you can create additional sessions +// with Connection.Session() +type Session struct { + e event.Session + pump *event.Pump +} + +// FIXME aconway 2015-05-05: REWORK Sender/receiver/session. + +// Disposition indicates the outcome of a settled message delivery. +type Disposition uint64 + +const ( + // Message was accepted by the receiver + Accepted Disposition = C.PN_ACCEPTED + // Message was rejected as invalid by the receiver + Rejected = C.PN_REJECTED + // Message was not processed by the receiver but may be processed by some other receiver. + Released = C.PN_RELEASED +) + +// String human readable name for a Disposition. +func (d Disposition) String() string { + switch d { + case Accepted: + return "Accepted" + case Rejected: + return "Rejected" + case Released: + return "Released" + default: + return "Unknown" + } +} + +// FIXME aconway 2015-04-29: How to signal errors via ack channels. + +// An Acknowledgement is a channel which will receive the Disposition of the message +// when it is acknowledged. The channel is closed after the disposition is sent. +type Acknowledgement <-chan Disposition + +// Link has common data and methods for Sender and Receiver links. +type Link struct { + connection *Connection + elink event.Link +} + +// Sender sends messages. +type Sender struct { + Link +} + +// FIXME aconway 2015-04-28: allow user to specify delivery tag. +// FIXME aconway 2015-04-28: should we provide a sending channel rather than a send function? + +// Send sends a message. If d is not nil, the disposition is retured on d. +// If d is nil the message is sent pre-settled and no disposition is returned. +func (s *Sender) Send(m amqp.Message) (ack Acknowledgement, err error) { + ackChan := make(chan Disposition, 1) + ack = ackChan + s.connection.pump.Inject <- func() { + // FIXME aconway 2015-04-28: flow control & credit, buffer or fail? + delivery, err := s.elink.Send(m) + if err == nil { // FIXME aconway 2015-04-28: error handling + s.connection.handler.acks[delivery] = ackChan + } + } + return ack, nil +} + +// Close the sender. +func (s *Sender) Close() error { return nil } // FIXME aconway 2015-04-27: close/free + +// Receiver receives messages via the channel Receive. +type Receiver struct { + Link + // Channel of messag + Receive <-chan amqp.Message +} + +// FIXME aconway 2015-04-29: settlement - ReceivedMessage with Settle() method? + +// Close the Receiver. +func (r *Receiver) Close() error { return nil } // FIXME aconway 2015-04-29: close/free
--------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
