http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b25d21e6/proton/engine.go
----------------------------------------------------------------------
diff --cc proton/engine.go
index c0f0093,0000000..d28a09f
mode 100644,000000..100644
--- a/proton/engine.go
+++ b/proton/engine.go
@@@ -1,422 -1,0 +1,422 @@@
 +/*
 +Licensed to the Apache Software Foundation (ASF) under one
 +or more contributor license agreements.  See the NOTICE file
 +distributed with this work for additional information
 +regarding copyright ownership.  The ASF licenses this file
 +to you under the Apache License, Version 2.0 (the
 +"License"); you may not use this file except in compliance
 +with the License.  You may obtain a copy of the License at
 +
 +  http://www.apache.org/licenses/LICENSE-2.0
 +
 +Unless required by applicable law or agreed to in writing,
 +software distributed under the License is distributed on an
 +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 +KIND, either express or implied.  See the License for the
 +specific language governing permissions and limitations
 +under the License.
 +*/
 +
 +package proton
 +
 +import (
 +      "fmt"
 +      "net"
 +      "os"
 +      "strings"
 +      "sync"
 +      "time"
 +      "unsafe"
 +)
 +
 +/*
 +#include <proton/connection.h>
 +#include <proton/event.h>
 +#include <proton/error.h>
 +#include <proton/handlers.h>
 +#include <proton/session.h>
 +#include <proton/transport.h>
 +#include <memory.h>
 +#include <stdlib.h>
 +*/
 +import "C"
 +
 +// Injecter allows functions to be "injected" into the event-processing loop, 
to
 +// be called in the same goroutine as event handlers.
 +type Injecter interface {
 +      // Inject a function into the engine goroutine.
 +      //
 +      // f() will be called in the same goroutine as event handlers, so it 
can safely
 +      // use values belonging to event handlers without synchronization. f() 
should
 +      // not block, no further events or injected functions can be processed 
until
 +      // f() returns.
 +      //
 +      // Returns a non-nil error if the function could not be injected and 
will
 +      // never be called. Otherwise the function will eventually be called.
 +      //
 +      // Note that proton values (Link, Session, Connection etc.) that 
existed when
 +      // Inject(f) was called may have become invalid by the time f() is 
executed.
 +      // Handlers should handle keep track of Closed events to ensure proton 
values
 +      // are not used after they become invalid. One technique is to have map 
from
 +      // proton values to application values. Check that the map has the 
correct
 +      // proton/application value pair at the start of the injected function 
and
 +      // delete the value from the map when handling a Closed event.
 +      Inject(f func()) error
 +
 +      // InjectWait is like Inject but does not return till f() has completed.
 +      // If f() cannot be injected it returns the error from Inject(), 
otherwise
 +      // it returns the error from f()
 +      InjectWait(f func() error) error
 +}
 +
 +// Engine reads from a net.Conn, decodes AMQP events and calls the appropriate
 +// Handler functions sequentially in a single goroutine. Actions taken by
 +// Handler functions (such as sending messages) are encoded and written to the
 +// net.Conn. You can create multiple Engines to handle multiple connections
 +// concurrently.
 +//
 +// You implement the EventHandler and/or MessagingHandler interfaces and 
provide
 +// those values to NewEngine(). Their HandleEvent method will be called in the
 +// event-handling goroutine.
 +//
 +// Handlers can pass values from an event (Connections, Links, Deliveries 
etc.) to
 +// other goroutines, store them, or use them as map indexes. Effectively they 
are
 +// just pointers.  Other goroutines cannot call their methods directly but 
they can
 +// can create a function closure to call such methods and pass it to 
Engine.Inject()
 +// to have it evaluated in the engine goroutine.
 +//
 +// You are responsible for ensuring you don't use an event value after it is
 +// invalid. The handler methods will tell you when a value is no longer 
valid. For
 +// example after a LinkClosed event, that link is no longer valid. If you do
 +// Link.Close() yourself (in a handler or injected function) the link remains 
valid
- // until the corresponing LinkClosed event is received by the handler.
++// until the corresponding LinkClosed event is received by the handler.
 +//
 +// Engine.Close() will take care of cleaning up any remaining values when you 
are
 +// done with the Engine. All values associated with a engine become invalid 
when you
 +// call Engine.Close()
 +//
 +// The qpid.apache.org/proton/concurrent package will do all this for you, so 
it
 +// may be a better choice for some applications.
 +//
 +type Engine struct {
 +      // Error is set on exit from Run() if there was an error.
 +      err    ErrorHolder
 +      inject chan func()
 +
 +      conn       net.Conn
 +      connection Connection
 +      transport  Transport
 +      collector  *C.pn_collector_t
 +      handlers   []EventHandler // Handlers for proton events.
 +      running    chan struct{}  // This channel will be closed when the 
goroutines are done.
 +      closeOnce  sync.Once
 +      timer      *time.Timer
 +      traceEvent bool
 +}
 +
 +const bufferSize = 4096
 +
 +func envBool(name string) bool {
 +      v := strings.ToLower(os.Getenv(name))
 +      return v == "true" || v == "1" || v == "yes" || v == "on"
 +}
 +
 +// Create a new Engine and call Initialize() with conn and handlers
 +func NewEngine(conn net.Conn, handlers ...EventHandler) (*Engine, error) {
 +      eng := &Engine{}
 +      return eng, eng.Initialize(conn, handlers...)
 +}
 +
 +// Initialize an Engine with a connection and handlers. Start it with Run()
 +func (eng *Engine) Initialize(conn net.Conn, handlers ...EventHandler) error {
 +      eng.inject = make(chan func())
 +      eng.conn = conn
 +      eng.connection = Connection{C.pn_connection()}
 +      eng.transport = Transport{C.pn_transport()}
 +      eng.collector = C.pn_collector()
 +      eng.handlers = handlers
 +      eng.running = make(chan struct{})
 +      eng.timer = time.NewTimer(0)
 +      eng.traceEvent = envBool("PN_TRACE_EVT")
 +      if eng.transport.IsNil() || eng.connection.IsNil() || eng.collector == 
nil {
 +              eng.free()
 +              return fmt.Errorf("proton.NewEngine cannot allocate")
 +      }
 +      C.pn_connection_collect(eng.connection.pn, eng.collector)
 +      return nil
 +}
 +
 +// Create a byte slice backed by C memory.
 +// Empty or error (size <= 0) returns a nil byte slice.
 +func cByteSlice(start unsafe.Pointer, size int) []byte {
 +      if start == nil || size <= 0 {
 +              return nil
 +      } else {
 +              // Slice from very large imaginary array in C memory
 +              return (*[1 << 30]byte)(start)[:size:size]
 +      }
 +}
 +
 +func (eng *Engine) Connection() Connection {
 +      return eng.connection
 +}
 +
 +func (eng *Engine) Transport() Transport {
 +      return eng.transport
 +}
 +
 +func (eng *Engine) String() string {
 +      return fmt.Sprintf("[%s]%s-%s", eng.Id(), eng.conn.LocalAddr(), 
eng.conn.RemoteAddr())
 +}
 +
 +func (eng *Engine) Id() string {
 +      // Use transport address to match default PN_TRACE_FRM=1 output.
 +      return fmt.Sprintf("%p", eng.Transport().CPtr())
 +}
 +
 +func (eng *Engine) Error() error {
 +      return eng.err.Get()
 +}
 +
 +// Inject a function into the Engine's event loop.
 +//
 +// f() will be called in the same event-processing goroutine that calls 
Handler
 +// methods. f() can safely call methods on values that belong to this engine
 +// (Sessions, Links etc)
 +//
 +// The injected function has no parameters or return values. It is normally a
 +// closure and can use channels to communicate with the injecting goroutine if
 +// necessary.
 +//
 +// Returns a non-nil error if the engine is closed before the function could 
be
 +// injected.
 +func (eng *Engine) Inject(f func()) error {
 +      select {
 +      case eng.inject <- f:
 +              return nil
 +      case <-eng.running:
 +              return eng.Error()
 +      }
 +}
 +
 +// InjectWait is like Inject but does not return till f() has completed or the
 +// engine is closed, and returns an error value from f()
 +func (eng *Engine) InjectWait(f func() error) error {
 +      done := make(chan error)
 +      defer close(done)
 +      err := eng.Inject(func() { done <- f() })
 +      if err != nil {
 +              return err
 +      }
 +      select {
 +      case <-eng.running:
 +              return eng.Error()
 +      case err := <-done:
 +              return err
 +      }
 +}
 +
 +// Server puts the Engine in server mode, meaning it will auto-detect 
security settings on
- // the incoming connnection such as use of SASL and SSL.
++// the incoming connection such as use of SASL and SSL.
 +// Must be called before Run()
 +//
 +func (eng *Engine) Server() { eng.Transport().SetServer() }
 +
 +func (eng *Engine) disconnect(err error) {
 +      cond := eng.Transport().Condition()
 +      cond.SetError(err)              // Set the provided error.
 +      cond.SetError(eng.conn.Close()) // Use connection error if cond is not 
already set.
 +      eng.transport.CloseTail()
 +      eng.transport.CloseHead()
 +}
 +
 +// Close the engine's connection.
 +// If err != nil pass it to the remote end as the close condition.
 +// Returns when the remote end closes or disconnects.
 +func (eng *Engine) Close(err error) {
 +      _ = eng.Inject(func() { CloseError(eng.Connection(), err) })
 +      <-eng.running
 +}
 +
 +// CloseTimeout like Close but disconnect if the remote end doesn't close 
within timeout.
 +func (eng *Engine) CloseTimeout(err error, timeout time.Duration) {
 +      _ = eng.Inject(func() { CloseError(eng.Connection(), err) })
 +      select {
 +      case <-eng.running:
 +      case <-time.After(timeout):
 +              eng.Disconnect(err)
 +      }
 +}
 +
 +// Disconnect the engine's connection immediately without an AMQP close.
 +// Process any termination events before returning.
 +func (eng *Engine) Disconnect(err error) {
 +      _ = eng.Inject(func() { eng.disconnect(err) })
 +      <-eng.running
 +}
 +
 +// Let proton run timed activity and set up the next tick
 +func (eng *Engine) tick() {
 +      now := time.Now()
 +      next := eng.Transport().Tick(now)
 +      if !next.IsZero() {
 +              eng.timer.Reset(next.Sub(now))
 +      }
 +}
 +
 +func (eng *Engine) dispatch() bool {
 +      for ce := C.pn_collector_peek(eng.collector); ce != nil; ce = 
C.pn_collector_peek(eng.collector) {
 +              e := makeEvent(ce, eng)
 +              if eng.traceEvent {
 +                      eng.transport.Log(e.String())
 +              }
 +              for _, h := range eng.handlers {
 +                      h.HandleEvent(e)
 +              }
 +              if e.Type() == EConnectionRemoteOpen {
 +                      eng.tick() // Update the tick if changed by remote.
 +              }
 +              C.pn_collector_pop(eng.collector)
 +      }
 +      return !eng.transport.Closed() || C.pn_collector_peek(eng.collector) != 
nil
 +}
 +
 +func (eng *Engine) writeBuffer() []byte {
 +      size := eng.Transport().Pending() // Evaluate before Head(), may change 
buffer.
 +      start := eng.Transport().Head()
 +      return cByteSlice(start, size)
 +}
 +
 +func (eng *Engine) readBuffer() []byte {
 +      size := eng.Transport().Capacity()
 +      start := eng.Transport().Tail()
 +      return cByteSlice(start, size)
 +}
 +
 +func (eng *Engine) free() {
 +      if !eng.transport.IsNil() {
 +              eng.transport.Unbind()
 +              eng.transport.Free()
 +              eng.transport = Transport{}
 +      }
 +      if !eng.connection.IsNil() {
 +              eng.connection.Free()
 +              eng.connection = Connection{}
 +      }
 +      if eng.collector != nil {
 +              C.pn_collector_release(eng.collector)
 +              C.pn_collector_free(eng.collector)
 +              eng.collector = nil
 +      }
 +}
 +
 +// Run the engine. Engine.Run() will exit when the engine is closed or
 +// disconnected.  You can check for errors after exit with Engine.Error().
 +//
 +func (eng *Engine) Run() error {
 +      defer eng.free()
 +      eng.transport.Bind(eng.connection)
 +      eng.tick() // Start ticking if needed
 +
 +      // Channels for read and write buffers going in and out of the 
read/write goroutines.
-       // The channels are unbuffered: we want to exchange buffers in 
seuquence.
++      // The channels are unbuffered: we want to exchange buffers in sequence.
 +      readsIn, writesIn := make(chan []byte), make(chan []byte)
 +      readsOut, writesOut := make(chan []byte), make(chan []byte)
 +
 +      wait := sync.WaitGroup{}
 +      wait.Add(2) // Read and write goroutines
 +
 +      go func() { // Read goroutine
 +              defer wait.Done()
 +              for {
 +                      rbuf, ok := <-readsIn
 +                      if !ok {
 +                              return
 +                      }
 +                      n, err := eng.conn.Read(rbuf)
 +                      if n > 0 {
 +                              readsOut <- rbuf[:n]
 +                      } else if err != nil {
 +                              _ = eng.Inject(func() {
 +                                      
eng.Transport().Condition().SetError(err)
 +                                      eng.Transport().CloseTail()
 +                              })
 +                              return
 +                      }
 +              }
 +      }()
 +
 +      go func() { // Write goroutine
 +              defer wait.Done()
 +              for {
 +                      wbuf, ok := <-writesIn
 +                      if !ok {
 +                              return
 +                      }
 +                      n, err := eng.conn.Write(wbuf)
 +                      if n > 0 {
 +                              writesOut <- wbuf[:n]
 +                      } else if err != nil {
 +                              _ = eng.Inject(func() {
 +                                      
eng.Transport().Condition().SetError(err)
 +                                      eng.Transport().CloseHead()
 +                              })
 +                              return
 +                      }
 +              }
 +      }()
 +
 +      for eng.dispatch() {
 +              readBuf := eng.readBuffer()
 +              writeBuf := eng.writeBuffer()
 +              // Note that getting the buffers can generate events (eg. SASL 
events) that
 +              // might close the transport. Check if we are already finished 
before
 +              // blocking for IO.
 +              if !eng.dispatch() {
 +                      break
 +              }
 +
 +              // sendReads/sendWrites are nil (not sendable in select) unless 
we have a
 +              // buffer to read/write
 +              var sendReads, sendWrites chan []byte
 +              if readBuf != nil {
 +                      sendReads = readsIn
 +              }
 +              if writeBuf != nil {
 +                      sendWrites = writesIn
 +              }
 +
 +              // Send buffers to the read/write goroutines if we have them.
 +              // Get buffers from the read/write goroutines and process them
 +              // Check for injected functions
 +              select {
 +
 +              case sendReads <- readBuf:
 +
 +              case sendWrites <- writeBuf:
 +
 +              case buf := <-readsOut:
 +                      eng.transport.Process(uint(len(buf)))
 +
 +              case buf := <-writesOut:
 +                      eng.transport.Pop(uint(len(buf)))
 +
 +              case f, ok := <-eng.inject: // Function injected from another 
goroutine
 +                      if ok {
 +                              f()
 +                      }
 +
 +              case <-eng.timer.C:
 +                      eng.tick()
 +              }
 +      }
 +
 +      eng.err.Set(EndpointError(eng.Connection()))
 +      eng.err.Set(eng.Transport().Condition().Error())
 +      close(readsIn)
 +      close(writesIn)
 +      close(eng.running)   // Signal goroutines have exited and Error is set, 
disable Inject()
 +      _ = eng.conn.Close() // Close conn, force read/write goroutines to exit 
(they will Inject)
 +      wait.Wait()          // Wait for goroutines
 +      return eng.err.Get()
 +}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b25d21e6/proton/handlers.go
----------------------------------------------------------------------
diff --cc proton/handlers.go
index 961136e,0000000..f101548
mode 100644,000000..100644
--- a/proton/handlers.go
+++ b/proton/handlers.go
@@@ -1,395 -1,0 +1,395 @@@
 +/*
 +Licensed to the Apache Software Foundation (ASF) under one
 +or more contributor license agreements.  See the NOTICE file
 +distributed with this work for additional information
 +regarding copyright ownership.  The ASF licenses this file
 +to you under the Apache License, Version 2.0 (the
 +"License"); you may not use this file except in compliance
 +with the License.  You may obtain a copy of the License at
 +
 +  http://www.apache.org/licenses/LICENSE-2.0
 +
 +Unless required by applicable law or agreed to in writing,
 +software distributed under the License is distributed on an
 +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 +KIND, either express or implied.  See the License for the
 +specific language governing permissions and limitations
 +under the License.
 +*/
 +
 +package proton
 +
 +import "fmt"
 +
 +// EventHandler handles core proton events.
 +type EventHandler interface {
 +      // HandleEvent is called with an event.
 +      // Typically HandleEvent() is implemented as a switch on e.Type()
 +      // Returning an error will stop the Engine.
 +      HandleEvent(e Event)
 +}
 +
 +// MessagingHandler provides an alternative interface to EventHandler.
 +// it is easier to use for most applications that send and receive messages.
 +//
 +// Implement this interface and then wrap your value with a 
MessagingHandlerDelegator.
 +// MessagingHandlerDelegator implements EventHandler and can be registered 
with a Engine.
 +//
 +type MessagingHandler interface {
 +      // HandleMessagingEvent is called with  MessagingEvent.
 +      // Typically HandleEvent() is implemented as a switch on e.Type()
 +      // Returning an error will stop the Engine.
 +      HandleMessagingEvent(MessagingEvent, Event)
 +}
 +
 +// MessagingEvent provides a set of events that are easier to work with than 
the
 +// core events defined by EventType
 +//
 +// There are 3 types of "endpoint": Connection, Session and Link.  For each
 +// endpoint there are 5 events: Opening, Opened, Closing, Closed and Error.
 +//
 +// The meaning of these events is as follows:
 +//
 +// Opening: The remote end opened, the local end will open automatically.
 +//
 +// Opened: Both ends are open, regardless of which end opened first.
 +//
 +// Closing: The remote end closed without error, the local end will close 
automatically.
 +//
 +// Error: The remote end closed with an error, the local end will close 
automatically.
 +//
 +// Closed: Both ends are closed, regardless of which end closed first or if 
there was an error.
 +// No further events will be received for the endpoint.
 +//
 +type MessagingEvent int
 +
 +const (
 +      // The event loop starts.
 +      MStart MessagingEvent = iota
 +      // The peer closes the connection with an error condition.
 +      MConnectionError
 +      // The peer closes the session with an error condition.
 +      MSessionError
 +      // The peer closes the link with an error condition.
 +      MLinkError
 +      // The peer Initiates the opening of the connection.
 +      MConnectionOpening
 +      // The peer initiates the opening of the session.
 +      MSessionOpening
 +      // The peer initiates the opening of the link.
 +      MLinkOpening
 +      // The connection is opened.
 +      MConnectionOpened
 +      // The session is opened.
 +      MSessionOpened
 +      // The link is opened.
 +      MLinkOpened
 +      // The peer initiates the closing of the connection.
 +      MConnectionClosing
 +      // The peer initiates the closing of the session.
 +      MSessionClosing
 +      // The peer initiates the closing of the link.
 +      MLinkClosing
 +      // Both ends of the connection are closed.
 +      MConnectionClosed
 +      // Both ends of the session are closed.
 +      MSessionClosed
 +      // Both ends of the link are closed.
 +      MLinkClosed
 +      // The sender link has credit and messages can
 +      // therefore be transferred.
 +      MSendable
 +      // The remote peer accepts an outgoing message.
 +      MAccepted
 +      // The remote peer rejects an outgoing message.
 +      MRejected
 +      // The peer releases an outgoing message. Note that this may be in 
response to
 +      // either the RELEASE or MODIFIED state as defined by the AMQP 
specification.
 +      MReleased
 +      // The peer has settled the outgoing message. This is the point at 
which it
 +      // should never be re-transmitted.
 +      MSettled
 +      // A message is received. Call Event.Delivery().Message() to decode as 
an amqp.Message.
 +      // To manage the outcome of this messages (e.g. to accept or reject the 
message)
 +      // use Event.Delivery().
 +      MMessage
 +      // A network connection was disconnected.
 +      MDisconnected
 +)
 +
 +func (t MessagingEvent) String() string {
 +      switch t {
 +      case MStart:
 +              return "Start"
 +      case MConnectionError:
 +              return "ConnectionError"
 +      case MSessionError:
 +              return "SessionError"
 +      case MLinkError:
 +              return "LinkError"
 +      case MConnectionOpening:
 +              return "ConnectionOpening"
 +      case MSessionOpening:
 +              return "SessionOpening"
 +      case MLinkOpening:
 +              return "LinkOpening"
 +      case MConnectionOpened:
 +              return "ConnectionOpened"
 +      case MSessionOpened:
 +              return "SessionOpened"
 +      case MLinkOpened:
 +              return "LinkOpened"
 +      case MConnectionClosing:
 +              return "ConnectionClosing"
 +      case MSessionClosing:
 +              return "SessionClosing"
 +      case MLinkClosing:
 +              return "LinkClosing"
 +      case MConnectionClosed:
 +              return "ConnectionClosed"
 +      case MSessionClosed:
 +              return "SessionClosed"
 +      case MLinkClosed:
 +              return "LinkClosed"
 +      case MDisconnected:
 +              return "Disconnected"
 +      case MSendable:
 +              return "Sendable"
 +      case MAccepted:
 +              return "Accepted"
 +      case MRejected:
 +              return "Rejected"
 +      case MReleased:
 +              return "Released"
 +      case MSettled:
 +              return "Settled"
 +      case MMessage:
 +              return "Message"
 +      default:
 +              return "Unknown"
 +      }
 +}
 +
 +// ResourceHandler provides a simple way to track the creation and deletion of
 +// various proton objects.
 +// endpointDelegator captures common patterns for endpoints opening/closing
 +type endpointDelegator struct {
 +      remoteOpen, remoteClose, localOpen, localClose EventType
 +      opening, opened, closing, closed, error        MessagingEvent
 +      endpoint                                       func(Event) Endpoint
 +      delegator                                      *MessagingAdapter
 +}
 +
 +// HandleEvent handles an open/close event for an endpoint in a generic way.
 +func (d endpointDelegator) HandleEvent(e Event) {
 +      endpoint := d.endpoint(e)
 +      state := endpoint.State()
 +
 +      switch e.Type() {
 +
 +      case d.localOpen:
 +              if state.RemoteActive() {
 +                      d.delegator.mhandler.HandleMessagingEvent(d.opened, e)
 +              }
 +
 +      case d.remoteOpen:
 +              d.delegator.mhandler.HandleMessagingEvent(d.opening, e)
 +              switch {
 +              case state.LocalActive():
 +                      d.delegator.mhandler.HandleMessagingEvent(d.opened, e)
 +              case state.LocalUninit():
 +                      if d.delegator.AutoOpen {
 +                              endpoint.Open()
 +                      }
 +              }
 +
 +      case d.remoteClose:
 +              if endpoint.RemoteCondition().IsSet() { // Closed with error
 +                      d.delegator.mhandler.HandleMessagingEvent(d.error, e)
 +              } else {
 +                      d.delegator.mhandler.HandleMessagingEvent(d.closing, e)
 +              }
 +              if state.LocalClosed() {
 +                      d.delegator.mhandler.HandleMessagingEvent(d.closed, e)
 +              } else if state.LocalActive() {
 +                      endpoint.Close()
 +              }
 +
 +      case d.localClose:
 +              if state.RemoteClosed() {
 +                      d.delegator.mhandler.HandleMessagingEvent(d.closed, e)
 +              }
 +
 +      default:
 +              // We shouldn't be called with any other event type.
 +              panic(fmt.Errorf("internal error, not an open/close event: %s", 
e))
 +      }
 +}
 +
 +type flowcontroller struct {
 +      window, drained int
 +}
 +
 +func (d flowcontroller) HandleEvent(e Event) {
 +      link := e.Link()
 +
 +      switch e.Type() {
 +      case ELinkLocalOpen, ELinkRemoteOpen, ELinkFlow, EDelivery:
 +              if link.IsReceiver() {
 +                      d.drained += link.Drained()
 +                      if d.drained != 0 {
 +                              link.Flow(d.window - link.Credit())
 +                      }
 +              }
 +      }
 +}
 +
- // MessagingAdapter implments a EventHandler and delegates to a 
MessagingHandler.
++// MessagingAdapter implements a EventHandler and delegates to a 
MessagingHandler.
 +// You can modify the exported fields before you pass the MessagingAdapter to
 +// a Engine.
 +type MessagingAdapter struct {
 +      mhandler                  MessagingHandler
 +      connection, session, link endpointDelegator
 +      flowcontroller            EventHandler
 +
 +      // AutoSettle (default true) automatically pre-settle outgoing messages.
 +      AutoSettle bool
 +      // AutoAccept (default true) automatically accept and settle incoming 
messages
 +      // if they are not settled by the delegate.
 +      AutoAccept bool
 +      // AutoOpen (default true) automatically open remotely opened endpoints.
 +      AutoOpen bool
 +      // Prefetch (default 10) initial credit to issue for incoming links.
 +      Prefetch int
 +      // PeerCloseIsError (default false) if true a close by the peer will be 
treated as an error.
 +      PeerCloseError bool
 +}
 +
 +func NewMessagingAdapter(h MessagingHandler) *MessagingAdapter {
 +      return &MessagingAdapter{
 +              mhandler:       h,
 +              flowcontroller: nil,
 +              AutoSettle:     true,
 +              AutoAccept:     true,
 +              AutoOpen:       true,
 +              Prefetch:       10,
 +              PeerCloseError: false,
 +      }
 +}
 +
 +func handleIf(h EventHandler, e Event) {
 +      if h != nil {
 +              h.HandleEvent(e)
 +      }
 +}
 +
 +// Handle a proton event by passing the corresponding MessagingEvent(s) to
 +// the MessagingHandler.
 +func (d *MessagingAdapter) HandleEvent(e Event) {
 +      handleIf(d.flowcontroller, e)
 +
 +      switch e.Type() {
 +
 +      case EConnectionInit:
 +              d.connection = endpointDelegator{
 +                      EConnectionRemoteOpen, EConnectionRemoteClose, 
EConnectionLocalOpen, EConnectionLocalClose,
 +                      MConnectionOpening, MConnectionOpened, 
MConnectionClosing, MConnectionClosed,
 +                      MConnectionError,
 +                      func(e Event) Endpoint { return e.Connection() },
 +                      d,
 +              }
 +              d.session = endpointDelegator{
 +                      ESessionRemoteOpen, ESessionRemoteClose, 
ESessionLocalOpen, ESessionLocalClose,
 +                      MSessionOpening, MSessionOpened, MSessionClosing, 
MSessionClosed,
 +                      MSessionError,
 +                      func(e Event) Endpoint { return e.Session() },
 +                      d,
 +              }
 +              d.link = endpointDelegator{
 +                      ELinkRemoteOpen, ELinkRemoteClose, ELinkLocalOpen, 
ELinkLocalClose,
 +                      MLinkOpening, MLinkOpened, MLinkClosing, MLinkClosed,
 +                      MLinkError,
 +                      func(e Event) Endpoint { return e.Link() },
 +                      d,
 +              }
 +              if d.Prefetch > 0 {
 +                      d.flowcontroller = flowcontroller{window: d.Prefetch, 
drained: 0}
 +              }
 +              d.mhandler.HandleMessagingEvent(MStart, e)
 +
 +      case EConnectionRemoteOpen:
 +
 +              d.connection.HandleEvent(e)
 +
 +      case EConnectionRemoteClose:
 +              d.connection.HandleEvent(e)
 +              e.Connection().Transport().CloseTail()
 +
 +      case EConnectionLocalOpen, EConnectionLocalClose:
 +              d.connection.HandleEvent(e)
 +
 +      case ESessionRemoteOpen, ESessionRemoteClose, ESessionLocalOpen, 
ESessionLocalClose:
 +              d.session.HandleEvent(e)
 +
 +      case ELinkRemoteOpen:
 +              e.Link().Source().Copy(e.Link().RemoteSource())
 +              e.Link().Target().Copy(e.Link().RemoteTarget())
 +              d.link.HandleEvent(e)
 +
 +      case ELinkRemoteClose, ELinkLocalOpen, ELinkLocalClose:
 +              d.link.HandleEvent(e)
 +
 +      case ELinkFlow:
 +              if e.Link().IsSender() && e.Link().Credit() > 0 {
 +                      d.mhandler.HandleMessagingEvent(MSendable, e)
 +              }
 +
 +      case EDelivery:
 +              if e.Delivery().Link().IsReceiver() {
 +                      d.incoming(e)
 +              } else {
 +                      d.outgoing(e)
 +              }
 +
 +      case ETransportClosed:
 +              d.mhandler.HandleMessagingEvent(MDisconnected, e)
 +      }
 +}
 +
 +func (d *MessagingAdapter) incoming(e Event) {
 +      delivery := e.Delivery()
 +      if delivery.HasMessage() {
 +              d.mhandler.HandleMessagingEvent(MMessage, e)
 +              if d.AutoAccept && !delivery.Settled() {
 +                      delivery.Accept()
 +              }
 +              if delivery.Current() {
 +                      e.Link().Advance()
 +              }
 +      } else if delivery.Updated() && delivery.Settled() {
 +              d.mhandler.HandleMessagingEvent(MSettled, e)
 +      }
 +      return
 +}
 +
 +func (d *MessagingAdapter) outgoing(e Event) {
 +      delivery := e.Delivery()
 +      if delivery.Updated() {
 +              switch delivery.Remote().Type() {
 +              case Accepted:
 +                      d.mhandler.HandleMessagingEvent(MAccepted, e)
 +              case Rejected:
 +                      d.mhandler.HandleMessagingEvent(MRejected, e)
 +              case Released, Modified:
 +                      d.mhandler.HandleMessagingEvent(MReleased, e)
 +              }
 +              if delivery.Settled() {
 +                      // The delivery was settled remotely, inform the local 
end.
 +                      d.mhandler.HandleMessagingEvent(MSettled, e)
 +              }
 +              if d.AutoSettle {
 +                      delivery.Settle() // Local settle, don't mhandler 
MSettled till the remote end settles.
 +              }
 +      }
 +      return
 +}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b25d21e6/proton/message.go
----------------------------------------------------------------------
diff --cc proton/message.go
index 2336483,0000000..fbb1d48
mode 100644,000000..100644
--- a/proton/message.go
+++ b/proton/message.go
@@@ -1,93 -1,0 +1,93 @@@
 +/*
 +Licensed to the Apache Software Foundation (ASF) under one
 +or more contributor license agreements.  See the NOTICE file
 +distributed with this work for additional information
 +regarding copyright ownership.  The ASF licenses this file
 +to you under the Apache License, Version 2.0 (the
 +"License"); you may not use this file except in compliance
 +with the License.  You may obtain a copy of the License at
 +
 +  http://www.apache.org/licenses/LICENSE-2.0
 +
 +Unless required by applicable law or agreed to in writing,
 +software distributed under the License is distributed on an
 +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 +KIND, either express or implied.  See the License for the
 +specific language governing permissions and limitations
 +under the License.
 +*/
 +
 +package proton
 +
 +// #include <proton/types.h>
 +// #include <proton/message.h>
 +// #include <proton/codec.h>
 +import "C"
 +
 +import (
 +      "fmt"
 +      "qpid.apache.org/amqp"
 +      "strconv"
 +      "sync/atomic"
 +)
 +
 +// HasMessage is true if all message data is available.
 +// Equivalent to !d.isNil && d.Readable() && !d.Partial()
 +func (d Delivery) HasMessage() bool { return !d.IsNil() && d.Readable() && 
!d.Partial() }
 +
- // Message decodes the message containined in a delivery.
++// Message decodes the message contained in a delivery.
 +//
 +// Must be called in the correct link context with this delivery as the 
current message,
 +// handling an MMessage event is always a safe context to call this function.
 +//
 +// Will return an error if message is incomplete or not current.
 +func (delivery Delivery) Message() (m amqp.Message, err error) {
 +      if !delivery.Readable() {
 +              return nil, fmt.Errorf("delivery is not readable")
 +      }
 +      if delivery.Partial() {
 +              return nil, fmt.Errorf("delivery has partial message")
 +      }
 +      data := make([]byte, delivery.Pending())
 +      result := delivery.Link().Recv(data)
 +      if result != len(data) {
 +              return nil, fmt.Errorf("cannot receive message: %s", 
PnErrorCode(result))
 +      }
 +      m = amqp.NewMessage()
 +      err = m.Decode(data)
 +      return
 +}
 +
 +// Process-wide atomic counter for generating tag names
 +var tagCounter uint64
 +
 +func nextTag() string {
 +      return strconv.FormatUint(atomic.AddUint64(&tagCounter, 1), 32)
 +}
 +
 +// Send sends a amqp.Message over a Link.
 +// Returns a Delivery that can be use to determine the outcome of the message.
 +func (link Link) Send(m amqp.Message) (Delivery, error) {
 +      if !link.IsSender() {
 +              return Delivery{}, fmt.Errorf("attempt to send message on 
receiving link")
 +      }
 +
 +      delivery := link.Delivery(nextTag())
 +      bytes, err := m.Encode(nil)
 +      if err != nil {
-               return Delivery{}, fmt.Errorf("cannot send mesage %s", err)
++              return Delivery{}, fmt.Errorf("cannot send message %s", err)
 +      }
 +      result := link.SendBytes(bytes)
 +      link.Advance()
 +      if result != len(bytes) {
 +              if result < 0 {
 +                      return delivery, fmt.Errorf("send failed %v", 
PnErrorCode(result))
 +              } else {
 +                      return delivery, fmt.Errorf("send incomplete %v of %v", 
result, len(bytes))
 +              }
 +      }
 +      if link.RemoteSndSettleMode() == SndSettled {
 +              delivery.Settle()
 +      }
 +      return delivery, nil
 +}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b25d21e6/proton/wrappers.go
----------------------------------------------------------------------
diff --cc proton/wrappers.go
index 09f3e65,0000000..a7b7fb2
mode 100644,000000..100644
--- a/proton/wrappers.go
+++ b/proton/wrappers.go
@@@ -1,460 -1,0 +1,460 @@@
 +/*
 +Licensed to the Apache Software Foundation (ASF) under one
 +or more contributor license agreements.  See the NOTICE file
 +distributed with this work for additional information
 +regarding copyright ownership.  The ASF licenses this file
 +to you under the Apache License, Version 2.0 (the
 +"License"); you may not use this file except in compliance
 +with the License.  You may obtain a copy of the License at
 +
 +  http://www.apache.org/licenses/LICENSE-2.0
 +
 +Unless required by applicable law or agreed to in writing,
 +software distributed under the License is distributed on an
 +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 +KIND, either express or implied.  See the License for the
 +specific language governing permissions and limitations
 +under the License.
 +*/
 +
 +// This file contains special-case wrapper functions or wrappers that don't 
follow
 +// the pattern of genwrap.go.
 +
 +package proton
 +
 +//#include <proton/codec.h>
 +//#include <proton/connection.h>
 +//#include <proton/delivery.h>
 +//#include <proton/event.h>
 +//#include <proton/link.h>
 +//#include <proton/link.h>
 +//#include <proton/object.h>
 +//#include <proton/sasl.h>
 +//#include <proton/session.h>
 +//#include <proton/transport.h>
 +//#include <stdlib.h>
 +import "C"
 +
 +import (
 +      "fmt"
 +      "qpid.apache.org/amqp"
 +      "reflect"
 +      "time"
 +      "unsafe"
 +)
 +
 +// TODO aconway 2015-05-05: Documentation for generated types.
 +
 +// CHandle holds an unsafe.Pointer to a proton C struct, the C type depends 
on the
 +// Go type implementing this interface. For low level, at-your-own-risk use 
only.
 +type CHandle interface {
 +      // CPtr returns the unsafe C pointer, equivalent to a C void*.
 +      CPtr() unsafe.Pointer
 +}
 +
 +// Incref increases the refcount of a proton value, which prevents the
 +// underlying C struct being freed until you call Decref().
 +//
 +// It can be useful to "pin" a proton value in memory while it is in use by
 +// goroutines other than the event loop goroutine. For example if you 
Incref() a
 +// Link, the underlying object is not freed when the link is closed, so means
 +// other goroutines can continue to safely use it as an index in a map or 
inject
 +// it into the event loop goroutine. There will of course be an error if you 
try
 +// to use a link after it is closed, but not a segmentation fault.
 +func Incref(c CHandle) {
 +      if p := c.CPtr(); p != nil {
 +              C.pn_incref(p)
 +      }
 +}
 +
 +// Decref decreases the refcount of a proton value, freeing the underlying C
 +// struct if this is the last reference.  Only call this if you previously
 +// called Incref() for this value.
 +func Decref(c CHandle) {
 +      if p := c.CPtr(); p != nil {
 +              C.pn_decref(p)
 +      }
 +}
 +
 +// Event is an AMQP protocol event.
 +type Event struct {
 +      pn         *C.pn_event_t
 +      eventType  EventType
 +      connection Connection
 +      transport  Transport
 +      session    Session
 +      link       Link
 +      delivery   Delivery
 +      injecter   Injecter
 +}
 +
 +func makeEvent(pn *C.pn_event_t, injecter Injecter) Event {
 +      return Event{
 +              pn:         pn,
 +              eventType:  EventType(C.pn_event_type(pn)),
 +              connection: Connection{C.pn_event_connection(pn)},
 +              transport:  Transport{C.pn_event_transport(pn)},
 +              session:    Session{C.pn_event_session(pn)},
 +              link:       Link{C.pn_event_link(pn)},
 +              delivery:   Delivery{C.pn_event_delivery(pn)},
 +              injecter:   injecter,
 +      }
 +}
 +func (e Event) IsNil() bool            { return e.eventType == EventType(0) }
 +func (e Event) Type() EventType        { return e.eventType }
 +func (e Event) Connection() Connection { return e.connection }
 +func (e Event) Transport() Transport   { return e.transport }
 +func (e Event) Session() Session       { return e.session }
 +func (e Event) Link() Link             { return e.link }
 +func (e Event) Delivery() Delivery     { return e.delivery }
 +func (e Event) String() string         { return e.Type().String() }
 +
 +// Injecter should not be used in a handler function, but it can be passed to
 +// other goroutines (via a channel or to a goroutine started by handler
 +// functions) to let them inject functions back into the handlers goroutine.
 +func (e Event) Injecter() Injecter { return e.injecter }
 +
 +// Data is an intermediate form of decoded AMQP data.
 +type Data struct{ pn *C.pn_data_t }
 +
 +func (d Data) Free()                { C.pn_data_free(d.pn) }
 +func (d Data) CPtr() unsafe.Pointer { return unsafe.Pointer(d.pn) }
 +func (d Data) Clear()               { C.pn_data_clear(d.pn) }
 +func (d Data) Rewind()              { C.pn_data_rewind(d.pn) }
 +func (d Data) Next()                { C.pn_data_next(d.pn) }
 +func (d Data) Error() error         { return PnError(C.pn_data_error(d.pn)) }
 +func (d Data) Empty() bool          { return C.pn_data_size(d.pn) == 0 }
 +
 +func (d Data) String() string {
 +      str := C.pn_string(C.CString(""))
 +      defer C.pn_free(unsafe.Pointer(str))
 +      C.pn_inspect(unsafe.Pointer(d.pn), str)
 +      return C.GoString(C.pn_string_get(str))
 +}
 +
 +// Unmarshal the value of d into value pointed at by ptr, see 
amqp.Unmarshal() for details
 +func (d Data) Unmarshal(ptr interface{}) error {
 +      d.Rewind()
 +      d.Next()
 +      err := amqp.UnmarshalUnsafe(d.CPtr(), ptr)
 +      return err
 +}
 +
 +// Marshal the value v into d, see amqp.Marshal() for details
 +func (d Data) Marshal(v interface{}) error {
 +      d.Clear()
 +      return amqp.MarshalUnsafe(v, d.CPtr())
 +}
 +
 +// State holds the state flags for an AMQP endpoint.
 +type State byte
 +
 +const (
 +      SLocalUninit  State = C.PN_LOCAL_UNINIT
 +      SLocalActive        = C.PN_LOCAL_ACTIVE
 +      SLocalClosed        = C.PN_LOCAL_CLOSED
 +      SRemoteUninit       = C.PN_REMOTE_UNINIT
 +      SRemoteActive       = C.PN_REMOTE_ACTIVE
 +      SRemoteClosed       = C.PN_REMOTE_CLOSED
 +)
 +
 +// Has is True if bits & state is non 0.
 +func (s State) Has(bits State) bool { return s&bits != 0 }
 +
 +func (s State) LocalUninit() bool  { return s.Has(SLocalUninit) }
 +func (s State) LocalActive() bool  { return s.Has(SLocalActive) }
 +func (s State) LocalClosed() bool  { return s.Has(SLocalClosed) }
 +func (s State) RemoteUninit() bool { return s.Has(SRemoteUninit) }
 +func (s State) RemoteActive() bool { return s.Has(SRemoteActive) }
 +func (s State) RemoteClosed() bool { return s.Has(SRemoteClosed) }
 +
- // Return a State containig just the local flags
++// Return a State containing just the local flags
 +func (s State) Local() State { return State(s & C.PN_LOCAL_MASK) }
 +
- // Return a State containig just the remote flags
++// Return a State containing just the remote flags
 +func (s State) Remote() State { return State(s & C.PN_REMOTE_MASK) }
 +
 +// Endpoint is the common interface for Connection, Link and Session.
 +type Endpoint interface {
 +      // State is the open/closed state.
 +      State() State
 +      // Open an endpoint.
 +      Open()
 +      // Close an endpoint.
 +      Close()
 +      // Condition holds a local error condition.
 +      Condition() Condition
 +      // RemoteCondition holds a remote error condition.
 +      RemoteCondition() Condition
 +      // Human readable name
 +      String() string
 +      // Human readable endpoint type "sender-link", "session" etc.
 +      Type() string
 +}
 +
 +// CloseError sets an error condition (if err != nil) on an endpoint and 
closes
 +// the endpoint if not already closed
 +func CloseError(e Endpoint, err error) {
 +      if err != nil && !e.Condition().IsSet() {
 +              e.Condition().SetError(err)
 +      }
 +      e.Close()
 +}
 +
 +// EndpointError returns the remote error if there is one, the local error if 
not
 +// nil if there is no error.
 +func EndpointError(e Endpoint) error {
 +      err := e.RemoteCondition().Error()
 +      if err == nil {
 +              err = e.Condition().Error()
 +      }
 +      return err
 +}
 +
 +const (
 +      Received uint64 = C.PN_RECEIVED
 +      Accepted        = C.PN_ACCEPTED
 +      Rejected        = C.PN_REJECTED
 +      Released        = C.PN_RELEASED
 +      Modified        = C.PN_MODIFIED
 +)
 +
 +// SettleAs is equivalent to d.Update(disposition); d.Settle()
 +func (d Delivery) SettleAs(disposition uint64) {
 +      d.Update(disposition)
 +      d.Settle()
 +}
 +
 +// Accept accepts and settles a delivery.
 +func (d Delivery) Accept() { d.SettleAs(Accepted) }
 +
 +// Reject rejects and settles a delivery
 +func (d Delivery) Reject() { d.SettleAs(Rejected) }
 +
 +// Release releases and settles a delivery
 +// If delivered is true the delivery count for the message will be increased.
 +func (d Delivery) Release(delivered bool) {
 +      if delivered {
 +              d.SettleAs(Modified)
 +      } else {
 +              d.SettleAs(Released)
 +      }
 +}
 +
 +type DeliveryTag struct{ pn C.pn_delivery_tag_t }
 +
 +func (t DeliveryTag) String() string { return C.GoStringN(t.pn.start, 
C.int(t.pn.size)) }
 +
 +func (l Link) Recv(buf []byte) int {
 +      if len(buf) == 0 {
 +              return 0
 +      }
 +      return int(C.pn_link_recv(l.pn, (*C.char)(unsafe.Pointer(&buf[0])), 
C.size_t(len(buf))))
 +}
 +
 +func (l Link) SendBytes(bytes []byte) int {
 +      return int(C.pn_link_send(l.pn, cPtr(bytes), cLen(bytes)))
 +}
 +
 +func pnTag(tag string) C.pn_delivery_tag_t {
 +      bytes := []byte(tag)
 +      return C.pn_dtag(cPtr(bytes), cLen(bytes))
 +}
 +
 +func (l Link) Delivery(tag string) Delivery {
 +      return Delivery{C.pn_delivery(l.pn, pnTag(tag))}
 +}
 +
 +func (l Link) Connection() Connection { return l.Session().Connection() }
 +
 +// Human-readable link description including name, source, target and 
direction.
 +func (l Link) String() string {
 +      switch {
 +      case l.IsNil():
 +              return fmt.Sprintf("<nil-link>")
 +      case l.IsSender():
 +              return fmt.Sprintf("%s(%s->%s)", l.Name(), 
l.Source().Address(), l.Target().Address())
 +      default:
 +              return fmt.Sprintf("%s(%s<-%s)", l.Name(), 
l.Target().Address(), l.Source().Address())
 +      }
 +}
 +
 +func (l Link) Type() string {
 +      if l.IsSender() {
 +              return "sender-link"
 +      } else {
 +              return "receiver-link"
 +      }
 +}
 +
 +// IsDrain calls pn_link_get_drain(), it conflicts with pn_link_drain() under 
the normal mapping.
 +func (l Link) IsDrain() bool {
 +      return bool(C.pn_link_get_drain(l.pn))
 +}
 +
 +func cPtr(b []byte) *C.char {
 +      if len(b) == 0 {
 +              return nil
 +      }
 +      return (*C.char)(unsafe.Pointer(&b[0]))
 +}
 +
 +func cLen(b []byte) C.size_t {
 +      return C.size_t(len(b))
 +}
 +
 +func (s Session) Sender(name string) Link {
 +      cname := C.CString(name)
 +      defer C.free(unsafe.Pointer(cname))
 +      return Link{C.pn_sender(s.pn, cname)}
 +}
 +
 +func (s Session) Receiver(name string) Link {
 +      cname := C.CString(name)
 +      defer C.free(unsafe.Pointer(cname))
 +      return Link{C.pn_receiver(s.pn, cname)}
 +}
 +
 +func (t Transport) String() string {
 +      return fmt.Sprintf("(Transport)(%p)", t.CPtr())
 +}
 +
 +// Unique (per process) string identifier for a connection, useful for 
debugging.
 +func (c Connection) String() string {
 +      // Use the transport address to match the default transport logs from 
PN_TRACE.
 +      return fmt.Sprintf("(Connection)(%p)", c.Transport().CPtr())
 +}
 +
 +func (c Connection) Type() string {
 +      return "connection"
 +}
 +
 +// Head functions don't follow the normal naming conventions so missed by the 
generator.
 +
 +func (c Connection) LinkHead(s State) Link {
 +      return Link{C.pn_link_head(c.pn, C.pn_state_t(s))}
 +}
 +
 +func (c Connection) SessionHead(s State) Session {
 +      return Session{C.pn_session_head(c.pn, C.pn_state_t(s))}
 +}
 +
 +func (c Connection) Links(state State) (links []Link) {
 +      for l := c.LinkHead(state); !l.IsNil(); l = l.Next(state) {
 +              links = append(links, l)
 +      }
 +      return
 +}
 +
 +func (c Connection) Sessions(state State) (sessions []Session) {
 +      for s := c.SessionHead(state); !s.IsNil(); s = s.Next(state) {
 +              sessions = append(sessions, s)
 +      }
 +      return
 +}
 +
 +// SetPassword takes []byte not string because it is impossible to erase a 
string
 +// from memory reliably. Proton will not keep the password in memory longer 
than
 +// needed, the caller should overwrite their copy on return.
 +//
 +// The password must not contain embedded nul characters, a trailing nul is 
ignored.
 +func (c Connection) SetPassword(password []byte) {
 +      if len(password) == 0 || password[len(password)-1] != 0 {
 +              password = append(password, 0) // Proton requires a terminating 
null.
 +      }
 +      C.pn_connection_set_password(c.pn, 
(*C.char)(unsafe.Pointer(&password[0])))
 +}
 +
 +func (s Session) String() string {
 +      return fmt.Sprintf("(Session)(%p)", s.pn) // TODO aconway 2016-09-12: 
should print channel number.
 +}
 +
 +func (s Session) Type() string { return "session" }
 +
 +// Error returns an instance of amqp.Error or nil.
 +func (c Condition) Error() error {
 +      if c.IsNil() || !c.IsSet() {
 +              return nil
 +      }
 +      return amqp.Error{Name: c.Name(), Description: c.Description()}
 +}
 +
 +// Set a Go error into a condition.
 +// If it is not an amqp.Condition use the error type as name, error string as 
description.
 +func (c Condition) SetError(err error) {
 +      if err != nil {
 +              if cond, ok := err.(amqp.Error); ok {
 +                      c.SetName(cond.Name)
 +                      c.SetDescription(cond.Description)
 +              } else {
 +                      c.SetName(reflect.TypeOf(err).Name())
 +                      c.SetDescription(err.Error())
 +              }
 +      }
 +}
 +
 +func (c Connection) Session() (Session, error) {
 +      s := Session{C.pn_session(c.pn)}
 +      if s.IsNil() {
 +              return s, Connection(c).Error()
 +      }
 +      return s, nil
 +}
 +
 +// pnTime converts Go time.Time to Proton millisecond Unix time.
 +//
 +// Note: t.isZero() is converted to C.pn_timestamp_t(0) and vice-versa. These
 +// are used as "not set" sentinel values by the Go and Proton APIs, so it is
 +// better to conserve the "zeroness" even though they don't represent the same
 +// time instant.
 +//
 +func pnTime(t time.Time) (pnt C.pn_timestamp_t) {
 +      if !t.IsZero() {
 +              pnt = C.pn_timestamp_t(t.Unix()*1000 + 
int64(t.Nanosecond())/int64(time.Millisecond))
 +      }
 +      return
 +}
 +
 +// goTime converts a pn_timestamp_t to a Go time.Time.
 +//
 +// Note: C.pn_timestamp_t(0) is converted to a zero time.Time and
 +// vice-versa. These are used as "not set" sentinel values by the Go and 
Proton
 +// APIs, so it is better to conserve the "zeroness" even though they don't
 +// represent the same time instant.
 +//
 +func goTime(pnt C.pn_timestamp_t) (t time.Time) {
 +      if pnt != 0 {
 +              t = time.Unix(int64(pnt/1000), 
int64(pnt%1000)*int64(time.Millisecond))
 +      }
 +      return
 +}
 +
 +// Special treatment for Transport.Head, return value is unsafe.Pointer not 
string
 +func (t Transport) Head() unsafe.Pointer {
 +      return unsafe.Pointer(C.pn_transport_head(t.pn))
 +}
 +
 +// Special treatment for Transport.Tail, return value is unsafe.Pointer not 
string
 +func (t Transport) Tail() unsafe.Pointer {
 +      return unsafe.Pointer(C.pn_transport_tail(t.pn))
 +}
 +
 +// Special treatment for Transport.Push, takes []byte instead of char*, size
 +func (t Transport) Push(bytes []byte) int {
 +      return int(C.pn_transport_push(t.pn, 
(*C.char)(unsafe.Pointer(&bytes[0])), C.size_t(len(bytes))))
 +}
 +
 +// Get the SASL object for the transport.
 +func (t Transport) SASL() SASL {
 +      return SASL{C.pn_sasl(t.pn)}
 +}
 +
 +// Do we support extended SASL negotiation?
 +// All implementations of Proton support ANONYMOUS and EXTERNAL on both
 +// client and server sides and PLAIN on the client side.
 +//
 +// Extended SASL implememtations use an external library (Cyrus SASL)
 +// to support other mechanisms beyond these basic ones.
 +func SASLExtended() bool {
 +      return bool(C.pn_sasl_extended())
 +}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to