Merge branch 'master' into go1

# Conflicts:
#       readme-branch.md


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/9e788b2d
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/9e788b2d
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/9e788b2d

Branch: refs/heads/go1
Commit: 9e788b2d484a79154d9d05a93bd6447730ab7738
Parents: e1a83ee 1cfa056
Author: Alan Conway <[email protected]>
Authored: Mon Nov 23 12:36:33 2015 -0500
Committer: Alan Conway <[email protected]>
Committed: Mon Nov 23 12:52:44 2015 -0500

----------------------------------------------------------------------
 amqp/error.go                |  39 ++-
 amqp/interop                 |   1 -
 amqp/marshal.go              |  10 +-
 amqp/message.go              |   9 +-
 amqp/types.go                |   6 +-
 amqp/unmarshal.go            |  41 +--
 amqp/url.go                  |   6 +-
 amqp/url_test.go             |   2 +-
 electron/connection.go       | 128 ++++++----
 electron/container.go        |  20 +-
 electron/doc.go              |  14 +-
 electron/endpoint.go         |  36 ++-
 electron/error.go            |  35 +++
 electron/handler.go          |  97 ++++---
 electron/link.go             |  86 ++++---
 electron/messaging_test.go   | 182 +++++++-------
 electron/receiver.go         |  60 +++--
 electron/sender.go           | 375 ++++++++++++---------------
 electron/session.go          |  73 +++---
 electron/time.go             |  13 +-
 internal/error.go            | 118 ---------
 internal/flexchannel.go      |  82 ------
 internal/flexchannel_test.go |  89 -------
 internal/safemap.go          |  57 -----
 internal/uuid.go             |  70 ------
 proton/engine.go             |  27 +-
 proton/error.go              | 104 ++++----
 proton/handlers.go           |   7 +-
 proton/interop_test.go       | 290 ---------------------
 proton/marshal.go            | 210 ----------------
 proton/message.go            |  29 ++-
 proton/types.go              | 151 -----------
 proton/unfinished.go         |  53 ----
 proton/unmarshal.go          | 517 --------------------------------------
 proton/url.go                |  96 -------
 proton/url_test.go           |  51 ----
 proton/uuid.go               |  57 +++++
 proton/wrappers.go           |  39 +--
 proton/wrappers_gen.go       |   9 +-
 readme-go-get.md             |  18 ++
 40 files changed, 873 insertions(+), 2434 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9e788b2d/amqp/error.go
----------------------------------------------------------------------
diff --cc amqp/error.go
index 868dbf3,0000000..349fc41
mode 100644,000000..100644
--- a/amqp/error.go
+++ b/amqp/error.go
@@@ -1,66 -1,0 +1,103 @@@
 +/*
 +Licensed to the Apache Software Foundation (ASF) under one
 +or more contributor license agreements.  See the NOTICE file
 +distributed with this work for additional information
 +regarding copyright ownership.  The ASF licenses this file
 +to you under the Apache License, Version 2.0 (the
 +"License"); you may not use this file except in compliance
 +with the License.  You may obtain a copy of the License at
 +
 +  http://www.apache.org/licenses/LICENSE-2.0
 +
 +Unless required by applicable law or agreed to in writing,
 +software distributed under the License is distributed on an
 +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 +KIND, either express or implied.  See the License for the
 +specific language governing permissions and limitations
 +under the License.
 +*/
 +
 +package amqp
 +
++// #include <proton/error.h>
++import "C"
++
 +import (
 +      "fmt"
 +      "reflect"
 +)
 +
 +// Error is an AMQP error condition. It has a name and a description.
 +// It implements the Go error interface so can be returned as an error value.
 +//
 +// You can pass amqp.Error to methods that pass an error to a remote endpoint,
 +// this gives you full control over what the remote endpoint will see.
 +//
 +// You can also pass any Go error to such functions, the remote peer
 +// will see the equivalent of MakeError(error)
 +//
 +type Error struct{ Name, Description string }
 +
 +// Error implements the Go error interface for AMQP error errors.
- func (c Error) Error() string { return fmt.Sprintf("proton %s: %s", c.Name, 
c.Description) }
++func (c Error) Error() string { return fmt.Sprintf("%s: %s", c.Name, 
c.Description) }
 +
 +// Errorf makes a Error with name and formatted description as per fmt.Sprintf
 +func Errorf(name, format string, arg ...interface{}) Error {
 +      return Error{name, fmt.Sprintf(format, arg...)}
 +}
 +
 +// MakeError makes an AMQP error from a go error using the Go error type as 
the name
 +// and the err.Error() string as the description.
 +func MakeError(err error) Error {
 +      return Error{reflect.TypeOf(err).Name(), err.Error()}
 +}
 +
 +var (
 +      InternalError      = "amqp:internal-error"
 +      NotFound           = "amqp:not-found"
 +      UnauthorizedAccess = "amqp:unauthorized-access"
 +      DecodeError        = "amqp:decode-error"
 +      ResourceLimit      = "amqp:resource-limit"
 +      NotAllowed         = "amqp:not-allowed"
 +      InvalidField       = "amqp:invalid-field"
 +      NotImplemented     = "amqp:not-implemented"
 +      ResourceLocked     = "amqp:resource-locked"
 +      PreerrorFailed     = "amqp:preerror-failed"
 +      ResourceDeleted    = "amqp:resource-deleted"
 +      IllegalState       = "amqp:illegal-state"
 +      FrameSizeTooSmall  = "amqp:frame-size-too-small"
 +)
++
++type PnErrorCode int
++
++func (e PnErrorCode) String() string {
++      switch e {
++      case C.PN_EOS:
++              return "end-of-data"
++      case C.PN_ERR:
++              return "error"
++      case C.PN_OVERFLOW:
++              return "overflow"
++      case C.PN_UNDERFLOW:
++              return "underflow"
++      case C.PN_STATE_ERR:
++              return "bad-state"
++      case C.PN_ARG_ERR:
++              return "invalid-argument"
++      case C.PN_TIMEOUT:
++              return "timeout"
++      case C.PN_INTR:
++              return "interrupted"
++      case C.PN_INPROGRESS:
++              return "in-progress"
++      default:
++              return fmt.Sprintf("unknown-error(%d)", e)
++      }
++}
++
++func PnError(e *C.pn_error_t) error {
++      if e == nil || C.pn_error_code(e) == 0 {
++              return nil
++      }
++      return fmt.Errorf("%s: %s", PnErrorCode(C.pn_error_code(e)), 
C.GoString(C.pn_error_text(e)))
++}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9e788b2d/amqp/marshal.go
----------------------------------------------------------------------
diff --cc amqp/marshal.go
index 666b4f6,0000000..9930e13
mode 100644,000000..100644
--- a/amqp/marshal.go
+++ b/amqp/marshal.go
@@@ -1,250 -1,0 +1,250 @@@
 +/*
 +Licensed to the Apache Software Foundation (ASF) under one
 +or more contributor license agreements.  See the NOTICE file
 +distributed with this work for additional information
 +regarding copyright ownership.  The ASF licenses this file
 +to you under the Apache License, Version 2.0 (the
 +"License"); you may not use this file except in compliance
 +with the License.  You may obtain a copy of the License at
 +
 +  http://www.apache.org/licenses/LICENSE-2.0
 +
 +Unless required by applicable law or agreed to in writing,
 +software distributed under the License is distributed on an
 +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 +KIND, either express or implied.  See the License for the
 +specific language governing permissions and limitations
 +under the License.
 +*/
 +
 +package amqp
 +
 +// #include <proton/codec.h>
 +import "C"
 +
 +import (
++      "fmt"
 +      "io"
-       "qpid.apache.org/internal"
 +      "reflect"
 +      "unsafe"
 +)
 +
 +func dataError(prefix string, data *C.pn_data_t) error {
-       err := internal.PnError(unsafe.Pointer(C.pn_data_error(data)))
++      err := PnError(C.pn_data_error(data))
 +      if err != nil {
-               err = internal.Errorf("%s: %s", prefix, err.(internal.Error))
++              err = fmt.Errorf("%s: %s", prefix, err.Error())
 +      }
 +      return err
 +}
 +
 +/*
 +Marshal encodes a Go value as AMQP data in buffer.
 +If buffer is nil, or is not large enough, a new buffer  is created.
 +
 +Returns the buffer used for encoding with len() adjusted to the actual size 
of data.
 +
 +Go types are encoded as follows
 +
 + 
+-------------------------------------+--------------------------------------------+
 + |Go type                              |AMQP type                             
      |
 + 
+-------------------------------------+--------------------------------------------+
 + |bool                                 |bool                                  
      |
 + 
+-------------------------------------+--------------------------------------------+
 + |int8, int16, int32, int64 (int)      |byte, short, int, long (int or long)  
      |
 + 
+-------------------------------------+--------------------------------------------+
 + |uint8, uint16, uint32, uint64 (uint) |ubyte, ushort, uint, ulong (uint or 
ulong)  |
 + 
+-------------------------------------+--------------------------------------------+
 + |float32, float64                     |float, double.                        
      |
 + 
+-------------------------------------+--------------------------------------------+
 + |string                               |string                                
      |
 + 
+-------------------------------------+--------------------------------------------+
 + |[]byte, Binary                       |binary                                
      |
 + 
+-------------------------------------+--------------------------------------------+
 + |Symbol                               |symbol                                
      |
 + 
+-------------------------------------+--------------------------------------------+
 + |interface{}                          |the contained type                    
      |
 + 
+-------------------------------------+--------------------------------------------+
 + |nil                                  |null                                  
      |
 + 
+-------------------------------------+--------------------------------------------+
 + |map[K]T                              |map with K and T converted as above   
      |
 + 
+-------------------------------------+--------------------------------------------+
 + |Map                                  |map, may have mixed types for keys, 
values  |
 + 
+-------------------------------------+--------------------------------------------+
 + |[]T                                  |list with T converted as above        
      |
 + 
+-------------------------------------+--------------------------------------------+
 + |List                                 |list, may have mixed types  values    
      |
 + 
+-------------------------------------+--------------------------------------------+
 +
 +The following Go types cannot be marshaled: uintptr, function, interface, 
channel
 +
 +TODO
 +
 +Go types: array, slice, struct, complex64/128.
 +
 +AMQP types: decimal32/64/128, char, timestamp, uuid, array, multi-section 
message bodies.
 +
 +Described types.
 +
 +*/
 +func Marshal(v interface{}, buffer []byte) (outbuf []byte, err error) {
 +      defer doRecover(&err)
 +      data := C.pn_data(0)
 +      defer C.pn_data_free(data)
 +      marshal(v, data)
 +      encode := func(buf []byte) ([]byte, error) {
 +              n := int(C.pn_data_encode(data, cPtr(buf), cLen(buf)))
 +              switch {
 +              case n == int(C.PN_OVERFLOW):
 +                      return buf, overflow
 +              case n < 0:
 +                      return buf, dataError("marshal error", data)
 +              default:
 +                      return buf[:n], nil
 +              }
 +      }
 +      return encodeGrow(buffer, encode)
 +}
 +
 +const minEncode = 256
 +
 +// overflow is returned when an encoding function can't fit data in the 
buffer.
- var overflow = internal.Errorf("buffer too small")
++var overflow = fmt.Errorf("buffer too small")
 +
 +// encodeFn encodes into buffer[0:len(buffer)].
 +// Returns buffer with length adjusted for data encoded.
 +// If buffer too small, returns overflow as error.
 +type encodeFn func(buffer []byte) ([]byte, error)
 +
 +// encodeGrow calls encode() into buffer, if it returns overflow grows the 
buffer.
 +// Returns the final buffer.
 +func encodeGrow(buffer []byte, encode encodeFn) ([]byte, error) {
 +      if buffer == nil || len(buffer) == 0 {
 +              buffer = make([]byte, minEncode)
 +      }
 +      var err error
 +      for buffer, err = encode(buffer); err == overflow; buffer, err = 
encode(buffer) {
 +              buffer = make([]byte, 2*len(buffer))
 +      }
 +      return buffer, err
 +}
 +
 +func marshal(v interface{}, data *C.pn_data_t) {
 +      switch v := v.(type) {
 +      case nil:
 +              C.pn_data_put_null(data)
 +      case bool:
 +              C.pn_data_put_bool(data, C.bool(v))
 +      case int8:
 +              C.pn_data_put_byte(data, C.int8_t(v))
 +      case int16:
 +              C.pn_data_put_short(data, C.int16_t(v))
 +      case int32:
 +              C.pn_data_put_int(data, C.int32_t(v))
 +      case int64:
 +              C.pn_data_put_long(data, C.int64_t(v))
 +      case int:
 +              if unsafe.Sizeof(0) == 8 {
 +                      C.pn_data_put_long(data, C.int64_t(v))
 +              } else {
 +                      C.pn_data_put_int(data, C.int32_t(v))
 +              }
 +      case uint8:
 +              C.pn_data_put_ubyte(data, C.uint8_t(v))
 +      case uint16:
 +              C.pn_data_put_ushort(data, C.uint16_t(v))
 +      case uint32:
 +              C.pn_data_put_uint(data, C.uint32_t(v))
 +      case uint64:
 +              C.pn_data_put_ulong(data, C.uint64_t(v))
 +      case uint:
 +              if unsafe.Sizeof(0) == 8 {
 +                      C.pn_data_put_ulong(data, C.uint64_t(v))
 +              } else {
 +                      C.pn_data_put_uint(data, C.uint32_t(v))
 +              }
 +      case float32:
 +              C.pn_data_put_float(data, C.float(v))
 +      case float64:
 +              C.pn_data_put_double(data, C.double(v))
 +      case string:
 +              C.pn_data_put_string(data, pnBytes([]byte(v)))
 +      case []byte:
 +              C.pn_data_put_binary(data, pnBytes(v))
 +      case Binary:
 +              C.pn_data_put_binary(data, pnBytes([]byte(v)))
 +      case Symbol:
 +              C.pn_data_put_symbol(data, pnBytes([]byte(v)))
 +      case Map: // Special map type
 +              C.pn_data_put_map(data)
 +              C.pn_data_enter(data)
 +              for key, val := range v {
 +                      marshal(key, data)
 +                      marshal(val, data)
 +              }
 +              C.pn_data_exit(data)
 +      default:
 +              switch reflect.TypeOf(v).Kind() {
 +              case reflect.Map:
 +                      putMap(data, v)
 +              case reflect.Slice:
 +                      putList(data, v)
 +              default:
-                       panic(internal.Errorf("cannot marshal %s to AMQP", 
reflect.TypeOf(v)))
++                      panic(fmt.Errorf("cannot marshal %s to AMQP", 
reflect.TypeOf(v)))
 +              }
 +      }
 +      err := dataError("marshal", data)
 +      if err != nil {
 +              panic(err)
 +      }
 +      return
 +}
 +
 +func clearMarshal(v interface{}, data *C.pn_data_t) {
 +      C.pn_data_clear(data)
 +      marshal(v, data)
 +}
 +
 +func putMap(data *C.pn_data_t, v interface{}) {
 +      mapValue := reflect.ValueOf(v)
 +      C.pn_data_put_map(data)
 +      C.pn_data_enter(data)
 +      for _, key := range mapValue.MapKeys() {
 +              marshal(key.Interface(), data)
 +              marshal(mapValue.MapIndex(key).Interface(), data)
 +      }
 +      C.pn_data_exit(data)
 +}
 +
 +func putList(data *C.pn_data_t, v interface{}) {
 +      listValue := reflect.ValueOf(v)
 +      C.pn_data_put_list(data)
 +      C.pn_data_enter(data)
 +      for i := 0; i < listValue.Len(); i++ {
 +              marshal(listValue.Index(i).Interface(), data)
 +      }
 +      C.pn_data_exit(data)
 +}
 +
 +// Encoder encodes AMQP values to an io.Writer
 +type Encoder struct {
 +      writer io.Writer
 +      buffer []byte
 +}
 +
 +// New encoder returns a new encoder that writes to w.
 +func NewEncoder(w io.Writer) *Encoder {
 +      return &Encoder{w, make([]byte, minEncode)}
 +}
 +
 +func (e *Encoder) Encode(v interface{}) (err error) {
 +      e.buffer, err = Marshal(v, e.buffer)
 +      if err == nil {
 +              e.writer.Write(e.buffer)
 +      }
 +      return err
 +}
 +
 +func replace(data *C.pn_data_t, v interface{}) {
 +      C.pn_data_clear(data)
 +      marshal(v, data)
 +}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9e788b2d/amqp/message.go
----------------------------------------------------------------------
diff --cc amqp/message.go
index 5ba4f4f,0000000..e36c6f2
mode 100644,000000..100644
--- a/amqp/message.go
+++ b/amqp/message.go
@@@ -1,347 -1,0 +1,346 @@@
 +/*
 +Licensed to the Apache Software Foundation (ASF) under one
 +or more contributor license agreements.  See the NOTICE file
 +distributed with this work for additional information
 +regarding copyright ownership.  The ASF licenses this file
 +to you under the Apache License, Version 2.0 (the
 +"License"); you may not use this file except in compliance
 +with the License.  You may obtain a copy of the License at
 +
 +  http://www.apache.org/licenses/LICENSE-2.0
 +
 +Unless required by applicable law or agreed to in writing,
 +software distributed under the License is distributed on an
 +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 +KIND, either express or implied.  See the License for the
 +specific language governing permissions and limitations
 +under the License.
 +*/
 +
 +package amqp
 +
 +// #include <proton/types.h>
 +// #include <proton/message.h>
 +// #include <proton/codec.h>
 +// #include <stdlib.h>
 +//
 +// /* Helper for setting message string fields */
 +// typedef int (*set_fn)(pn_message_t*, const char*);
 +// int msg_set_str(pn_message_t* m, char* s, set_fn set) {
 +//     int result = set(m, s);
 +//     free(s);
 +//     return result;
 +// }
 +//
 +import "C"
 +
 +import (
-       "qpid.apache.org/internal"
++      "fmt"
 +      "runtime"
 +      "time"
 +      "unsafe"
 +)
 +
 +// Message is the interface to an AMQP message.
 +type Message interface {
 +      // Durable indicates that any parties taking responsibility
 +      // for the message must durably store the content.
 +      Durable() bool
 +      SetDurable(bool)
 +
 +      // Priority impacts ordering guarantees. Within a
 +      // given ordered context, higher priority messages may jump ahead of
 +      // lower priority messages.
 +      Priority() uint8
 +      SetPriority(uint8)
 +
 +      // TTL or Time To Live, a message it may be dropped after this duration
 +      TTL() time.Duration
 +      SetTTL(time.Duration)
 +
 +      // FirstAcquirer indicates
 +      // that the recipient of the message is the first recipient to acquire
 +      // the message, i.e. there have been no failed delivery attempts to
 +      // other acquirers. Note that this does not mean the message has not
 +      // been delivered to, but not acquired, by other recipients.
 +      FirstAcquirer() bool
 +      SetFirstAcquirer(bool)
 +
 +      // DeliveryCount tracks how many attempts have been made to
 +      // delivery a message.
 +      DeliveryCount() uint32
 +      SetDeliveryCount(uint32)
 +
 +      // MessageId provides a unique identifier for a message.
 +      // it can be an a string, an unsigned long, a uuid or a
 +      // binary value.
 +      MessageId() interface{}
 +      SetMessageId(interface{})
 +
 +      UserId() string
 +      SetUserId(string)
 +
 +      Address() string
 +      SetAddress(string)
 +
 +      Subject() string
 +      SetSubject(string)
 +
 +      ReplyTo() string
 +      SetReplyTo(string)
 +
 +      // CorrelationId is set on correlated request and response messages. It 
can be
 +      // an a string, an unsigned long, a uuid or a binary value.
 +      CorrelationId() interface{}
 +      SetCorrelationId(interface{})
 +
 +      ContentType() string
 +      SetContentType(string)
 +
 +      ContentEncoding() string
 +      SetContentEncoding(string)
 +
 +      // ExpiryTime indicates an absoulte time when the message may be 
dropped.
 +      // A Zero time (i.e. t.isZero() == true) indicates a message never 
expires.
 +      ExpiryTime() time.Time
 +      SetExpiryTime(time.Time)
 +
 +      CreationTime() time.Time
 +      SetCreationTime(time.Time)
 +
 +      GroupId() string
 +      SetGroupId(string)
 +
 +      GroupSequence() int32
 +      SetGroupSequence(int32)
 +
 +      ReplyToGroupId() string
 +      SetReplyToGroupId(string)
 +
 +      // Instructions - AMQP delivery instructions.
 +      Instructions() map[string]interface{}
 +      SetInstructions(v map[string]interface{})
 +
 +      // Annotations - AMQP annotations.
 +      Annotations() map[string]interface{}
 +      SetAnnotations(v map[string]interface{})
 +
 +      // Properties - Application properties.
 +      Properties() map[string]interface{}
 +      SetProperties(v map[string]interface{})
 +
 +      // Inferred indicates how the message content
 +      // is encoded into AMQP sections. If inferred is true then binary and
 +      // list values in the body of the message will be encoded as AMQP DATA
 +      // and AMQP SEQUENCE sections, respectively. If inferred is false,
 +      // then all values in the body of the message will be encoded as AMQP
 +      // VALUE sections regardless of their type.
 +      Inferred() bool
 +      SetInferred(bool)
 +
 +      // Marshal a Go value into the message body. See amqp.Marshal() for 
details.
 +      Marshal(interface{})
 +
 +      // Unmarshal the message body into the value pointed to by v. See 
amqp.Unmarshal() for details.
 +      Unmarshal(interface{})
 +
 +      // Body value resulting from the default unmarshalling of message body 
as interface{}
 +      Body() interface{}
 +
 +      // Encode encodes the message as AMQP data. If buffer is non-nil and is 
large enough
 +      // the message is encoded into it, otherwise a new buffer is created.
 +      // Returns the buffer containing the message.
 +      Encode(buffer []byte) ([]byte, error)
 +
 +      // Decode data into this message. Overwrites an existing message 
content.
 +      Decode(buffer []byte) error
 +
 +      // Clear the message contents.
 +      Clear()
 +
 +      // Copy the contents of another message to this one.
 +      Copy(m Message) error
 +}
 +
 +type message struct{ pn *C.pn_message_t }
 +
 +func freeMessage(m *message) {
 +      C.pn_message_free(m.pn)
 +      m.pn = nil
 +}
 +
 +// NewMessage creates a new message instance.
 +func NewMessage() Message {
 +      m := &message{C.pn_message()}
 +      runtime.SetFinalizer(m, freeMessage)
 +      return m
 +}
 +
 +// NewMessageWith creates a message with value as the body. Equivalent to
 +//     m := NewMessage(); m.Marshal(body)
 +func NewMessageWith(value interface{}) Message {
 +      m := NewMessage()
 +      m.Marshal(value)
 +      return m
 +}
 +
 +func (m *message) Clear() { C.pn_message_clear(m.pn) }
 +
 +func (m *message) Copy(x Message) error {
 +      if data, err := x.Encode(nil); err == nil {
 +              return m.Decode(data)
 +      } else {
 +              return err
 +      }
 +}
 +
 +// ==== message get functions
 +
 +func rewindGet(data *C.pn_data_t) (v interface{}) {
 +      C.pn_data_rewind(data)
 +      C.pn_data_next(data)
 +      unmarshal(&v, data)
 +      return v
 +}
 +
 +func rewindMap(data *C.pn_data_t) (v map[string]interface{}) {
 +      C.pn_data_rewind(data)
 +      C.pn_data_next(data)
 +      unmarshal(&v, data)
 +      return v
 +}
 +
 +func (m *message) Inferred() bool  { return 
bool(C.pn_message_is_inferred(m.pn)) }
 +func (m *message) Durable() bool   { return 
bool(C.pn_message_is_durable(m.pn)) }
 +func (m *message) Priority() uint8 { return 
uint8(C.pn_message_get_priority(m.pn)) }
 +func (m *message) TTL() time.Duration {
 +      return time.Duration(C.pn_message_get_ttl(m.pn)) * time.Millisecond
 +}
 +func (m *message) FirstAcquirer() bool        { return 
bool(C.pn_message_is_first_acquirer(m.pn)) }
 +func (m *message) DeliveryCount() uint32      { return 
uint32(C.pn_message_get_delivery_count(m.pn)) }
 +func (m *message) MessageId() interface{}     { return 
rewindGet(C.pn_message_id(m.pn)) }
 +func (m *message) UserId() string             { return 
goString(C.pn_message_get_user_id(m.pn)) }
 +func (m *message) Address() string            { return 
C.GoString(C.pn_message_get_address(m.pn)) }
 +func (m *message) Subject() string            { return 
C.GoString(C.pn_message_get_subject(m.pn)) }
 +func (m *message) ReplyTo() string            { return 
C.GoString(C.pn_message_get_reply_to(m.pn)) }
 +func (m *message) CorrelationId() interface{} { return 
rewindGet(C.pn_message_correlation_id(m.pn)) }
 +func (m *message) ContentType() string        { return 
C.GoString(C.pn_message_get_content_type(m.pn)) }
 +func (m *message) ContentEncoding() string    { return 
C.GoString(C.pn_message_get_content_encoding(m.pn)) }
 +
 +func (m *message) ExpiryTime() time.Time {
 +      return time.Unix(0, 
int64(time.Millisecond*time.Duration(C.pn_message_get_expiry_time(m.pn))))
 +}
 +func (m *message) CreationTime() time.Time {
 +      return time.Unix(0, 
int64(time.Millisecond)*int64(C.pn_message_get_creation_time(m.pn)))
 +}
 +func (m *message) GroupId() string        { return 
C.GoString(C.pn_message_get_group_id(m.pn)) }
 +func (m *message) GroupSequence() int32   { return 
int32(C.pn_message_get_group_sequence(m.pn)) }
 +func (m *message) ReplyToGroupId() string { return 
C.GoString(C.pn_message_get_reply_to_group_id(m.pn)) }
 +
 +func (m *message) Instructions() map[string]interface{} {
 +      return rewindMap(C.pn_message_instructions(m.pn))
 +}
 +func (m *message) Annotations() map[string]interface{} {
 +      return rewindMap(C.pn_message_annotations(m.pn))
 +}
 +func (m *message) Properties() map[string]interface{} {
 +      return rewindMap(C.pn_message_properties(m.pn))
 +}
 +
 +// ==== message set methods
 +
 +func setData(v interface{}, data *C.pn_data_t) {
 +      C.pn_data_clear(data)
 +      marshal(v, data)
 +}
 +
 +func dataString(data *C.pn_data_t) string {
 +      str := C.pn_string(C.CString(""))
 +      defer C.pn_free(unsafe.Pointer(str))
 +      C.pn_inspect(unsafe.Pointer(data), str)
 +      return C.GoString(C.pn_string_get(str))
 +}
 +
 +func (m *message) SetInferred(b bool)  { C.pn_message_set_inferred(m.pn, 
C.bool(m.Inferred())) }
 +func (m *message) SetDurable(b bool)   { C.pn_message_set_durable(m.pn, 
C.bool(b)) }
 +func (m *message) SetPriority(b uint8) { C.pn_message_set_priority(m.pn, 
C.uint8_t(b)) }
 +func (m *message) SetTTL(d time.Duration) {
 +      C.pn_message_set_ttl(m.pn, C.pn_millis_t(d/time.Millisecond))
 +}
 +func (m *message) SetFirstAcquirer(b bool)     { 
C.pn_message_set_first_acquirer(m.pn, C.bool(b)) }
 +func (m *message) SetDeliveryCount(c uint32)   { 
C.pn_message_set_delivery_count(m.pn, C.uint32_t(c)) }
 +func (m *message) SetMessageId(id interface{}) { setData(id, 
C.pn_message_id(m.pn)) }
 +func (m *message) SetUserId(s string)          { 
C.pn_message_set_user_id(m.pn, pnBytes(([]byte)(s))) }
 +func (m *message) SetAddress(s string) {
 +      C.msg_set_str(m.pn, C.CString(s), C.set_fn(C.pn_message_set_address))
 +}
 +func (m *message) SetSubject(s string) {
 +      C.msg_set_str(m.pn, C.CString(s), C.set_fn(C.pn_message_set_subject))
 +}
 +func (m *message) SetReplyTo(s string) {
 +      C.msg_set_str(m.pn, C.CString(s), C.set_fn(C.pn_message_set_reply_to))
 +}
 +func (m *message) SetCorrelationId(c interface{}) { setData(c, 
C.pn_message_correlation_id(m.pn)) }
 +func (m *message) SetContentType(s string) {
 +      C.msg_set_str(m.pn, C.CString(s), 
C.set_fn(C.pn_message_set_content_type))
 +}
 +func (m *message) SetContentEncoding(s string) {
 +      C.msg_set_str(m.pn, C.CString(s), 
C.set_fn(C.pn_message_set_content_encoding))
 +}
 +func (m *message) SetExpiryTime(t time.Time)   { 
C.pn_message_set_expiry_time(m.pn, pnTime(t)) }
 +func (m *message) SetCreationTime(t time.Time) { 
C.pn_message_set_creation_time(m.pn, pnTime(t)) }
 +func (m *message) SetGroupId(s string) {
 +      C.msg_set_str(m.pn, C.CString(s), C.set_fn(C.pn_message_set_group_id))
 +}
 +func (m *message) SetGroupSequence(s int32) {
 +      C.pn_message_set_group_sequence(m.pn, C.pn_sequence_t(s))
 +}
 +func (m *message) SetReplyToGroupId(s string) {
 +      C.msg_set_str(m.pn, C.CString(s), 
C.set_fn(C.pn_message_set_reply_to_group_id))
 +}
 +
 +func (m *message) SetInstructions(v map[string]interface{}) {
 +      setData(v, C.pn_message_instructions(m.pn))
 +}
 +func (m *message) SetAnnotations(v map[string]interface{}) { setData(v, 
C.pn_message_annotations(m.pn)) }
 +func (m *message) SetProperties(v map[string]interface{})  { setData(v, 
C.pn_message_properties(m.pn)) }
 +
 +// Marshal/Unmarshal body
 +func (m *message) Marshal(v interface{})   { clearMarshal(v, 
C.pn_message_body(m.pn)) }
 +func (m *message) Unmarshal(v interface{}) { rewindUnmarshal(v, 
C.pn_message_body(m.pn)) }
 +func (m *message) Body() (v interface{})   { m.Unmarshal(&v); return }
 +
 +func (m *message) Decode(data []byte) error {
 +      m.Clear()
 +      if len(data) == 0 {
-               return internal.Errorf("empty buffer for decode")
++              return fmt.Errorf("empty buffer for decode")
 +      }
 +      if C.pn_message_decode(m.pn, cPtr(data), cLen(data)) < 0 {
-               return internal.Errorf("decoding message: %s",
-                       
internal.PnError(unsafe.Pointer(C.pn_message_error(m.pn))))
++              return fmt.Errorf("decoding message: %s", 
PnError(C.pn_message_error(m.pn)))
 +      }
 +      return nil
 +}
 +
 +func DecodeMessage(data []byte) (m Message, err error) {
 +      m = NewMessage()
 +      err = m.Decode(data)
 +      return
 +}
 +
 +func (m *message) Encode(buffer []byte) ([]byte, error) {
 +      encode := func(buf []byte) ([]byte, error) {
 +              len := cLen(buf)
 +              result := C.pn_message_encode(m.pn, cPtr(buf), &len)
 +              switch {
 +              case result == C.PN_OVERFLOW:
 +                      return buf, overflow
 +              case result < 0:
-                       return buf, internal.Errorf("cannot encode message: 
%s", internal.PnErrorCode(result))
++                      return buf, fmt.Errorf("cannot encode message: %s", 
PnErrorCode(result))
 +              default:
 +                      return buf[:len], nil
 +              }
 +      }
 +      return encodeGrow(buffer, encode)
 +}
 +
 +// TODO aconway 2015-09-14: Multi-section messages.

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9e788b2d/amqp/types.go
----------------------------------------------------------------------
diff --cc amqp/types.go
index 796da66,0000000..abcff25
mode 100644,000000..100644
--- a/amqp/types.go
+++ b/amqp/types.go
@@@ -1,199 -1,0 +1,203 @@@
 +/*
 +Licensed to the Apache Software Foundation (ASF) under one
 +or more contributor license agreements.  See the NOTICE file
 +distributed with this work for additional information
 +regarding copyright ownership.  The ASF licenses this file
 +to you under the Apache License, Version 2.0 (the
 +"License"); you may not use this file except in compliance
 +with the License.  You may obtain a copy of the License at
 +
 +  http://www.apache.org/licenses/LICENSE-2.0
 +
 +Unless required by applicable law or agreed to in writing,
 +software distributed under the License is distributed on an
 +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 +KIND, either express or implied.  See the License for the
 +specific language governing permissions and limitations
 +under the License.
 +*/
 +
 +package amqp
 +
 +// #include <proton/codec.h>
 +import "C"
 +
 +import (
 +      "bytes"
 +      "fmt"
 +      "reflect"
 +      "time"
 +      "unsafe"
 +)
 +
 +type Type C.pn_type_t
 +
++// Older proton versions don't define C.PN_INVALID, so define it here.
++// In C it is pn_type_t(-1), in Go use the bitwise NOT operator to get the 
same value.
++const pnInvalid = ^C.pn_type_t(0)
++
 +func (t Type) String() string {
 +      switch C.pn_type_t(t) {
 +      case C.PN_NULL:
 +              return "null"
 +      case C.PN_BOOL:
 +              return "bool"
 +      case C.PN_UBYTE:
 +              return "ubyte"
 +      case C.PN_BYTE:
 +              return "byte"
 +      case C.PN_USHORT:
 +              return "ushort"
 +      case C.PN_SHORT:
 +              return "short"
 +      case C.PN_CHAR:
 +              return "char"
 +      case C.PN_UINT:
 +              return "uint"
 +      case C.PN_INT:
 +              return "int"
 +      case C.PN_ULONG:
 +              return "ulong"
 +      case C.PN_LONG:
 +              return "long"
 +      case C.PN_TIMESTAMP:
 +              return "timestamp"
 +      case C.PN_FLOAT:
 +              return "float"
 +      case C.PN_DOUBLE:
 +              return "double"
 +      case C.PN_DECIMAL32:
 +              return "decimal32"
 +      case C.PN_DECIMAL64:
 +              return "decimal64"
 +      case C.PN_DECIMAL128:
 +              return "decimal128"
 +      case C.PN_UUID:
 +              return "uuid"
 +      case C.PN_BINARY:
 +              return "binary"
 +      case C.PN_STRING:
 +              return "string"
 +      case C.PN_SYMBOL:
 +              return "symbol"
 +      case C.PN_DESCRIBED:
 +              return "described"
 +      case C.PN_ARRAY:
 +              return "array"
 +      case C.PN_LIST:
 +              return "list"
 +      case C.PN_MAP:
 +              return "map"
 +      default:
-               if uint32(t) == uint32(C.PN_INVALID) {
++              if uint32(t) == uint32(pnInvalid) {
 +                      return "no-data"
 +              }
 +              return fmt.Sprintf("unknown-type(%d)", t)
 +      }
 +}
 +
 +// Go types
 +var (
 +      bytesType = reflect.TypeOf([]byte{})
 +      valueType = reflect.TypeOf(reflect.Value{})
 +)
 +
 +// TODO aconway 2015-04-08: can't handle AMQP maps with key types that are 
not valid Go map keys.
 +
 +// Map is a generic map that can have mixed key and value types and so can 
represent any AMQP map
 +type Map map[interface{}]interface{}
 +
 +// List is a generic list that can hold mixed values and can represent any 
AMQP list.
 +//
 +type List []interface{}
 +
 +// Symbol is a string that is encoded as an AMQP symbol
 +type Symbol string
 +
 +func (s Symbol) GoString() string { return fmt.Sprintf("s\"%s\"", s) }
 +
 +// Binary is a string that is encoded as an AMQP binary.
 +// It is a string rather than a byte[] because byte[] is not hashable and 
can't be used as
 +// a map key, AMQP frequently uses binary types as map keys. It can convert 
to and from []byte
 +type Binary string
 +
 +func (b Binary) GoString() string { return fmt.Sprintf("b\"%s\"", b) }
 +
 +// GoString for Map prints values with their types, useful for debugging.
 +func (m Map) GoString() string {
 +      out := &bytes.Buffer{}
 +      fmt.Fprintf(out, "%T{", m)
 +      i := len(m)
 +      for k, v := range m {
 +              fmt.Fprintf(out, "%T(%#v): %T(%#v)", k, k, v, v)
 +              i--
 +              if i > 0 {
 +                      fmt.Fprint(out, ", ")
 +              }
 +      }
 +      fmt.Fprint(out, "}")
 +      return out.String()
 +}
 +
 +// GoString for List prints values with their types, useful for debugging.
 +func (l List) GoString() string {
 +      out := &bytes.Buffer{}
 +      fmt.Fprintf(out, "%T{", l)
 +      for i := 0; i < len(l); i++ {
 +              fmt.Fprintf(out, "%T(%#v)", l[i], l[i])
 +              if i == len(l)-1 {
 +                      fmt.Fprint(out, ", ")
 +              }
 +      }
 +      fmt.Fprint(out, "}")
 +      return out.String()
 +}
 +
 +// pnTime converts Go time.Time to Proton millisecond Unix time.
 +func pnTime(t time.Time) C.pn_timestamp_t {
 +      secs := t.Unix()
 +      // Note: sub-second accuracy is not guaraunteed if the Unix time in
 +      // nanoseconds cannot be represented by an int64 (sometime around year 
2260)
 +      msecs := (t.UnixNano() % int64(time.Second)) / int64(time.Millisecond)
 +      return C.pn_timestamp_t(secs*1000 + msecs)
 +}
 +
 +// goTime converts a pn_timestamp_t to a Go time.Time.
 +func goTime(t C.pn_timestamp_t) time.Time {
 +      secs := int64(t) / 1000
 +      nsecs := (int64(t) % 1000) * int64(time.Millisecond)
 +      return time.Unix(secs, nsecs)
 +}
 +
 +func goBytes(cBytes C.pn_bytes_t) (bytes []byte) {
 +      if cBytes.start != nil {
 +              bytes = C.GoBytes(unsafe.Pointer(cBytes.start), 
C.int(cBytes.size))
 +      }
 +      return
 +}
 +
 +func goString(cBytes C.pn_bytes_t) (str string) {
 +      if cBytes.start != nil {
 +              str = C.GoStringN(cBytes.start, C.int(cBytes.size))
 +      }
 +      return
 +}
 +
 +func pnBytes(b []byte) C.pn_bytes_t {
 +      if len(b) == 0 {
 +              return C.pn_bytes_t{0, nil}
 +      } else {
 +              return C.pn_bytes_t{C.size_t(len(b)), 
(*C.char)(unsafe.Pointer(&b[0]))}
 +      }
 +}
 +
 +func cPtr(b []byte) *C.char {
 +      if len(b) == 0 {
 +              return nil
 +      }
 +      return (*C.char)(unsafe.Pointer(&b[0]))
 +}
 +
 +func cLen(b []byte) C.size_t {
 +      return C.size_t(len(b))
 +}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9e788b2d/amqp/unmarshal.go
----------------------------------------------------------------------
diff --cc amqp/unmarshal.go
index 751921d,0000000..25bb519
mode 100644,000000..100644
--- a/amqp/unmarshal.go
+++ b/amqp/unmarshal.go
@@@ -1,558 -1,0 +1,561 @@@
 +/*
 +Licensed to the Apache Software Foundation (ASF) under one
 +oor more contributor license agreements.  See the NOTICE file
 +distributed with this work for additional information
 +regarding copyright ownership.  The ASF licenses this file
 +to you under the Apache License, Version 2.0 (the
 +"License"); you may not use this file except in compliance
 +with the License.  You may obtain a copy of the License at
 +
 +  http://www.apache.org/licenses/LICENSE-2.0
 +
 +Unless required by applicable law or agreed to in writing,
 +software distributed under the License is distributed on an
 +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 +KIND, either express or implied.  See the License for the
 +specific language governing permissions and limitations
 +under the License.
 +*/
 +
 +package amqp
 +
 +// #include <proton/codec.h>
 +import "C"
 +
 +import (
 +      "bytes"
 +      "fmt"
 +      "io"
-       "qpid.apache.org/internal"
 +      "reflect"
 +      "unsafe"
 +)
 +
 +const minDecode = 1024
 +
 +// Error returned if AMQP data cannot be unmarshaled as the desired Go type.
 +type UnmarshalError struct {
 +      // The name of the AMQP type.
 +      AMQPType string
 +      // The Go type.
 +      GoType reflect.Type
 +}
 +
 +func newUnmarshalError(pnType C.pn_type_t, v interface{}) *UnmarshalError {
 +      return &UnmarshalError{Type(pnType).String(), reflect.TypeOf(v)}
 +}
 +
 +func (e UnmarshalError) Error() string {
 +      if e.GoType.Kind() != reflect.Ptr {
-               return fmt.Sprintf("proton: cannot unmarshal to type %s, not a 
pointer", e.GoType)
++              return fmt.Sprintf("cannot unmarshal to type %s, not a 
pointer", e.GoType)
 +      } else {
-               return fmt.Sprintf("proton: cannot unmarshal AMQP %s to %s", 
e.AMQPType, e.GoType)
++              return fmt.Sprintf("cannot unmarshal AMQP %s to %s", 
e.AMQPType, e.GoType)
 +      }
 +}
 +
 +func doRecover(err *error) {
 +      r := recover()
 +      switch r := r.(type) {
 +      case nil:
-       case *UnmarshalError, internal.Error:
-               *err = r.(error)
++      case *UnmarshalError:
++              *err = r
 +      default:
 +              panic(r)
 +      }
 +}
 +
 +//
 +// Decoding from a pn_data_t
 +//
 +// NOTE: we use panic() to signal a decoding error, simplifies decoding logic.
 +// We recover() at the highest possible level - i.e. in the exported 
Unmarshal or Decode.
 +//
 +
 +// Decoder decodes AMQP values from an io.Reader.
 +//
 +type Decoder struct {
 +      reader io.Reader
 +      buffer bytes.Buffer
 +}
 +
 +// NewDecoder returns a new decoder that reads from r.
 +//
 +// The decoder has it's own buffer and may read more data than required for 
the
 +// AMQP values requested.  Use Buffered to see if there is data left in the
 +// buffer.
 +//
 +func NewDecoder(r io.Reader) *Decoder {
 +      return &Decoder{r, bytes.Buffer{}}
 +}
 +
 +// Buffered returns a reader of the data remaining in the Decoder's buffer. 
The
 +// reader is valid until the next call to Decode.
 +//
 +func (d *Decoder) Buffered() io.Reader {
 +      return bytes.NewReader(d.buffer.Bytes())
 +}
 +
 +// Decode reads the next AMQP value from the Reader and stores it in the 
value pointed to by v.
 +//
 +// See the documentation for Unmarshal for details about the conversion of 
AMQP into a Go value.
 +//
 +func (d *Decoder) Decode(v interface{}) (err error) {
 +      defer doRecover(&err)
 +      data := C.pn_data(0)
 +      defer C.pn_data_free(data)
 +      var n int
-       for n == 0 && err == nil {
-               n = decode(data, d.buffer.Bytes())
++      for n == 0 {
++              n, err = decode(data, d.buffer.Bytes())
++              if err != nil {
++                      return err
++              }
 +              if n == 0 { // n == 0 means not enough data, read more
 +                      err = d.more()
 +              } else {
 +                      unmarshal(v, data)
 +              }
 +      }
 +      d.buffer.Next(n)
 +      return
 +}
 +
 +/*
 +Unmarshal decodes AMQP-encoded bytes and stores the result in the value 
pointed to by v.
 +Types are converted as follows:
 +
 + 
+---------------------------+----------------------------------------------------------------------+
 + |To Go types                |From AMQP types                                 
                      |
 + 
+===========================+======================================================================+
 + |bool                       |bool                                            
                      |
 + 
+---------------------------+----------------------------------------------------------------------+
 + |int, int8, int16,          |Equivalent or smaller signed integer type: 
byte, short, int, long.    |
 + |int32, int64               |                                                
                      |
 + 
+---------------------------+----------------------------------------------------------------------+
 + |uint, uint8, uint16,       |Equivalent or smaller unsigned integer type: 
ubyte, ushort, uint,     |
 + |uint32, uint64 types       |ulong                                           
                      |
 + 
+---------------------------+----------------------------------------------------------------------+
 + |float32, float64           |Equivalent or smaller float or double.          
                      |
 + 
+---------------------------+----------------------------------------------------------------------+
 + |string, []byte             |string, symbol or binary.                       
                      |
 + 
+---------------------------+----------------------------------------------------------------------+
 + |Symbol                     |symbol                                          
                      |
 + 
+---------------------------+----------------------------------------------------------------------+
 + |map[K]T                    |map, provided all keys and values can unmarshal 
to types K, T         |
 + 
+---------------------------+----------------------------------------------------------------------+
 + |Map                        |map, any AMQP map                               
                      |
 + 
+---------------------------+----------------------------------------------------------------------+
 + |interface{}                |Any AMQP value can be unmarshaled to an 
interface{} as follows:       |
 + |                           
+------------------------+---------------------------------------------+
 + |                           |AMQP Type               |Go Type in interface{} 
                      |
 + |                           
+========================+=============================================+
 + |                           |bool                    |bool                   
                      |
 + |                           
+------------------------+---------------------------------------------+
 + |                           |byte,short,int,long     |int8,int16,int32,int64 
                      |
 + |                           
+------------------------+---------------------------------------------+
 + |                           |ubyte,ushort,uint,ulong 
|uint8,uint16,uint32,uint64                   |
 + |                           
+------------------------+---------------------------------------------+
 + |                           |float, double           |float32, float64       
                      |
 + |                           
+------------------------+---------------------------------------------+
 + |                           |string                  |string                 
                      |
 + |                           
+------------------------+---------------------------------------------+
 + |                           |symbol                  |Symbol                 
                      |
 + |                           
+------------------------+---------------------------------------------+
 + |                           |binary                  |Binary                 
                      |
 + |                           
+------------------------+---------------------------------------------+
 + |                           |nulll                   |nil                    
                      |
 + |                           
+------------------------+---------------------------------------------+
 + |                           |map                     |Map                    
                      |
 + |                           
+------------------------+---------------------------------------------+
 + |                           |list                    |List                   
                      |
 + 
+---------------------------+------------------------+---------------------------------------------+
 +
 +The following Go types cannot be unmarshaled: uintptr, function, interface, 
channel.
 +
 +TODO
 +
 +Go types: array, struct.
 +
 +AMQP types: decimal32/64/128, char (round trip), timestamp, uuid, array, 
multi-section message bodies.
 +
 +AMQP maps with mixed/unhashable key types need an alternate representation.
 +
 +Described types.
 +*/
 +func Unmarshal(bytes []byte, v interface{}) (n int, err error) {
 +      defer doRecover(&err)
 +
 +      data := C.pn_data(0)
 +      defer C.pn_data_free(data)
-       n = decode(data, bytes)
++      n, err = decode(data, bytes)
++      if err != nil {
++              return 0, err
++      }
 +      if n == 0 {
-               err = internal.Errorf("not enough data")
++              return 0, fmt.Errorf("not enough data")
 +      } else {
 +              unmarshal(v, data)
 +      }
-       return
++      return n, nil
 +}
 +
 +// more reads more data when we can't parse a complete AMQP type
 +func (d *Decoder) more() error {
 +      var readSize int64 = minDecode
 +      if int64(d.buffer.Len()) > readSize { // Grow by doubling
 +              readSize = int64(d.buffer.Len())
 +      }
 +      var n int64
 +      n, err := d.buffer.ReadFrom(io.LimitReader(d.reader, readSize))
 +      if n == 0 && err == nil { // ReadFrom won't report io.EOF, just returns 0
 +              err = io.EOF
 +      }
 +      return err
 +}
 +
 +// Unmarshal from data into value pointed at by v.
 +func unmarshal(v interface{}, data *C.pn_data_t) {
 +      pnType := C.pn_data_type(data)
 +      switch v := v.(type) {
 +      case *bool:
 +              switch pnType {
 +              case C.PN_BOOL:
 +                      *v = bool(C.pn_data_get_bool(data))
 +              default:
 +                      panic(newUnmarshalError(pnType, v))
 +              }
 +      case *int8:
 +              switch pnType {
 +              case C.PN_CHAR:
 +                      *v = int8(C.pn_data_get_char(data))
 +              case C.PN_BYTE:
 +                      *v = int8(C.pn_data_get_byte(data))
 +              default:
 +                      panic(newUnmarshalError(pnType, v))
 +              }
 +      case *uint8:
 +              switch pnType {
 +              case C.PN_CHAR:
 +                      *v = uint8(C.pn_data_get_char(data))
 +              case C.PN_UBYTE:
 +                      *v = uint8(C.pn_data_get_ubyte(data))
 +              default:
 +                      panic(newUnmarshalError(pnType, v))
 +              }
 +      case *int16:
 +              switch pnType {
 +              case C.PN_CHAR:
 +                      *v = int16(C.pn_data_get_char(data))
 +              case C.PN_BYTE:
 +                      *v = int16(C.pn_data_get_byte(data))
 +              case C.PN_SHORT:
 +                      *v = int16(C.pn_data_get_short(data))
 +              default:
 +                      panic(newUnmarshalError(pnType, v))
 +              }
 +      case *uint16:
 +              switch pnType {
 +              case C.PN_CHAR:
 +                      *v = uint16(C.pn_data_get_char(data))
 +              case C.PN_UBYTE:
 +                      *v = uint16(C.pn_data_get_ubyte(data))
 +              case C.PN_USHORT:
 +                      *v = uint16(C.pn_data_get_ushort(data))
 +              default:
 +                      panic(newUnmarshalError(pnType, v))
 +              }
 +      case *int32:
 +              switch pnType {
 +              case C.PN_CHAR:
 +                      *v = int32(C.pn_data_get_char(data))
 +              case C.PN_BYTE:
 +                      *v = int32(C.pn_data_get_byte(data))
 +              case C.PN_SHORT:
 +                      *v = int32(C.pn_data_get_short(data))
 +              case C.PN_INT:
 +                      *v = int32(C.pn_data_get_int(data))
 +              default:
 +                      panic(newUnmarshalError(pnType, v))
 +              }
 +      case *uint32:
 +              switch pnType {
 +              case C.PN_CHAR:
 +                      *v = uint32(C.pn_data_get_char(data))
 +              case C.PN_UBYTE:
 +                      *v = uint32(C.pn_data_get_ubyte(data))
 +              case C.PN_USHORT:
 +                      *v = uint32(C.pn_data_get_ushort(data))
 +              case C.PN_UINT:
 +                      *v = uint32(C.pn_data_get_uint(data))
 +              default:
 +                      panic(newUnmarshalError(pnType, v))
 +              }
 +
 +      case *int64:
 +              switch pnType {
 +              case C.PN_CHAR:
 +                      *v = int64(C.pn_data_get_char(data))
 +              case C.PN_BYTE:
 +                      *v = int64(C.pn_data_get_byte(data))
 +              case C.PN_SHORT:
 +                      *v = int64(C.pn_data_get_short(data))
 +              case C.PN_INT:
 +                      *v = int64(C.pn_data_get_int(data))
 +              case C.PN_LONG:
 +                      *v = int64(C.pn_data_get_long(data))
 +              default:
 +                      panic(newUnmarshalError(pnType, v))
 +              }
 +
 +      case *uint64:
 +              switch pnType {
 +              case C.PN_CHAR:
 +                      *v = uint64(C.pn_data_get_char(data))
 +              case C.PN_UBYTE:
 +                      *v = uint64(C.pn_data_get_ubyte(data))
 +              case C.PN_USHORT:
 +                      *v = uint64(C.pn_data_get_ushort(data))
 +              case C.PN_ULONG:
 +                      *v = uint64(C.pn_data_get_ulong(data))
 +              default:
 +                      panic(newUnmarshalError(pnType, v))
 +              }
 +
 +      case *int:
 +              switch pnType {
 +              case C.PN_CHAR:
 +                      *v = int(C.pn_data_get_char(data))
 +              case C.PN_BYTE:
 +                      *v = int(C.pn_data_get_byte(data))
 +              case C.PN_SHORT:
 +                      *v = int(C.pn_data_get_short(data))
 +              case C.PN_INT:
 +                      *v = int(C.pn_data_get_int(data))
 +              case C.PN_LONG:
 +                      if unsafe.Sizeof(0) == 8 {
 +                              *v = int(C.pn_data_get_long(data))
 +                      } else {
 +                              panic(newUnmarshalError(pnType, v))
 +                      }
 +              default:
 +                      panic(newUnmarshalError(pnType, v))
 +              }
 +
 +      case *uint:
 +              switch pnType {
 +              case C.PN_CHAR:
 +                      *v = uint(C.pn_data_get_char(data))
 +              case C.PN_UBYTE:
 +                      *v = uint(C.pn_data_get_ubyte(data))
 +              case C.PN_USHORT:
 +                      *v = uint(C.pn_data_get_ushort(data))
 +              case C.PN_UINT:
 +                      *v = uint(C.pn_data_get_uint(data))
 +              case C.PN_ULONG:
 +                      if unsafe.Sizeof(0) == 8 {
 +                              *v = uint(C.pn_data_get_ulong(data))
 +                      } else {
 +                              panic(newUnmarshalError(pnType, v))
 +                      }
 +              default:
 +                      panic(newUnmarshalError(pnType, v))
 +              }
 +
 +      case *float32:
 +              switch pnType {
 +              case C.PN_FLOAT:
 +                      *v = float32(C.pn_data_get_float(data))
 +              default:
 +                      panic(newUnmarshalError(pnType, v))
 +              }
 +
 +      case *float64:
 +              switch pnType {
 +              case C.PN_FLOAT:
 +                      *v = float64(C.pn_data_get_float(data))
 +              case C.PN_DOUBLE:
 +                      *v = float64(C.pn_data_get_double(data))
 +              default:
 +                      panic(newUnmarshalError(pnType, v))
 +              }
 +
 +      case *string:
 +              switch pnType {
 +              case C.PN_STRING:
 +                      *v = goString(C.pn_data_get_string(data))
 +              case C.PN_SYMBOL:
 +                      *v = goString(C.pn_data_get_symbol(data))
 +              case C.PN_BINARY:
 +                      *v = goString(C.pn_data_get_binary(data))
 +              default:
 +                      panic(newUnmarshalError(pnType, v))
 +              }
 +
 +      case *[]byte:
 +              switch pnType {
 +              case C.PN_STRING:
 +                      *v = goBytes(C.pn_data_get_string(data))
 +              case C.PN_SYMBOL:
 +                      *v = goBytes(C.pn_data_get_symbol(data))
 +              case C.PN_BINARY:
 +                      *v = goBytes(C.pn_data_get_binary(data))
 +              default:
 +                      panic(newUnmarshalError(pnType, v))
 +              }
 +
 +      case *Binary:
 +              switch pnType {
 +              case C.PN_BINARY:
 +                      *v = Binary(goBytes(C.pn_data_get_binary(data)))
 +              default:
 +                      panic(newUnmarshalError(pnType, v))
 +              }
 +
 +      case *Symbol:
 +              switch pnType {
 +              case C.PN_SYMBOL:
 +                      *v = Symbol(goBytes(C.pn_data_get_symbol(data)))
 +              default:
 +                      panic(newUnmarshalError(pnType, v))
 +              }
 +
 +      case *interface{}:
 +              getInterface(data, v)
 +
 +      default:
 +              if reflect.TypeOf(v).Kind() != reflect.Ptr {
 +                      panic(newUnmarshalError(pnType, v))
 +              }
 +              switch reflect.TypeOf(v).Elem().Kind() {
 +              case reflect.Map:
 +                      getMap(data, v)
 +              case reflect.Slice:
 +                      getList(data, v)
 +              default:
 +                      panic(newUnmarshalError(pnType, v))
 +              }
 +      }
 +      err := dataError("unmarshaling", data)
 +      if err != nil {
 +              panic(err)
 +      }
 +      return
 +}
 +
 +func rewindUnmarshal(v interface{}, data *C.pn_data_t) {
 +      C.pn_data_rewind(data)
 +      C.pn_data_next(data)
 +      unmarshal(v, data)
 +}
 +
 +// Getting into an interface is driven completely by the AMQP type, since the 
interface{}
 +// target is type-neutral.
 +func getInterface(data *C.pn_data_t, v *interface{}) {
 +      pnType := C.pn_data_type(data)
 +      switch pnType {
-       // Note PN_INVALID is defined outside the enum, older Go versions don't 
consider it a C.pn_type_t
-       case C.PN_NULL, C.pn_type_t(C.PN_INVALID): // No data.
++      case C.PN_NULL, C.pn_type_t(pnInvalid): // No data.
 +              *v = nil
 +      case C.PN_BOOL:
 +              *v = bool(C.pn_data_get_bool(data))
 +      case C.PN_UBYTE:
 +              *v = uint8(C.pn_data_get_ubyte(data))
 +      case C.PN_BYTE:
 +              *v = int8(C.pn_data_get_byte(data))
 +      case C.PN_USHORT:
 +              *v = uint16(C.pn_data_get_ushort(data))
 +      case C.PN_SHORT:
 +              *v = int16(C.pn_data_get_short(data))
 +      case C.PN_UINT:
 +              *v = uint32(C.pn_data_get_uint(data))
 +      case C.PN_INT:
 +              *v = int32(C.pn_data_get_int(data))
 +      case C.PN_CHAR:
 +              *v = uint8(C.pn_data_get_char(data))
 +      case C.PN_ULONG:
 +              *v = uint64(C.pn_data_get_ulong(data))
 +      case C.PN_LONG:
 +              *v = int64(C.pn_data_get_long(data))
 +      case C.PN_FLOAT:
 +              *v = float32(C.pn_data_get_float(data))
 +      case C.PN_DOUBLE:
 +              *v = float64(C.pn_data_get_double(data))
 +      case C.PN_BINARY:
 +              *v = Binary(goBytes(C.pn_data_get_binary(data)))
 +      case C.PN_STRING:
 +              *v = goString(C.pn_data_get_string(data))
 +      case C.PN_SYMBOL:
 +              *v = Symbol(goString(C.pn_data_get_symbol(data)))
 +      case C.PN_MAP:
 +              m := make(Map)
 +              unmarshal(&m, data)
 +              *v = m
 +      case C.PN_LIST:
 +              l := make(List, 0)
 +              unmarshal(&l, data)
 +              *v = l
 +      default:
 +              panic(newUnmarshalError(pnType, v))
 +      }
 +}
 +
 +// get into map pointed at by v
 +func getMap(data *C.pn_data_t, v interface{}) {
 +      mapValue := reflect.ValueOf(v).Elem()
 +      mapValue.Set(reflect.MakeMap(mapValue.Type())) // Clear the map
 +      switch pnType := C.pn_data_type(data); pnType {
 +      case C.PN_MAP:
 +              count := int(C.pn_data_get_map(data))
 +              if bool(C.pn_data_enter(data)) {
 +                      defer C.pn_data_exit(data)
 +                      for i := 0; i < count/2; i++ {
 +                              if bool(C.pn_data_next(data)) {
 +                                      key := 
reflect.New(mapValue.Type().Key())
 +                                      unmarshal(key.Interface(), data)
 +                                      if bool(C.pn_data_next(data)) {
 +                                              val := 
reflect.New(mapValue.Type().Elem())
 +                                              unmarshal(val.Interface(), data)
 +                                              
mapValue.SetMapIndex(key.Elem(), val.Elem())
 +                                      }
 +                              }
 +                      }
 +              }
-               // Note PN_INVALID is defined outside the enum, older Go 
versions don't consider it a C.pn_type_t
-       case C.pn_type_t(C.PN_INVALID): // Leave the map empty
++      case C.pn_type_t(pnInvalid): // Leave the map empty
 +      default:
 +              panic(newUnmarshalError(pnType, v))
 +      }
 +}
 +
 +func getList(data *C.pn_data_t, v interface{}) {
 +      pnType := C.pn_data_type(data)
 +      if pnType != C.PN_LIST {
 +              panic(newUnmarshalError(pnType, v))
 +      }
 +      count := int(C.pn_data_get_list(data))
 +      listValue := reflect.MakeSlice(reflect.TypeOf(v).Elem(), count, count)
 +      if bool(C.pn_data_enter(data)) {
 +              for i := 0; i < count; i++ {
 +                      if bool(C.pn_data_next(data)) {
 +                              val := reflect.New(listValue.Type().Elem())
 +                              unmarshal(val.Interface(), data)
 +                              listValue.Index(i).Set(val.Elem())
 +                      }
 +              }
 +              C.pn_data_exit(data)
 +      }
 +      reflect.ValueOf(v).Elem().Set(listValue)
 +}
 +
 +// decode from bytes.
 +// Return bytes decoded or 0 if we could not decode a complete object.
 +//
- func decode(data *C.pn_data_t, bytes []byte) int {
++func decode(data *C.pn_data_t, bytes []byte) (int, error) {
 +      if len(bytes) == 0 {
-               return 0
++              return 0, nil
 +      }
 +      n := int(C.pn_data_decode(data, cPtr(bytes), cLen(bytes)))
 +      if n == int(C.PN_UNDERFLOW) {
 +              C.pn_error_clear(C.pn_data_error(data))
-               return 0
++              return 0, nil
 +      } else if n <= 0 {
-               panic(internal.Errorf("unmarshal %s", internal.PnErrorCode(n)))
++              return 0, fmt.Errorf("unmarshal %s", PnErrorCode(n))
 +      }
-       return n
++      return n, nil
 +}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9e788b2d/amqp/url.go
----------------------------------------------------------------------
diff --cc amqp/url.go
index 0d0c662,0000000..70545d2
mode 100644,000000..100644
--- a/amqp/url.go
+++ b/amqp/url.go
@@@ -1,96 -1,0 +1,96 @@@
 +/*
 +Licensed to the Apache Software Foundation (ASF) under one
 +or more contributor license agreements.  See the NOTICE file
 +distributed with this work for additional information
 +regarding copyright ownership.  The ASF licenses this file
 +to you under the Apache License, Version 2.0 (the
 +"License"); you may not use this file except in compliance
 +with the License.  You may obtain a copy of the License at
 +
 +  http://www.apache.org/licenses/LICENSE-2.0
 +
 +Unless required by applicable law or agreed to in writing,
 +software distributed under the License is distributed on an
 +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 +KIND, either express or implied.  See the License for the
 +specific language governing permissions and limitations
 +under the License.
 +*/
 +
 +package amqp
 +
 +/*
 +#include <stdlib.h>
 +#include <string.h>
 +#include <proton/url.h>
 +
 +// Helper function for setting URL fields.
 +typedef void (*setter_fn)(pn_url_t* url, const char* value);
 +inline void   set(pn_url_t *url, setter_fn s, const char* value) {
 +  s(url, value);
 +}
 +*/
 +import "C"
 +
 +import (
++      "fmt"
 +      "net"
 +      "net/url"
-       "qpid.apache.org/internal"
 +      "unsafe"
 +)
 +
 +const (
 +      amqp  string = "amqp"
 +      amqps        = "amqps"
 +)
 +
 +// ParseUrl parses an AMQP URL string and returns a net/url.Url.
 +//
 +// It is more forgiving than net/url.Parse and allows most of the parts of the
 +// URL to be missing, assuming AMQP defaults.
 +//
 +func ParseURL(s string) (u *url.URL, err error) {
 +      cstr := C.CString(s)
 +      defer C.free(unsafe.Pointer(cstr))
 +      pnUrl := C.pn_url_parse(cstr)
 +      if pnUrl == nil {
-               return nil, internal.Errorf("bad URL %#v", s)
++              return nil, fmt.Errorf("bad URL %#v", s)
 +      }
 +      defer C.pn_url_free(pnUrl)
 +
 +      scheme := C.GoString(C.pn_url_get_scheme(pnUrl))
 +      username := C.GoString(C.pn_url_get_username(pnUrl))
 +      password := C.GoString(C.pn_url_get_password(pnUrl))
 +      host := C.GoString(C.pn_url_get_host(pnUrl))
 +      port := C.GoString(C.pn_url_get_port(pnUrl))
 +      path := C.GoString(C.pn_url_get_path(pnUrl))
 +
 +      if err != nil {
-               return nil, internal.Errorf("bad URL %#v: %s", s, err)
++              return nil, fmt.Errorf("bad URL %#v: %s", s, err)
 +      }
 +      if scheme == "" {
 +              scheme = amqp
 +      }
 +      if port == "" {
 +              if scheme == amqps {
 +                      port = amqps
 +              } else {
 +                      port = amqp
 +              }
 +      }
 +      var user *url.Userinfo
 +      if password != "" {
 +              user = url.UserPassword(username, password)
 +      } else if username != "" {
 +              user = url.User(username)
 +      }
 +
 +      u = &url.URL{
 +              Scheme: scheme,
 +              User:   user,
 +              Host:   net.JoinHostPort(host, port),
 +              Path:   path,
 +      }
 +
 +      return u, nil
 +}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9e788b2d/amqp/url_test.go
----------------------------------------------------------------------
diff --cc amqp/url_test.go
index f80f1c4,0000000..99b656d
mode 100644,000000..100644
--- a/amqp/url_test.go
+++ b/amqp/url_test.go
@@@ -1,51 -1,0 +1,51 @@@
 +/*
 +Licensed to the Apache Software Foundation (ASF) under one
 +or more contributor license agreements.  See the NOTICE file
 +distributed with this work for additional information
 +regarding copyright ownership.  The ASF licenses this file
 +to you under the Apache License, Version 2.0 (the
 +"License"); you may not use this file except in compliance
 +with the License.  You may obtain a copy of the License at
 +
 +  http://www.apache.org/licenses/LICENSE-2.0
 +
 +Unless required by applicable law or agreed to in writing,
 +software distributed under the License is distributed on an
 +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 +KIND, either express or implied.  See the License for the
 +specific language governing permissions and limitations
 +under the License.
 +*/
 +
 +package amqp
 +
 +import (
 +      "fmt"
 +)
 +
 +func ExampleParseURL() {
 +      for _, s := range []string{
 +              "amqp://username:password@host:1234/path",
 +              "host:1234",
 +              "host",
 +              ":1234",
 +              "host/path",
 +              "amqps://host",
 +              "",
 +      } {
 +              u, err := ParseURL(s)
 +              if err != nil {
 +                      fmt.Println(err)
 +              } else {
 +                      fmt.Println(u)
 +              }
 +      }
 +      // Output:
 +      // amqp://username:password@host:1234/path
 +      // amqp://host:1234
 +      // amqp://host:amqp
 +      // amqp://:1234
 +      // amqp://host:amqp/path
 +      // amqps://host:amqps
-       // proton: bad URL ""
++      // bad URL ""
 +}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9e788b2d/electron/connection.go
----------------------------------------------------------------------
diff --cc electron/connection.go
index d6761d6,0000000..8a9e6cd
mode 100644,000000..100644
--- a/electron/connection.go
+++ b/electron/connection.go
@@@ -1,218 -1,0 +1,238 @@@
 +/*
 +Licensed to the Apache Software Foundation (ASF) under one
 +or more contributor license agreements.  See the NOTICE file
 +distributed with this work for additional information
 +regarding copyright ownership.  The ASF licenses this file
 +to you under the Apache License, Version 2.0 (the
 +"License"); you may not use this file except in compliance
 +with the License.  You may obtain a copy of the License at
 +
 +  http://www.apache.org/licenses/LICENSE-2.0
 +
 +Unless required by applicable law or agreed to in writing,
 +software distributed under the License is distributed on an
 +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 +KIND, either express or implied.  See the License for the
 +specific language governing permissions and limitations
 +under the License.
 +*/
 +
 +package electron
 +
 +// #include <proton/disposition.h>
 +import "C"
 +
 +import (
 +      "net"
-       "qpid.apache.org/amqp"
-       "qpid.apache.org/internal"
 +      "qpid.apache.org/proton"
 +      "sync"
 +      "time"
 +)
 +
 +// Connection is an AMQP connection, created by a Container.
 +type Connection interface {
 +      Endpoint
 +
 +      // Sender opens a new sender on the DefaultSession.
-       //
-       // v can be a string, which is used as the Target address, or a 
SenderSettings
-       // struct containing more details settings.
-       Sender(...LinkSetting) (Sender, error)
++      Sender(...LinkOption) (Sender, error)
 +
 +      // Receiver opens a new Receiver on the DefaultSession().
-       //
-       // v can be a string, which is used as the
-       // Source address, or a ReceiverSettings struct containing more details
-       // settings.
-       Receiver(...LinkSetting) (Receiver, error)
++      Receiver(...LinkOption) (Receiver, error)
 +
 +      // DefaultSession() returns a default session for the connection. It is 
opened
 +      // on the first call to DefaultSession and returned on subsequent calls.
 +      DefaultSession() (Session, error)
 +
 +      // Session opens a new session.
-       Session(...SessionSetting) (Session, error)
++      Session(...SessionOption) (Session, error)
 +
 +      // Container for the connection.
 +      Container() Container
 +
 +      // Disconnect the connection abruptly with an error.
 +      Disconnect(error)
 +
 +      // Wait waits for the connection to be disconnected.
 +      Wait() error
 +
 +      // WaitTimeout is like Wait but returns Timeout if the timeout expires.
 +      WaitTimeout(time.Duration) error
++
++      // Incoming returns a channel for incoming endpoints opened by the 
remote end.
++      //
++      // To enable, pass AllowIncoming() when creating the Connection. 
Otherwise all
++      // incoming endpoint requests are automatically rejected and Incoming()
++      // returns nil.
++      //
++      // An Incoming value can be an *IncomingSession, *IncomingSender or
++      // *IncomingReceiver.  You must call Accept() to open the endpoint or 
Reject()
++      // to close it with an error. The specific Incoming types have 
additional
++      // methods to configure the endpoint.
++      //
++      // Not receiving from Incoming() or not calling Accept/Reject will 
block the
++      // electron event loop. Normally you would have a dedicated goroutine 
receive
++      // from Incoming() and start new goroutines to serve each incoming 
endpoint.
++      // The channel is closed when the Connection closes.
++      //
++      Incoming() <-chan Incoming
 +}
 +
- // ConnectionSetting can be passed when creating a connection.
- // See functions that return ConnectionSetting for details
- type ConnectionSetting func(*connection)
++// ConnectionOption can be passed when creating a connection to configure 
various options
++type ConnectionOption func(*connection)
 +
- // Server setting puts the connection in server mode.
++// Server returns a ConnectionOption to put the connection in server mode.
 +//
 +// A server connection will do protocol negotiation to accept a incoming AMQP
 +// connection. Normally you would call this for a connection created by
 +// net.Listener.Accept()
 +//
- func Server() ConnectionSetting { return func(c *connection) { 
c.engine.Server() } }
++func Server() ConnectionOption { return func(c *connection) { 
c.engine.Server() } }
 +
- // Accepter provides a function to be called when a connection receives an 
incoming
- // request to open an endpoint, one of IncomingSession, IncomingSender or 
IncomingReceiver.
- //
- // The accept() function must not block or use the accepted endpoint.
- // It can pass the endpoint to another goroutine for processing.
- //
- // By default all incoming endpoints are rejected.
- func Accepter(accept func(Incoming)) ConnectionSetting {
-       return func(c *connection) { c.accept = accept }
++// AllowIncoming returns a ConnectionOption to enable incoming endpoint open 
requests.
++// See Connection.Incoming()
++func AllowIncoming() ConnectionOption {
++      return func(c *connection) { c.incoming = make(chan Incoming) }
 +}
 +
 +type connection struct {
 +      endpoint
-       listenOnce, defaultSessionOnce, closeOnce sync.Once
++      defaultSessionOnce, closeOnce sync.Once
 +
 +      container   *container
 +      conn        net.Conn
-       accept      func(Incoming)
++      incoming    chan Incoming
 +      handler     *handler
 +      engine      *proton.Engine
-       err         internal.ErrorHolder
 +      eConnection proton.Connection
 +
 +      defaultSession Session
-       done           chan struct{}
 +}
 +
- func newConnection(conn net.Conn, cont *container, setting 
...ConnectionSetting) (*connection, error) {
-       c := &connection{container: cont, conn: conn, accept: func(Incoming) 
{}, done: make(chan struct{})}
++func newConnection(conn net.Conn, cont *container, setting 
...ConnectionOption) (*connection, error) {
++      c := &connection{container: cont, conn: conn}
 +      c.handler = newHandler(c)
 +      var err error
 +      c.engine, err = proton.NewEngine(c.conn, c.handler.delegator)
 +      if err != nil {
 +              return nil, err
 +      }
 +      for _, set := range setting {
 +              set(c)
 +      }
-       c.str = c.engine.String()
++      c.endpoint = makeEndpoint(c.engine.String())
 +      c.eConnection = c.engine.Connection()
-       go func() { c.engine.Run(); close(c.done) }()
++      go c.run()
 +      return c, nil
 +}
 +
++func (c *connection) run() {
++      c.engine.Run()
++      if c.incoming != nil {
++              close(c.incoming)
++      }
++      c.closed(Closed)
++}
++
 +func (c *connection) Close(err error) { c.err.Set(err); c.engine.Close(err) }
 +
 +func (c *connection) Disconnect(err error) { c.err.Set(err); 
c.engine.Disconnect(err) }
 +
- func (c *connection) Session(setting ...SessionSetting) (Session, error) {
++func (c *connection) Session(setting ...SessionOption) (Session, error) {
 +      var s Session
 +      err := c.engine.InjectWait(func() error {
++              if c.Error() != nil {
++                      return c.Error()
++              }
 +              eSession, err := c.engine.Connection().Session()
 +              if err == nil {
 +                      eSession.Open()
 +                      if err == nil {
 +                              s = newSession(c, eSession, setting...)
 +                      }
 +              }
 +              return err
 +      })
 +      return s, err
 +}
 +
 +func (c *connection) Container() Container { return c.container }
 +
 +func (c *connection) DefaultSession() (s Session, err error) {
 +      c.defaultSessionOnce.Do(func() {
 +              c.defaultSession, err = c.Session()
 +      })
 +      if err == nil {
 +              err = c.Error()
 +      }
 +      return c.defaultSession, err
 +}
 +
- func (c *connection) Sender(setting ...LinkSetting) (Sender, error) {
++func (c *connection) Sender(setting ...LinkOption) (Sender, error) {
 +      if s, err := c.DefaultSession(); err == nil {
 +              return s.Sender(setting...)
 +      } else {
 +              return nil, err
 +      }
 +}
 +
- func (c *connection) Receiver(setting ...LinkSetting) (Receiver, error) {
++func (c *connection) Receiver(setting ...LinkOption) (Receiver, error) {
 +      if s, err := c.DefaultSession(); err == nil {
 +              return s.Receiver(setting...)
 +      } else {
 +              return nil, err
 +      }
 +}
 +
 +func (c *connection) Connection() Connection { return c }
 +
 +func (c *connection) Wait() error { return c.WaitTimeout(Forever) }
 +func (c *connection) WaitTimeout(timeout time.Duration) error {
 +      _, err := timedReceive(c.done, timeout)
 +      if err == Timeout {
 +              return Timeout
 +      }
 +      return c.Error()
 +}
 +
++func (c *connection) Incoming() <-chan Incoming { return c.incoming }
++
 +// Incoming is the interface for incoming requests to open an endpoint.
 +// Implementing types are IncomingSession, IncomingSender and 
IncomingReceiver.
 +type Incoming interface {
-       // Accept the endpoint with default settings.
-       //
-       // You must not use the returned endpoint in the accept() function that
-       // receives the Incoming value, but you can pass it to other goroutines.
-       //
-       // Implementing types provide type-specific Accept functions that take 
additional settings.
++      // Accept and open the endpoint.
 +      Accept() Endpoint
 +
 +      // Reject the endpoint with an error
 +      Reject(error)
 +
-       error() error
++      // wait for and call the accept function, call in proton goroutine.
++      wait() error
++      pEndpoint() proton.Endpoint
 +}
 +
 +type incoming struct {
-       err      error
-       accepted bool
++      endpoint proton.Endpoint
++      acceptCh chan func() error
++}
++
++func makeIncoming(e proton.Endpoint) incoming {
++      return incoming{endpoint: e, acceptCh: make(chan func() error)}
 +}
 +
- func (i *incoming) Reject(err error) { i.err = err }
++func (in *incoming) Reject(err error) { in.acceptCh <- func() error { return 
err } }
++
++// Call in proton goroutine, wait for and call the accept function fr
++func (in *incoming) wait() error { return (<-in.acceptCh)() }
++
++func (in *incoming) pEndpoint() proton.Endpoint { return in.endpoint }
 +
- func (i *incoming) error() error {
-       switch {
-       case i.err != nil:
-               return i.err
-       case !i.accepted:
-               return amqp.Errorf(amqp.NotAllowed, "remote open rejected")
-       default:
++// Called in app goroutine to send an accept function to proton and return 
the resulting endpoint.
++func (in *incoming) accept(f func() Endpoint) Endpoint {
++      done := make(chan Endpoint)
++      in.acceptCh <- func() error {
++              ep := f()
++              done <- ep
 +              return nil
 +      }
++      return <-done
 +}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9e788b2d/electron/container.go
----------------------------------------------------------------------
diff --cc electron/container.go
index 7bbc4b0,0000000..b5ce6c0
mode 100644,000000..100644
--- a/electron/container.go
+++ b/electron/container.go
@@@ -1,71 -1,0 +1,77 @@@
 +/*
 +Licensed to the Apache Software Foundation (ASF) under one
 +or more contributor license agreements.  See the NOTICE file
 +distributed with this work for additional information
 +regarding copyright ownership.  The ASF licenses this file
 +to you under the Apache License, Version 2.0 (the
 +"License"); you may not use this file except in compliance
 +with the License.  You may obtain a copy of the License at
 +
 +  http://www.apache.org/licenses/LICENSE-2.0
 +
 +Unless required by applicable law or agreed to in writing,
 +software distributed under the License is distributed on an
 +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 +KIND, either express or implied.  See the License for the
 +specific language governing permissions and limitations
 +under the License.
 +*/
 +
 +package electron
 +
 +import (
 +      "net"
-       "qpid.apache.org/internal"
++      "qpid.apache.org/proton"
++      "strconv"
++      "sync/atomic"
 +)
 +
 +// Container is an AMQP container, it represents a single AMQP 
"application".It
 +// provides functions to create new Connections to remote containers.
 +//
 +// Create with NewContainer()
 +//
 +type Container interface {
 +      // Id is a unique identifier for the container in your distributed 
application.
 +      Id() string
 +
 +      // Create a new AMQP Connection over the supplied net.Conn connection.
 +      //
 +      // You must call Connection.Open() on the returned Connection, after
 +      // setting any Connection properties you need to set. Note the net.Conn
 +      // can be an outgoing connection (e.g. made with net.Dial) or an 
incoming
 +      // connection (e.g. made with net.Listener.Accept())
-       Connection(net.Conn, ...ConnectionSetting) (Connection, error)
++      Connection(net.Conn, ...ConnectionOption) (Connection, error)
 +}
 +
 +type container struct {
-       id        string
-       linkNames internal.IdCounter
++      id         string
++      tagCounter uint64
++}
++
++func (cont *container) nextTag() string {
++      return strconv.FormatUint(atomic.AddUint64(&cont.tagCounter, 1), 32)
 +}
 +
 +// NewContainer creates a new container. The id must be unique in your
 +// distributed application, all connections created by the container
 +// will have this container-id.
 +//
 +// If id == "" a random UUID will be generated for the id.
 +func NewContainer(id string) Container {
 +      if id == "" {
-               id = internal.UUID4().String()
++              id = proton.UUID4().String()
 +      }
 +      cont := &container{id: id}
 +      return cont
 +}
 +
 +func (cont *container) Id() string { return cont.id }
 +
 +func (cont *container) nextLinkName() string {
-       return cont.id + "@" + cont.linkNames.Next()
++      return cont.id + "@" + cont.nextTag()
 +}
 +
- func (cont *container) Connection(conn net.Conn, setting 
...ConnectionSetting) (Connection, error) {
++func (cont *container) Connection(conn net.Conn, setting ...ConnectionOption) 
(Connection, error) {
 +      return newConnection(conn, cont, setting...)
 +}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9e788b2d/electron/doc.go
----------------------------------------------------------------------
diff --cc electron/doc.go
index eaa6e7a,0000000..46bde37
mode 100644,000000..100644
--- a/electron/doc.go
+++ b/electron/doc.go
@@@ -1,57 -1,0 +1,63 @@@
 +/*
 +Licensed to the Apache Software Foundation (ASF) under one
 +or more contributor license agreements.  See the NOTICE file
 +distributed with this work for additional information
 +regarding copyright ownership.  The ASF licenses this file
 +to you under the Apache License, Version 2.0 (the
 +"License"); you may not use this file except in compliance
 +with the License.  You may obtain a copy of the License at
 +
 +  http://www.apache.org/licenses/LICENSE-2.0
 +
 +Unless required by applicable law or agreed to in writing,
 +software distributed under the License is distributed on an
 +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 +KIND, either express or implied.  See the License for the
 +specific language governing permissions and limitations
 +under the License.
 +*/
 +
 +/*
 +Package electron is a procedural, concurrent-safe Go library for AMQP 
messaging.
 +You can write clients and servers using this library.
 +
 +Start by creating a Container with NewContainer. A Container represents a 
client
 +or server application that can contain many incoming or outgoing connections.
 +
 +Create connections with the standard Go 'net' package using net.Dial or
 +net.Listen. Create an AMQP connection over a net.Conn with
 +Container.Connection() and open it with Connection.Open().
 +
 +AMQP sends messages over "links". Each link has a Sender end and a Receiver
 +end. Connection.Sender() and Connection.Receiver() allow you to create links 
to
 +Send() and Receive() messages.
 +
 +You can create an AMQP server connection by calling Connection.Server() and
 +Connection.Listen() before calling Connection.Open(). A server connection can
 +negotiate protocol security details and can accept incoming links opened from
- the remote end of the connection
++the remote end of the connection.
++
 +*/
 +package electron
 +
 +//#cgo LDFLAGS: -lqpid-proton
 +import "C"
 +
 +// Just for package comment
 +
 +/* DEVELOPER NOTES
 +
 +There is a single proton.Engine per connection, each driving it's own 
event-loop goroutine,
 +and each with a 'handler'. Most state for a connection is maintained on the 
handler, and
- only accessed in the event-loop goroutine, so no locks are required.
++only accessed in the event-loop goroutine, so no locks are required there.
 +
 +The handler sets up channels as needed to get or send data from user 
goroutines
- using electron types like Sender or Receiver. We also use Engine.Inject to 
inject
- actions into the event loop from user goroutines.
++using electron types like Sender or Receiver.
++
++We also use Engine.Inject to inject actions into the event loop from user
++goroutines. It is important to check at the start of an injected function that
++required objects are still valid, for example a link may be remotely closed
++between the time a Sender function calls Inject and the time the injected
++function is execute by the handler goroutine. See comments in endpoint.go for 
more.
 +
 +*/

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9e788b2d/electron/endpoint.go
----------------------------------------------------------------------
diff --cc electron/endpoint.go
index 745fd04,0000000..8cbeadb
mode 100644,000000..100644
--- a/electron/endpoint.go
+++ b/electron/endpoint.go
@@@ -1,68 -1,0 +1,94 @@@
 +/*
 +Licensed to the Apache Software Foundation (ASF) under one
 +or more contributor license agreements.  See the NOTICE file
 +distributed with this work for additional information
 +regarding copyright ownership.  The ASF licenses this file
 +to you under the Apache License, Version 2.0 (the
 +"License"); you may not use this file except in compliance
 +with the License.  You may obtain a copy of the License at
 +
 +  http://www.apache.org/licenses/LICENSE-2.0
 +
 +Unless required by applicable law or agreed to in writing,
 +software distributed under the License is distributed on an
 +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 +KIND, either express or implied.  See the License for the
 +specific language governing permissions and limitations
 +under the License.
 +*/
 +
 +package electron
 +
 +import (
 +      "io"
-       "qpid.apache.org/internal"
 +      "qpid.apache.org/proton"
 +)
 +
 +// Closed is an alias for io.EOF. It is returned as an error when an endpoint
 +// was closed cleanly.
 +var Closed = io.EOF
 +
 +// Endpoint is the common interface for Connection, Session, Link, Sender and 
Receiver.
 +//
 +// Endpoints can be created locally or by the remote peer. You must Open() an
 +// endpoint before you can use it. Some endpoints have additional Set*() 
methods
 +// that must be called before Open() to take effect, see Connection, Session,
 +// Link, Sender and Receiver for details.
 +//
 +type Endpoint interface {
 +      // Close an endpoint and signal an error to the remote end if error != 
nil.
 +      Close(error)
 +
 +      // String is a human readable identifier, useful for debugging and 
logging.
 +      String() string
 +
 +      // Error returns nil if the endpoint is open, otherwise returns an 
error.
 +      // Error() == Closed means the endpoint was closed without error.
 +      Error() error
 +
 +      // Connection containing the endpoint
 +      Connection() Connection
++
++      // Done returns a channel that will close when the endpoint closes.
++      // Error() will contain the reason.
++      Done() <-chan struct{}
 +}
 +
++// DEVELOPER NOTES
++//
++// An electron.Endpoint corresponds to a proton.Endpoint, which can be 
invalidated
++//
 +type endpoint struct {
-       err internal.ErrorHolder
-       str string // Must be set by the value that embeds endpoint.
++      err  proton.ErrorHolder
++      str  string // Must be set by the value that embeds endpoint.
++      done chan struct{}
++}
++
++func makeEndpoint(s string) endpoint { return endpoint{str: s, done: 
make(chan struct{})} }
++
++// Called in handler on a Closed event. Marks the endpoint as closed and the 
corresponding
++// proton.Endpoint pointer as invalid. Injected functions should check 
Error() to ensure
++// the pointer has not been invalidated.
++//
++// Returns the error stored on the endpoint, which may not be different to 
err if there was
++// already a n error
++func (e *endpoint) closed(err error) error {
++      e.err.Set(err)
++      e.err.Set(Closed)
++      close(e.done)
++      return e.err.Get()
 +}
 +
 +func (e *endpoint) String() string { return e.str }
- func (e *endpoint) Error() error   { return e.err.Get() }
 +
- // Call in proton goroutine to close an endpoint locally
++func (e *endpoint) Error() error { return e.err.Get() }
++
++func (e *endpoint) Done() <-chan struct{} { return e.done }
++
++// Call in proton goroutine to initiate closing an endpoint locally
 +// handler will complete the close when remote end closes.
 +func localClose(ep proton.Endpoint, err error) {
 +      if ep.State().LocalActive() {
 +              proton.CloseError(ep, err)
 +      }
 +}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9e788b2d/electron/error.go
----------------------------------------------------------------------
diff --cc electron/error.go
index 0000000,0000000..4dcfd94
new file mode 100644
--- /dev/null
+++ b/electron/error.go
@@@ -1,0 -1,0 +1,35 @@@
++/*
++Licensed to the Apache Software Foundation (ASF) under one
++or more contributor license agreements.  See the NOTICE file
++distributed with this work for additional information
++regarding copyright ownership.  The ASF licenses this file
++to you under the Apache License, Version 2.0 (the
++"License"); you may not use this file except in compliance
++with the License.  You may obtain a copy of the License at
++
++  http://www.apache.org/licenses/LICENSE-2.0
++
++Unless required by applicable law or agreed to in writing,
++software distributed under the License is distributed on an
++"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
++KIND, either express or implied.  See the License for the
++specific language governing permissions and limitations
++under the License.
++*/
++
++package electron
++
++import (
++      "fmt"
++)
++
++// assert panics if condition is false with optional formatted message
++func assert(condition bool, format ...interface{}) {
++      if !condition {
++              if len(format) > 0 {
++                      panic(fmt.Errorf(format[0].(string), format[1:]...))
++              } else {
++                      panic(fmt.Errorf("assertion failed"))
++              }
++      }
++}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9e788b2d/electron/handler.go
----------------------------------------------------------------------
diff --cc electron/handler.go
index b518e42,0000000..0237156
mode 100644,000000..100644
--- a/electron/handler.go
+++ b/electron/handler.go
@@@ -1,158 -1,0 +1,187 @@@
 +/*
 +Licensed to the Apache Software Foundation (ASF) under one
 +or more contributor license agreements.  See the NOTICE file
 +distributed with this work for additional information
 +regarding copyright ownership.  The ASF licenses this file
 +to you under the Apache License, Version 2.0 (the
 +"License"); you may not use this file except in compliance
 +with the License.  You may obtain a copy of the License at
 +
 +  http://www.apache.org/licenses/LICENSE-2.0
 +
 +Unless required by applicable law or agreed to in writing,
 +software distributed under the License is distributed on an
 +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 +KIND, either express or implied.  See the License for the
 +specific language governing permissions and limitations
 +under the License.
 +*/
 +
 +package electron
 +
 +import (
 +      "qpid.apache.org/amqp"
 +      "qpid.apache.org/proton"
 +)
 +
 +// NOTE: methods in this file are called only in the proton goroutine unless 
otherwise indicated.
 +
 +type handler struct {
 +      delegator    *proton.MessagingAdapter
 +      connection   *connection
 +      links        map[proton.Link]Link
-       sentMessages map[proton.Delivery]*sentMessage
++      sentMessages map[proton.Delivery]sentMessage
 +      sessions     map[proton.Session]*session
 +}
 +
 +func newHandler(c *connection) *handler {
 +      h := &handler{
 +              connection:   c,
 +              links:        make(map[proton.Link]Link),
-               sentMessages: make(map[proton.Delivery]*sentMessage),
++              sentMessages: make(map[proton.Delivery]sentMessage),
 +              sessions:     make(map[proton.Session]*session),
 +      }
 +      h.delegator = proton.NewMessagingAdapter(h)
 +      // Disable auto features of MessagingAdapter, we do these ourselves.
 +      h.delegator.Prefetch = 0
 +      h.delegator.AutoAccept = false
 +      h.delegator.AutoSettle = false
 +      h.delegator.AutoOpen = false
 +      return h
 +}
 +
- func (h *handler) internalError(fmt string, arg ...interface{}) {
-       proton.CloseError(h.connection.eConnection, 
amqp.Errorf(amqp.InternalError, fmt, arg...))
++func (h *handler) linkError(l proton.Link, msg string) {
++      proton.CloseError(l, amqp.Errorf(amqp.InternalError, "%s for %s %s", 
msg, l.Type(), l))
 +}
 +
 +func (h *handler) HandleMessagingEvent(t proton.MessagingEvent, e 
proton.Event) {
 +      switch t {
 +
 +      case proton.MMessage:
 +              if r, ok := h.links[e.Link()].(*receiver); ok {
 +                      r.message(e.Delivery())
 +              } else {
-                       h.internalError("no receiver for link %s", e.Link())
++                      h.linkError(e.Link(), "no receiver")
 +              }
 +
 +      case proton.MSettled:
-               if sm := h.sentMessages[e.Delivery()]; sm != nil {
-                       sm.settled(nil)
++              if sm, ok := h.sentMessages[e.Delivery()]; ok {
++                      d := e.Delivery().Remote()
++                      sm.ack <- Outcome{sentStatus(d.Type()), 
d.Condition().Error(), sm.value}
++                      delete(h.sentMessages, e.Delivery())
 +              }
 +
 +      case proton.MSendable:
 +              if s, ok := h.links[e.Link()].(*sender); ok {
 +                      s.sendable()
 +              } else {
-                       h.internalError("no receiver for link %s", e.Link())
++                      h.linkError(e.Link(), "no sender")
 +              }
 +
 +      case proton.MSessionOpening:
 +              if e.Session().State().LocalUninit() { // Remotely opened
-                       incoming := &IncomingSession{h: h, pSession: 
e.Session()}
-                       h.connection.accept(incoming)
-                       if err := incoming.error(); err != nil {
-                               proton.CloseError(e.Session(), err)
-                       }
++                      h.incoming(newIncomingSession(h, e.Session()))
 +              }
 +
 +      case proton.MSessionClosed:
-               err := proton.EndpointError(e.Session())
-               for l, _ := range h.links {
-                       if l.Session() == e.Session() {
-                               h.linkClosed(l, err)
-                       }
-               }
-               delete(h.sessions, e.Session())
++              h.sessionClosed(e.Session(), proton.EndpointError(e.Session()))
 +
 +      case proton.MLinkOpening:
 +              l := e.Link()
 +              if l.State().LocalActive() { // Already opened locally.
 +                      break
 +              }
 +              ss := h.sessions[l.Session()]
 +              if ss == nil {
-                       h.internalError("no session for link %s", e.Link())
++                      h.linkError(e.Link(), "no session")
 +                      break
 +              }
-               var incoming Incoming
 +              if l.IsReceiver() {
-                       incoming = &IncomingReceiver{makeIncomingLink(ss, l)}
++                      h.incoming(&IncomingReceiver{makeIncomingLink(ss, l)})
 +              } else {
-                       incoming = &IncomingSender{makeIncomingLink(ss, l)}
-               }
-               h.connection.accept(incoming)
-               if err := incoming.error(); err != nil {
-                       proton.CloseError(l, err)
-                       break
++                      h.incoming(&IncomingSender{makeIncomingLink(ss, l)})
 +              }
 +
 +      case proton.MLinkClosing:
 +              e.Link().Close()
 +
 +      case proton.MLinkClosed:
 +              h.linkClosed(e.Link(), proton.EndpointError(e.Link()))
 +
 +      case proton.MConnectionClosing:
 +              h.connection.err.Set(e.Connection().RemoteCondition().Error())
 +
 +      case proton.MConnectionClosed:
-               h.connection.err.Set(Closed) // If no error already set, this 
is an orderly close.
++              h.connectionClosed(proton.EndpointError(e.Connection()))
 +
 +      case proton.MDisconnected:
 +              h.connection.err.Set(e.Transport().Condition().Error())
 +              // If err not set at this point (e.g. to Closed) then this is 
unexpected.
 +              h.connection.err.Set(amqp.Errorf(amqp.IllegalState, "unexpected 
disconnect on %s", h.connection))
 +
 +              err := h.connection.Error()
++
 +              for l, _ := range h.links {
 +                      h.linkClosed(l, err)
 +              }
++              h.links = nil
 +              for _, s := range h.sessions {
 +                      s.closed(err)
 +              }
++              h.sessions = nil
 +              for _, sm := range h.sentMessages {
-                       sm.settled(err)
++                      sm.ack <- Outcome{Unacknowledged, err, sm.value}
 +              }
++              h.sentMessages = nil
++      }
++}
++
++func (h *handler) incoming(in Incoming) {
++      var err error
++      if h.connection.incoming != nil {
++              h.connection.incoming <- in
++              err = in.wait()
++      } else {
++              err = amqp.Errorf(amqp.NotAllowed, "rejected incoming %s %s",
++                      in.pEndpoint().Type(), in.pEndpoint().String())
++      }
++      if err == nil {
++              in.pEndpoint().Open()
++      } else {
++              proton.CloseError(in.pEndpoint(), err)
 +      }
 +}
 +
++func (h *handler) addLink(pl proton.Link, el Link) {
++      h.links[pl] = el
++}
++
 +func (h *handler) linkClosed(l proton.Link, err error) {
-       if link := h.links[l]; link != nil {
++      if link, ok := h.links[l]; ok {
 +              link.closed(err)
 +              delete(h.links, l)
 +      }
 +}
 +
- func (h *handler) addLink(rl proton.Link, ll Link) {
-       h.links[rl] = ll
++func (h *handler) sessionClosed(ps proton.Session, err error) {
++      if s, ok := h.sessions[ps]; ok {
++              delete(h.sessions, ps)
++              err = s.closed(err)
++              for l, _ := range h.links {
++                      if l.Session() == ps {
++                              h.linkClosed(l, err)
++                      }
++              }
++      }
++}
++
++func (h *handler) connectionClosed(err error) {
++      err = h.connection.closed(err)
++      // Close links first to avoid repeated scans of the link list by 
sessions.
++      for l, _ := range h.links {
++              h.linkClosed(l, err)
++      }
++      for s, _ := range h.sessions {
++              h.sessionClosed(s, err)
++      }
 +}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to