http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b25d21e6/proton/engine.go ---------------------------------------------------------------------- diff --cc proton/engine.go index c0f0093,0000000..d28a09f mode 100644,000000..100644 --- a/proton/engine.go +++ b/proton/engine.go @@@ -1,422 -1,0 +1,422 @@@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ + +package proton + +import ( + "fmt" + "net" + "os" + "strings" + "sync" + "time" + "unsafe" +) + +/* +#include <proton/connection.h> +#include <proton/event.h> +#include <proton/error.h> +#include <proton/handlers.h> +#include <proton/session.h> +#include <proton/transport.h> +#include <memory.h> +#include <stdlib.h> +*/ +import "C" + +// Injecter allows functions to be "injected" into the event-processing loop, to +// be called in the same goroutine as event handlers. +type Injecter interface { + // Inject a function into the engine goroutine. + // + // f() will be called in the same goroutine as event handlers, so it can safely + // use values belonging to event handlers without synchronization. f() should + // not block, no further events or injected functions can be processed until + // f() returns. + // + // Returns a non-nil error if the function could not be injected and will + // never be called. Otherwise the function will eventually be called. + // + // Note that proton values (Link, Session, Connection etc.) that existed when + // Inject(f) was called may have become invalid by the time f() is executed. + // Handlers should handle keep track of Closed events to ensure proton values + // are not used after they become invalid. One technique is to have map from + // proton values to application values. Check that the map has the correct + // proton/application value pair at the start of the injected function and + // delete the value from the map when handling a Closed event. + Inject(f func()) error + + // InjectWait is like Inject but does not return till f() has completed. + // If f() cannot be injected it returns the error from Inject(), otherwise + // it returns the error from f() + InjectWait(f func() error) error +} + +// Engine reads from a net.Conn, decodes AMQP events and calls the appropriate +// Handler functions sequentially in a single goroutine. Actions taken by +// Handler functions (such as sending messages) are encoded and written to the +// net.Conn. You can create multiple Engines to handle multiple connections +// concurrently. +// +// You implement the EventHandler and/or MessagingHandler interfaces and provide +// those values to NewEngine(). Their HandleEvent method will be called in the +// event-handling goroutine. +// +// Handlers can pass values from an event (Connections, Links, Deliveries etc.) to +// other goroutines, store them, or use them as map indexes. Effectively they are +// just pointers. Other goroutines cannot call their methods directly but they can +// can create a function closure to call such methods and pass it to Engine.Inject() +// to have it evaluated in the engine goroutine. +// +// You are responsible for ensuring you don't use an event value after it is +// invalid. The handler methods will tell you when a value is no longer valid. For +// example after a LinkClosed event, that link is no longer valid. If you do +// Link.Close() yourself (in a handler or injected function) the link remains valid - // until the corresponing LinkClosed event is received by the handler. ++// until the corresponding LinkClosed event is received by the handler. +// +// Engine.Close() will take care of cleaning up any remaining values when you are +// done with the Engine. All values associated with a engine become invalid when you +// call Engine.Close() +// +// The qpid.apache.org/proton/concurrent package will do all this for you, so it +// may be a better choice for some applications. +// +type Engine struct { + // Error is set on exit from Run() if there was an error. + err ErrorHolder + inject chan func() + + conn net.Conn + connection Connection + transport Transport + collector *C.pn_collector_t + handlers []EventHandler // Handlers for proton events. + running chan struct{} // This channel will be closed when the goroutines are done. + closeOnce sync.Once + timer *time.Timer + traceEvent bool +} + +const bufferSize = 4096 + +func envBool(name string) bool { + v := strings.ToLower(os.Getenv(name)) + return v == "true" || v == "1" || v == "yes" || v == "on" +} + +// Create a new Engine and call Initialize() with conn and handlers +func NewEngine(conn net.Conn, handlers ...EventHandler) (*Engine, error) { + eng := &Engine{} + return eng, eng.Initialize(conn, handlers...) +} + +// Initialize an Engine with a connection and handlers. Start it with Run() +func (eng *Engine) Initialize(conn net.Conn, handlers ...EventHandler) error { + eng.inject = make(chan func()) + eng.conn = conn + eng.connection = Connection{C.pn_connection()} + eng.transport = Transport{C.pn_transport()} + eng.collector = C.pn_collector() + eng.handlers = handlers + eng.running = make(chan struct{}) + eng.timer = time.NewTimer(0) + eng.traceEvent = envBool("PN_TRACE_EVT") + if eng.transport.IsNil() || eng.connection.IsNil() || eng.collector == nil { + eng.free() + return fmt.Errorf("proton.NewEngine cannot allocate") + } + C.pn_connection_collect(eng.connection.pn, eng.collector) + return nil +} + +// Create a byte slice backed by C memory. +// Empty or error (size <= 0) returns a nil byte slice. +func cByteSlice(start unsafe.Pointer, size int) []byte { + if start == nil || size <= 0 { + return nil + } else { + // Slice from very large imaginary array in C memory + return (*[1 << 30]byte)(start)[:size:size] + } +} + +func (eng *Engine) Connection() Connection { + return eng.connection +} + +func (eng *Engine) Transport() Transport { + return eng.transport +} + +func (eng *Engine) String() string { + return fmt.Sprintf("[%s]%s-%s", eng.Id(), eng.conn.LocalAddr(), eng.conn.RemoteAddr()) +} + +func (eng *Engine) Id() string { + // Use transport address to match default PN_TRACE_FRM=1 output. + return fmt.Sprintf("%p", eng.Transport().CPtr()) +} + +func (eng *Engine) Error() error { + return eng.err.Get() +} + +// Inject a function into the Engine's event loop. +// +// f() will be called in the same event-processing goroutine that calls Handler +// methods. f() can safely call methods on values that belong to this engine +// (Sessions, Links etc) +// +// The injected function has no parameters or return values. It is normally a +// closure and can use channels to communicate with the injecting goroutine if +// necessary. +// +// Returns a non-nil error if the engine is closed before the function could be +// injected. +func (eng *Engine) Inject(f func()) error { + select { + case eng.inject <- f: + return nil + case <-eng.running: + return eng.Error() + } +} + +// InjectWait is like Inject but does not return till f() has completed or the +// engine is closed, and returns an error value from f() +func (eng *Engine) InjectWait(f func() error) error { + done := make(chan error) + defer close(done) + err := eng.Inject(func() { done <- f() }) + if err != nil { + return err + } + select { + case <-eng.running: + return eng.Error() + case err := <-done: + return err + } +} + +// Server puts the Engine in server mode, meaning it will auto-detect security settings on - // the incoming connnection such as use of SASL and SSL. ++// the incoming connection such as use of SASL and SSL. +// Must be called before Run() +// +func (eng *Engine) Server() { eng.Transport().SetServer() } + +func (eng *Engine) disconnect(err error) { + cond := eng.Transport().Condition() + cond.SetError(err) // Set the provided error. + cond.SetError(eng.conn.Close()) // Use connection error if cond is not already set. + eng.transport.CloseTail() + eng.transport.CloseHead() +} + +// Close the engine's connection. +// If err != nil pass it to the remote end as the close condition. +// Returns when the remote end closes or disconnects. +func (eng *Engine) Close(err error) { + _ = eng.Inject(func() { CloseError(eng.Connection(), err) }) + <-eng.running +} + +// CloseTimeout like Close but disconnect if the remote end doesn't close within timeout. +func (eng *Engine) CloseTimeout(err error, timeout time.Duration) { + _ = eng.Inject(func() { CloseError(eng.Connection(), err) }) + select { + case <-eng.running: + case <-time.After(timeout): + eng.Disconnect(err) + } +} + +// Disconnect the engine's connection immediately without an AMQP close. +// Process any termination events before returning. +func (eng *Engine) Disconnect(err error) { + _ = eng.Inject(func() { eng.disconnect(err) }) + <-eng.running +} + +// Let proton run timed activity and set up the next tick +func (eng *Engine) tick() { + now := time.Now() + next := eng.Transport().Tick(now) + if !next.IsZero() { + eng.timer.Reset(next.Sub(now)) + } +} + +func (eng *Engine) dispatch() bool { + for ce := C.pn_collector_peek(eng.collector); ce != nil; ce = C.pn_collector_peek(eng.collector) { + e := makeEvent(ce, eng) + if eng.traceEvent { + eng.transport.Log(e.String()) + } + for _, h := range eng.handlers { + h.HandleEvent(e) + } + if e.Type() == EConnectionRemoteOpen { + eng.tick() // Update the tick if changed by remote. + } + C.pn_collector_pop(eng.collector) + } + return !eng.transport.Closed() || C.pn_collector_peek(eng.collector) != nil +} + +func (eng *Engine) writeBuffer() []byte { + size := eng.Transport().Pending() // Evaluate before Head(), may change buffer. + start := eng.Transport().Head() + return cByteSlice(start, size) +} + +func (eng *Engine) readBuffer() []byte { + size := eng.Transport().Capacity() + start := eng.Transport().Tail() + return cByteSlice(start, size) +} + +func (eng *Engine) free() { + if !eng.transport.IsNil() { + eng.transport.Unbind() + eng.transport.Free() + eng.transport = Transport{} + } + if !eng.connection.IsNil() { + eng.connection.Free() + eng.connection = Connection{} + } + if eng.collector != nil { + C.pn_collector_release(eng.collector) + C.pn_collector_free(eng.collector) + eng.collector = nil + } +} + +// Run the engine. Engine.Run() will exit when the engine is closed or +// disconnected. You can check for errors after exit with Engine.Error(). +// +func (eng *Engine) Run() error { + defer eng.free() + eng.transport.Bind(eng.connection) + eng.tick() // Start ticking if needed + + // Channels for read and write buffers going in and out of the read/write goroutines. - // The channels are unbuffered: we want to exchange buffers in seuquence. ++ // The channels are unbuffered: we want to exchange buffers in sequence. + readsIn, writesIn := make(chan []byte), make(chan []byte) + readsOut, writesOut := make(chan []byte), make(chan []byte) + + wait := sync.WaitGroup{} + wait.Add(2) // Read and write goroutines + + go func() { // Read goroutine + defer wait.Done() + for { + rbuf, ok := <-readsIn + if !ok { + return + } + n, err := eng.conn.Read(rbuf) + if n > 0 { + readsOut <- rbuf[:n] + } else if err != nil { + _ = eng.Inject(func() { + eng.Transport().Condition().SetError(err) + eng.Transport().CloseTail() + }) + return + } + } + }() + + go func() { // Write goroutine + defer wait.Done() + for { + wbuf, ok := <-writesIn + if !ok { + return + } + n, err := eng.conn.Write(wbuf) + if n > 0 { + writesOut <- wbuf[:n] + } else if err != nil { + _ = eng.Inject(func() { + eng.Transport().Condition().SetError(err) + eng.Transport().CloseHead() + }) + return + } + } + }() + + for eng.dispatch() { + readBuf := eng.readBuffer() + writeBuf := eng.writeBuffer() + // Note that getting the buffers can generate events (eg. SASL events) that + // might close the transport. Check if we are already finished before + // blocking for IO. + if !eng.dispatch() { + break + } + + // sendReads/sendWrites are nil (not sendable in select) unless we have a + // buffer to read/write + var sendReads, sendWrites chan []byte + if readBuf != nil { + sendReads = readsIn + } + if writeBuf != nil { + sendWrites = writesIn + } + + // Send buffers to the read/write goroutines if we have them. + // Get buffers from the read/write goroutines and process them + // Check for injected functions + select { + + case sendReads <- readBuf: + + case sendWrites <- writeBuf: + + case buf := <-readsOut: + eng.transport.Process(uint(len(buf))) + + case buf := <-writesOut: + eng.transport.Pop(uint(len(buf))) + + case f, ok := <-eng.inject: // Function injected from another goroutine + if ok { + f() + } + + case <-eng.timer.C: + eng.tick() + } + } + + eng.err.Set(EndpointError(eng.Connection())) + eng.err.Set(eng.Transport().Condition().Error()) + close(readsIn) + close(writesIn) + close(eng.running) // Signal goroutines have exited and Error is set, disable Inject() + _ = eng.conn.Close() // Close conn, force read/write goroutines to exit (they will Inject) + wait.Wait() // Wait for goroutines + return eng.err.Get() +}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b25d21e6/proton/handlers.go ---------------------------------------------------------------------- diff --cc proton/handlers.go index 961136e,0000000..f101548 mode 100644,000000..100644 --- a/proton/handlers.go +++ b/proton/handlers.go @@@ -1,395 -1,0 +1,395 @@@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ + +package proton + +import "fmt" + +// EventHandler handles core proton events. +type EventHandler interface { + // HandleEvent is called with an event. + // Typically HandleEvent() is implemented as a switch on e.Type() + // Returning an error will stop the Engine. + HandleEvent(e Event) +} + +// MessagingHandler provides an alternative interface to EventHandler. +// it is easier to use for most applications that send and receive messages. +// +// Implement this interface and then wrap your value with a MessagingHandlerDelegator. +// MessagingHandlerDelegator implements EventHandler and can be registered with a Engine. +// +type MessagingHandler interface { + // HandleMessagingEvent is called with MessagingEvent. + // Typically HandleEvent() is implemented as a switch on e.Type() + // Returning an error will stop the Engine. + HandleMessagingEvent(MessagingEvent, Event) +} + +// MessagingEvent provides a set of events that are easier to work with than the +// core events defined by EventType +// +// There are 3 types of "endpoint": Connection, Session and Link. For each +// endpoint there are 5 events: Opening, Opened, Closing, Closed and Error. +// +// The meaning of these events is as follows: +// +// Opening: The remote end opened, the local end will open automatically. +// +// Opened: Both ends are open, regardless of which end opened first. +// +// Closing: The remote end closed without error, the local end will close automatically. +// +// Error: The remote end closed with an error, the local end will close automatically. +// +// Closed: Both ends are closed, regardless of which end closed first or if there was an error. +// No further events will be received for the endpoint. +// +type MessagingEvent int + +const ( + // The event loop starts. + MStart MessagingEvent = iota + // The peer closes the connection with an error condition. + MConnectionError + // The peer closes the session with an error condition. + MSessionError + // The peer closes the link with an error condition. + MLinkError + // The peer Initiates the opening of the connection. + MConnectionOpening + // The peer initiates the opening of the session. + MSessionOpening + // The peer initiates the opening of the link. + MLinkOpening + // The connection is opened. + MConnectionOpened + // The session is opened. + MSessionOpened + // The link is opened. + MLinkOpened + // The peer initiates the closing of the connection. + MConnectionClosing + // The peer initiates the closing of the session. + MSessionClosing + // The peer initiates the closing of the link. + MLinkClosing + // Both ends of the connection are closed. + MConnectionClosed + // Both ends of the session are closed. + MSessionClosed + // Both ends of the link are closed. + MLinkClosed + // The sender link has credit and messages can + // therefore be transferred. + MSendable + // The remote peer accepts an outgoing message. + MAccepted + // The remote peer rejects an outgoing message. + MRejected + // The peer releases an outgoing message. Note that this may be in response to + // either the RELEASE or MODIFIED state as defined by the AMQP specification. + MReleased + // The peer has settled the outgoing message. This is the point at which it + // should never be re-transmitted. + MSettled + // A message is received. Call Event.Delivery().Message() to decode as an amqp.Message. + // To manage the outcome of this messages (e.g. to accept or reject the message) + // use Event.Delivery(). + MMessage + // A network connection was disconnected. + MDisconnected +) + +func (t MessagingEvent) String() string { + switch t { + case MStart: + return "Start" + case MConnectionError: + return "ConnectionError" + case MSessionError: + return "SessionError" + case MLinkError: + return "LinkError" + case MConnectionOpening: + return "ConnectionOpening" + case MSessionOpening: + return "SessionOpening" + case MLinkOpening: + return "LinkOpening" + case MConnectionOpened: + return "ConnectionOpened" + case MSessionOpened: + return "SessionOpened" + case MLinkOpened: + return "LinkOpened" + case MConnectionClosing: + return "ConnectionClosing" + case MSessionClosing: + return "SessionClosing" + case MLinkClosing: + return "LinkClosing" + case MConnectionClosed: + return "ConnectionClosed" + case MSessionClosed: + return "SessionClosed" + case MLinkClosed: + return "LinkClosed" + case MDisconnected: + return "Disconnected" + case MSendable: + return "Sendable" + case MAccepted: + return "Accepted" + case MRejected: + return "Rejected" + case MReleased: + return "Released" + case MSettled: + return "Settled" + case MMessage: + return "Message" + default: + return "Unknown" + } +} + +// ResourceHandler provides a simple way to track the creation and deletion of +// various proton objects. +// endpointDelegator captures common patterns for endpoints opening/closing +type endpointDelegator struct { + remoteOpen, remoteClose, localOpen, localClose EventType + opening, opened, closing, closed, error MessagingEvent + endpoint func(Event) Endpoint + delegator *MessagingAdapter +} + +// HandleEvent handles an open/close event for an endpoint in a generic way. +func (d endpointDelegator) HandleEvent(e Event) { + endpoint := d.endpoint(e) + state := endpoint.State() + + switch e.Type() { + + case d.localOpen: + if state.RemoteActive() { + d.delegator.mhandler.HandleMessagingEvent(d.opened, e) + } + + case d.remoteOpen: + d.delegator.mhandler.HandleMessagingEvent(d.opening, e) + switch { + case state.LocalActive(): + d.delegator.mhandler.HandleMessagingEvent(d.opened, e) + case state.LocalUninit(): + if d.delegator.AutoOpen { + endpoint.Open() + } + } + + case d.remoteClose: + if endpoint.RemoteCondition().IsSet() { // Closed with error + d.delegator.mhandler.HandleMessagingEvent(d.error, e) + } else { + d.delegator.mhandler.HandleMessagingEvent(d.closing, e) + } + if state.LocalClosed() { + d.delegator.mhandler.HandleMessagingEvent(d.closed, e) + } else if state.LocalActive() { + endpoint.Close() + } + + case d.localClose: + if state.RemoteClosed() { + d.delegator.mhandler.HandleMessagingEvent(d.closed, e) + } + + default: + // We shouldn't be called with any other event type. + panic(fmt.Errorf("internal error, not an open/close event: %s", e)) + } +} + +type flowcontroller struct { + window, drained int +} + +func (d flowcontroller) HandleEvent(e Event) { + link := e.Link() + + switch e.Type() { + case ELinkLocalOpen, ELinkRemoteOpen, ELinkFlow, EDelivery: + if link.IsReceiver() { + d.drained += link.Drained() + if d.drained != 0 { + link.Flow(d.window - link.Credit()) + } + } + } +} + - // MessagingAdapter implments a EventHandler and delegates to a MessagingHandler. ++// MessagingAdapter implements a EventHandler and delegates to a MessagingHandler. +// You can modify the exported fields before you pass the MessagingAdapter to +// a Engine. +type MessagingAdapter struct { + mhandler MessagingHandler + connection, session, link endpointDelegator + flowcontroller EventHandler + + // AutoSettle (default true) automatically pre-settle outgoing messages. + AutoSettle bool + // AutoAccept (default true) automatically accept and settle incoming messages + // if they are not settled by the delegate. + AutoAccept bool + // AutoOpen (default true) automatically open remotely opened endpoints. + AutoOpen bool + // Prefetch (default 10) initial credit to issue for incoming links. + Prefetch int + // PeerCloseIsError (default false) if true a close by the peer will be treated as an error. + PeerCloseError bool +} + +func NewMessagingAdapter(h MessagingHandler) *MessagingAdapter { + return &MessagingAdapter{ + mhandler: h, + flowcontroller: nil, + AutoSettle: true, + AutoAccept: true, + AutoOpen: true, + Prefetch: 10, + PeerCloseError: false, + } +} + +func handleIf(h EventHandler, e Event) { + if h != nil { + h.HandleEvent(e) + } +} + +// Handle a proton event by passing the corresponding MessagingEvent(s) to +// the MessagingHandler. +func (d *MessagingAdapter) HandleEvent(e Event) { + handleIf(d.flowcontroller, e) + + switch e.Type() { + + case EConnectionInit: + d.connection = endpointDelegator{ + EConnectionRemoteOpen, EConnectionRemoteClose, EConnectionLocalOpen, EConnectionLocalClose, + MConnectionOpening, MConnectionOpened, MConnectionClosing, MConnectionClosed, + MConnectionError, + func(e Event) Endpoint { return e.Connection() }, + d, + } + d.session = endpointDelegator{ + ESessionRemoteOpen, ESessionRemoteClose, ESessionLocalOpen, ESessionLocalClose, + MSessionOpening, MSessionOpened, MSessionClosing, MSessionClosed, + MSessionError, + func(e Event) Endpoint { return e.Session() }, + d, + } + d.link = endpointDelegator{ + ELinkRemoteOpen, ELinkRemoteClose, ELinkLocalOpen, ELinkLocalClose, + MLinkOpening, MLinkOpened, MLinkClosing, MLinkClosed, + MLinkError, + func(e Event) Endpoint { return e.Link() }, + d, + } + if d.Prefetch > 0 { + d.flowcontroller = flowcontroller{window: d.Prefetch, drained: 0} + } + d.mhandler.HandleMessagingEvent(MStart, e) + + case EConnectionRemoteOpen: + + d.connection.HandleEvent(e) + + case EConnectionRemoteClose: + d.connection.HandleEvent(e) + e.Connection().Transport().CloseTail() + + case EConnectionLocalOpen, EConnectionLocalClose: + d.connection.HandleEvent(e) + + case ESessionRemoteOpen, ESessionRemoteClose, ESessionLocalOpen, ESessionLocalClose: + d.session.HandleEvent(e) + + case ELinkRemoteOpen: + e.Link().Source().Copy(e.Link().RemoteSource()) + e.Link().Target().Copy(e.Link().RemoteTarget()) + d.link.HandleEvent(e) + + case ELinkRemoteClose, ELinkLocalOpen, ELinkLocalClose: + d.link.HandleEvent(e) + + case ELinkFlow: + if e.Link().IsSender() && e.Link().Credit() > 0 { + d.mhandler.HandleMessagingEvent(MSendable, e) + } + + case EDelivery: + if e.Delivery().Link().IsReceiver() { + d.incoming(e) + } else { + d.outgoing(e) + } + + case ETransportClosed: + d.mhandler.HandleMessagingEvent(MDisconnected, e) + } +} + +func (d *MessagingAdapter) incoming(e Event) { + delivery := e.Delivery() + if delivery.HasMessage() { + d.mhandler.HandleMessagingEvent(MMessage, e) + if d.AutoAccept && !delivery.Settled() { + delivery.Accept() + } + if delivery.Current() { + e.Link().Advance() + } + } else if delivery.Updated() && delivery.Settled() { + d.mhandler.HandleMessagingEvent(MSettled, e) + } + return +} + +func (d *MessagingAdapter) outgoing(e Event) { + delivery := e.Delivery() + if delivery.Updated() { + switch delivery.Remote().Type() { + case Accepted: + d.mhandler.HandleMessagingEvent(MAccepted, e) + case Rejected: + d.mhandler.HandleMessagingEvent(MRejected, e) + case Released, Modified: + d.mhandler.HandleMessagingEvent(MReleased, e) + } + if delivery.Settled() { + // The delivery was settled remotely, inform the local end. + d.mhandler.HandleMessagingEvent(MSettled, e) + } + if d.AutoSettle { + delivery.Settle() // Local settle, don't mhandler MSettled till the remote end settles. + } + } + return +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b25d21e6/proton/message.go ---------------------------------------------------------------------- diff --cc proton/message.go index 2336483,0000000..fbb1d48 mode 100644,000000..100644 --- a/proton/message.go +++ b/proton/message.go @@@ -1,93 -1,0 +1,93 @@@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ + +package proton + +// #include <proton/types.h> +// #include <proton/message.h> +// #include <proton/codec.h> +import "C" + +import ( + "fmt" + "qpid.apache.org/amqp" + "strconv" + "sync/atomic" +) + +// HasMessage is true if all message data is available. +// Equivalent to !d.isNil && d.Readable() && !d.Partial() +func (d Delivery) HasMessage() bool { return !d.IsNil() && d.Readable() && !d.Partial() } + - // Message decodes the message containined in a delivery. ++// Message decodes the message contained in a delivery. +// +// Must be called in the correct link context with this delivery as the current message, +// handling an MMessage event is always a safe context to call this function. +// +// Will return an error if message is incomplete or not current. +func (delivery Delivery) Message() (m amqp.Message, err error) { + if !delivery.Readable() { + return nil, fmt.Errorf("delivery is not readable") + } + if delivery.Partial() { + return nil, fmt.Errorf("delivery has partial message") + } + data := make([]byte, delivery.Pending()) + result := delivery.Link().Recv(data) + if result != len(data) { + return nil, fmt.Errorf("cannot receive message: %s", PnErrorCode(result)) + } + m = amqp.NewMessage() + err = m.Decode(data) + return +} + +// Process-wide atomic counter for generating tag names +var tagCounter uint64 + +func nextTag() string { + return strconv.FormatUint(atomic.AddUint64(&tagCounter, 1), 32) +} + +// Send sends a amqp.Message over a Link. +// Returns a Delivery that can be use to determine the outcome of the message. +func (link Link) Send(m amqp.Message) (Delivery, error) { + if !link.IsSender() { + return Delivery{}, fmt.Errorf("attempt to send message on receiving link") + } + + delivery := link.Delivery(nextTag()) + bytes, err := m.Encode(nil) + if err != nil { - return Delivery{}, fmt.Errorf("cannot send mesage %s", err) ++ return Delivery{}, fmt.Errorf("cannot send message %s", err) + } + result := link.SendBytes(bytes) + link.Advance() + if result != len(bytes) { + if result < 0 { + return delivery, fmt.Errorf("send failed %v", PnErrorCode(result)) + } else { + return delivery, fmt.Errorf("send incomplete %v of %v", result, len(bytes)) + } + } + if link.RemoteSndSettleMode() == SndSettled { + delivery.Settle() + } + return delivery, nil +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b25d21e6/proton/wrappers.go ---------------------------------------------------------------------- diff --cc proton/wrappers.go index 09f3e65,0000000..a7b7fb2 mode 100644,000000..100644 --- a/proton/wrappers.go +++ b/proton/wrappers.go @@@ -1,460 -1,0 +1,460 @@@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ + +// This file contains special-case wrapper functions or wrappers that don't follow +// the pattern of genwrap.go. + +package proton + +//#include <proton/codec.h> +//#include <proton/connection.h> +//#include <proton/delivery.h> +//#include <proton/event.h> +//#include <proton/link.h> +//#include <proton/link.h> +//#include <proton/object.h> +//#include <proton/sasl.h> +//#include <proton/session.h> +//#include <proton/transport.h> +//#include <stdlib.h> +import "C" + +import ( + "fmt" + "qpid.apache.org/amqp" + "reflect" + "time" + "unsafe" +) + +// TODO aconway 2015-05-05: Documentation for generated types. + +// CHandle holds an unsafe.Pointer to a proton C struct, the C type depends on the +// Go type implementing this interface. For low level, at-your-own-risk use only. +type CHandle interface { + // CPtr returns the unsafe C pointer, equivalent to a C void*. + CPtr() unsafe.Pointer +} + +// Incref increases the refcount of a proton value, which prevents the +// underlying C struct being freed until you call Decref(). +// +// It can be useful to "pin" a proton value in memory while it is in use by +// goroutines other than the event loop goroutine. For example if you Incref() a +// Link, the underlying object is not freed when the link is closed, so means +// other goroutines can continue to safely use it as an index in a map or inject +// it into the event loop goroutine. There will of course be an error if you try +// to use a link after it is closed, but not a segmentation fault. +func Incref(c CHandle) { + if p := c.CPtr(); p != nil { + C.pn_incref(p) + } +} + +// Decref decreases the refcount of a proton value, freeing the underlying C +// struct if this is the last reference. Only call this if you previously +// called Incref() for this value. +func Decref(c CHandle) { + if p := c.CPtr(); p != nil { + C.pn_decref(p) + } +} + +// Event is an AMQP protocol event. +type Event struct { + pn *C.pn_event_t + eventType EventType + connection Connection + transport Transport + session Session + link Link + delivery Delivery + injecter Injecter +} + +func makeEvent(pn *C.pn_event_t, injecter Injecter) Event { + return Event{ + pn: pn, + eventType: EventType(C.pn_event_type(pn)), + connection: Connection{C.pn_event_connection(pn)}, + transport: Transport{C.pn_event_transport(pn)}, + session: Session{C.pn_event_session(pn)}, + link: Link{C.pn_event_link(pn)}, + delivery: Delivery{C.pn_event_delivery(pn)}, + injecter: injecter, + } +} +func (e Event) IsNil() bool { return e.eventType == EventType(0) } +func (e Event) Type() EventType { return e.eventType } +func (e Event) Connection() Connection { return e.connection } +func (e Event) Transport() Transport { return e.transport } +func (e Event) Session() Session { return e.session } +func (e Event) Link() Link { return e.link } +func (e Event) Delivery() Delivery { return e.delivery } +func (e Event) String() string { return e.Type().String() } + +// Injecter should not be used in a handler function, but it can be passed to +// other goroutines (via a channel or to a goroutine started by handler +// functions) to let them inject functions back into the handlers goroutine. +func (e Event) Injecter() Injecter { return e.injecter } + +// Data is an intermediate form of decoded AMQP data. +type Data struct{ pn *C.pn_data_t } + +func (d Data) Free() { C.pn_data_free(d.pn) } +func (d Data) CPtr() unsafe.Pointer { return unsafe.Pointer(d.pn) } +func (d Data) Clear() { C.pn_data_clear(d.pn) } +func (d Data) Rewind() { C.pn_data_rewind(d.pn) } +func (d Data) Next() { C.pn_data_next(d.pn) } +func (d Data) Error() error { return PnError(C.pn_data_error(d.pn)) } +func (d Data) Empty() bool { return C.pn_data_size(d.pn) == 0 } + +func (d Data) String() string { + str := C.pn_string(C.CString("")) + defer C.pn_free(unsafe.Pointer(str)) + C.pn_inspect(unsafe.Pointer(d.pn), str) + return C.GoString(C.pn_string_get(str)) +} + +// Unmarshal the value of d into value pointed at by ptr, see amqp.Unmarshal() for details +func (d Data) Unmarshal(ptr interface{}) error { + d.Rewind() + d.Next() + err := amqp.UnmarshalUnsafe(d.CPtr(), ptr) + return err +} + +// Marshal the value v into d, see amqp.Marshal() for details +func (d Data) Marshal(v interface{}) error { + d.Clear() + return amqp.MarshalUnsafe(v, d.CPtr()) +} + +// State holds the state flags for an AMQP endpoint. +type State byte + +const ( + SLocalUninit State = C.PN_LOCAL_UNINIT + SLocalActive = C.PN_LOCAL_ACTIVE + SLocalClosed = C.PN_LOCAL_CLOSED + SRemoteUninit = C.PN_REMOTE_UNINIT + SRemoteActive = C.PN_REMOTE_ACTIVE + SRemoteClosed = C.PN_REMOTE_CLOSED +) + +// Has is True if bits & state is non 0. +func (s State) Has(bits State) bool { return s&bits != 0 } + +func (s State) LocalUninit() bool { return s.Has(SLocalUninit) } +func (s State) LocalActive() bool { return s.Has(SLocalActive) } +func (s State) LocalClosed() bool { return s.Has(SLocalClosed) } +func (s State) RemoteUninit() bool { return s.Has(SRemoteUninit) } +func (s State) RemoteActive() bool { return s.Has(SRemoteActive) } +func (s State) RemoteClosed() bool { return s.Has(SRemoteClosed) } + - // Return a State containig just the local flags ++// Return a State containing just the local flags +func (s State) Local() State { return State(s & C.PN_LOCAL_MASK) } + - // Return a State containig just the remote flags ++// Return a State containing just the remote flags +func (s State) Remote() State { return State(s & C.PN_REMOTE_MASK) } + +// Endpoint is the common interface for Connection, Link and Session. +type Endpoint interface { + // State is the open/closed state. + State() State + // Open an endpoint. + Open() + // Close an endpoint. + Close() + // Condition holds a local error condition. + Condition() Condition + // RemoteCondition holds a remote error condition. + RemoteCondition() Condition + // Human readable name + String() string + // Human readable endpoint type "sender-link", "session" etc. + Type() string +} + +// CloseError sets an error condition (if err != nil) on an endpoint and closes +// the endpoint if not already closed +func CloseError(e Endpoint, err error) { + if err != nil && !e.Condition().IsSet() { + e.Condition().SetError(err) + } + e.Close() +} + +// EndpointError returns the remote error if there is one, the local error if not +// nil if there is no error. +func EndpointError(e Endpoint) error { + err := e.RemoteCondition().Error() + if err == nil { + err = e.Condition().Error() + } + return err +} + +const ( + Received uint64 = C.PN_RECEIVED + Accepted = C.PN_ACCEPTED + Rejected = C.PN_REJECTED + Released = C.PN_RELEASED + Modified = C.PN_MODIFIED +) + +// SettleAs is equivalent to d.Update(disposition); d.Settle() +func (d Delivery) SettleAs(disposition uint64) { + d.Update(disposition) + d.Settle() +} + +// Accept accepts and settles a delivery. +func (d Delivery) Accept() { d.SettleAs(Accepted) } + +// Reject rejects and settles a delivery +func (d Delivery) Reject() { d.SettleAs(Rejected) } + +// Release releases and settles a delivery +// If delivered is true the delivery count for the message will be increased. +func (d Delivery) Release(delivered bool) { + if delivered { + d.SettleAs(Modified) + } else { + d.SettleAs(Released) + } +} + +type DeliveryTag struct{ pn C.pn_delivery_tag_t } + +func (t DeliveryTag) String() string { return C.GoStringN(t.pn.start, C.int(t.pn.size)) } + +func (l Link) Recv(buf []byte) int { + if len(buf) == 0 { + return 0 + } + return int(C.pn_link_recv(l.pn, (*C.char)(unsafe.Pointer(&buf[0])), C.size_t(len(buf)))) +} + +func (l Link) SendBytes(bytes []byte) int { + return int(C.pn_link_send(l.pn, cPtr(bytes), cLen(bytes))) +} + +func pnTag(tag string) C.pn_delivery_tag_t { + bytes := []byte(tag) + return C.pn_dtag(cPtr(bytes), cLen(bytes)) +} + +func (l Link) Delivery(tag string) Delivery { + return Delivery{C.pn_delivery(l.pn, pnTag(tag))} +} + +func (l Link) Connection() Connection { return l.Session().Connection() } + +// Human-readable link description including name, source, target and direction. +func (l Link) String() string { + switch { + case l.IsNil(): + return fmt.Sprintf("<nil-link>") + case l.IsSender(): + return fmt.Sprintf("%s(%s->%s)", l.Name(), l.Source().Address(), l.Target().Address()) + default: + return fmt.Sprintf("%s(%s<-%s)", l.Name(), l.Target().Address(), l.Source().Address()) + } +} + +func (l Link) Type() string { + if l.IsSender() { + return "sender-link" + } else { + return "receiver-link" + } +} + +// IsDrain calls pn_link_get_drain(), it conflicts with pn_link_drain() under the normal mapping. +func (l Link) IsDrain() bool { + return bool(C.pn_link_get_drain(l.pn)) +} + +func cPtr(b []byte) *C.char { + if len(b) == 0 { + return nil + } + return (*C.char)(unsafe.Pointer(&b[0])) +} + +func cLen(b []byte) C.size_t { + return C.size_t(len(b)) +} + +func (s Session) Sender(name string) Link { + cname := C.CString(name) + defer C.free(unsafe.Pointer(cname)) + return Link{C.pn_sender(s.pn, cname)} +} + +func (s Session) Receiver(name string) Link { + cname := C.CString(name) + defer C.free(unsafe.Pointer(cname)) + return Link{C.pn_receiver(s.pn, cname)} +} + +func (t Transport) String() string { + return fmt.Sprintf("(Transport)(%p)", t.CPtr()) +} + +// Unique (per process) string identifier for a connection, useful for debugging. +func (c Connection) String() string { + // Use the transport address to match the default transport logs from PN_TRACE. + return fmt.Sprintf("(Connection)(%p)", c.Transport().CPtr()) +} + +func (c Connection) Type() string { + return "connection" +} + +// Head functions don't follow the normal naming conventions so missed by the generator. + +func (c Connection) LinkHead(s State) Link { + return Link{C.pn_link_head(c.pn, C.pn_state_t(s))} +} + +func (c Connection) SessionHead(s State) Session { + return Session{C.pn_session_head(c.pn, C.pn_state_t(s))} +} + +func (c Connection) Links(state State) (links []Link) { + for l := c.LinkHead(state); !l.IsNil(); l = l.Next(state) { + links = append(links, l) + } + return +} + +func (c Connection) Sessions(state State) (sessions []Session) { + for s := c.SessionHead(state); !s.IsNil(); s = s.Next(state) { + sessions = append(sessions, s) + } + return +} + +// SetPassword takes []byte not string because it is impossible to erase a string +// from memory reliably. Proton will not keep the password in memory longer than +// needed, the caller should overwrite their copy on return. +// +// The password must not contain embedded nul characters, a trailing nul is ignored. +func (c Connection) SetPassword(password []byte) { + if len(password) == 0 || password[len(password)-1] != 0 { + password = append(password, 0) // Proton requires a terminating null. + } + C.pn_connection_set_password(c.pn, (*C.char)(unsafe.Pointer(&password[0]))) +} + +func (s Session) String() string { + return fmt.Sprintf("(Session)(%p)", s.pn) // TODO aconway 2016-09-12: should print channel number. +} + +func (s Session) Type() string { return "session" } + +// Error returns an instance of amqp.Error or nil. +func (c Condition) Error() error { + if c.IsNil() || !c.IsSet() { + return nil + } + return amqp.Error{Name: c.Name(), Description: c.Description()} +} + +// Set a Go error into a condition. +// If it is not an amqp.Condition use the error type as name, error string as description. +func (c Condition) SetError(err error) { + if err != nil { + if cond, ok := err.(amqp.Error); ok { + c.SetName(cond.Name) + c.SetDescription(cond.Description) + } else { + c.SetName(reflect.TypeOf(err).Name()) + c.SetDescription(err.Error()) + } + } +} + +func (c Connection) Session() (Session, error) { + s := Session{C.pn_session(c.pn)} + if s.IsNil() { + return s, Connection(c).Error() + } + return s, nil +} + +// pnTime converts Go time.Time to Proton millisecond Unix time. +// +// Note: t.isZero() is converted to C.pn_timestamp_t(0) and vice-versa. These +// are used as "not set" sentinel values by the Go and Proton APIs, so it is +// better to conserve the "zeroness" even though they don't represent the same +// time instant. +// +func pnTime(t time.Time) (pnt C.pn_timestamp_t) { + if !t.IsZero() { + pnt = C.pn_timestamp_t(t.Unix()*1000 + int64(t.Nanosecond())/int64(time.Millisecond)) + } + return +} + +// goTime converts a pn_timestamp_t to a Go time.Time. +// +// Note: C.pn_timestamp_t(0) is converted to a zero time.Time and +// vice-versa. These are used as "not set" sentinel values by the Go and Proton +// APIs, so it is better to conserve the "zeroness" even though they don't +// represent the same time instant. +// +func goTime(pnt C.pn_timestamp_t) (t time.Time) { + if pnt != 0 { + t = time.Unix(int64(pnt/1000), int64(pnt%1000)*int64(time.Millisecond)) + } + return +} + +// Special treatment for Transport.Head, return value is unsafe.Pointer not string +func (t Transport) Head() unsafe.Pointer { + return unsafe.Pointer(C.pn_transport_head(t.pn)) +} + +// Special treatment for Transport.Tail, return value is unsafe.Pointer not string +func (t Transport) Tail() unsafe.Pointer { + return unsafe.Pointer(C.pn_transport_tail(t.pn)) +} + +// Special treatment for Transport.Push, takes []byte instead of char*, size +func (t Transport) Push(bytes []byte) int { + return int(C.pn_transport_push(t.pn, (*C.char)(unsafe.Pointer(&bytes[0])), C.size_t(len(bytes)))) +} + +// Get the SASL object for the transport. +func (t Transport) SASL() SASL { + return SASL{C.pn_sasl(t.pn)} +} + +// Do we support extended SASL negotiation? +// All implementations of Proton support ANONYMOUS and EXTERNAL on both +// client and server sides and PLAIN on the client side. +// +// Extended SASL implememtations use an external library (Cyrus SASL) +// to support other mechanisms beyond these basic ones. +func SASLExtended() bool { + return bool(C.pn_sasl_extended()) +} --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
