Merge branch 'master' into go1 - updated to work with proton C 0.10

Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/c87499a3
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/c87499a3
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/c87499a3

Branch: refs/heads/go1
Commit: c87499a3bcd44c6793c9b5eaa381144a72e06e23
Parents: dfb5b06 bf7e193
Author: Alan Conway <[email protected]>
Authored: Wed Dec 30 16:07:26 2015 -0500
Committer: Alan Conway <[email protected]>
Committed: Wed Dec 30 16:07:26 2015 -0500

----------------------------------------------------------------------
 amqp/codec_shim.h      | 33 +++++++++++++++++++++++++++++++++
 amqp/marshal.go        |  2 +-
 amqp/message.go        |  2 +-
 amqp/types.go          | 11 +++--------
 amqp/unmarshal.go      |  6 +++---
 electron/connection.go |  2 ++
 electron/endpoint.go   | 11 ++++++++---
 electron/sender.go     |  9 ++++-----
 proton/engine.go       |  8 +-------
 proton/wrappers.go     |  5 ++---
 proton/wrappers_gen.go |  2 ++
 11 files changed, 60 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/c87499a3/amqp/codec_shim.h
----------------------------------------------------------------------
diff --cc amqp/codec_shim.h
index 0000000,0000000..b2f9f1c
new file mode 100644
--- /dev/null
+++ b/amqp/codec_shim.h
@@@ -1,0 -1,0 +1,33 @@@
++#ifndef CODEC_SHIM_H
++#define CODEC_SHIM_H
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements.  See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership.  The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License.  You may obtain a copy of the License at
++ *
++ *   http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing,
++ * software distributed under the License is distributed on an
++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
++ * KIND, either express or implied.  See the License for the
++ * specific language governing permissions and limitations
++ * under the License.
++ */
++
++/** Stubs to allow the go binding to work with multiple versions of proton. */
++
++#include <proton/codec.h>
++#include <proton/version.h>
++
++#if PN_VERSION_MAJOR == 0 && PN_VERSION_MINOR <= 10
++
++#define PN_INVALID ((pn_type_t)-1)
++
++#endif
++
++#endif // CODEC_SHIM_H

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/c87499a3/amqp/marshal.go
----------------------------------------------------------------------
diff --cc amqp/marshal.go
index 9930e13,0000000..66e14d8
mode 100644,000000..100644
--- a/amqp/marshal.go
+++ b/amqp/marshal.go
@@@ -1,250 -1,0 +1,250 @@@
 +/*
 +Licensed to the Apache Software Foundation (ASF) under one
 +or more contributor license agreements.  See the NOTICE file
 +distributed with this work for additional information
 +regarding copyright ownership.  The ASF licenses this file
 +to you under the Apache License, Version 2.0 (the
 +"License"); you may not use this file except in compliance
 +with the License.  You may obtain a copy of the License at
 +
 +  http://www.apache.org/licenses/LICENSE-2.0
 +
 +Unless required by applicable law or agreed to in writing,
 +software distributed under the License is distributed on an
 +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 +KIND, either express or implied.  See the License for the
 +specific language governing permissions and limitations
 +under the License.
 +*/
 +
 +package amqp
 +
- // #include <proton/codec.h>
++//#include "codec_shim.h"
 +import "C"
 +
 +import (
 +      "fmt"
 +      "io"
 +      "reflect"
 +      "unsafe"
 +)
 +
 +func dataError(prefix string, data *C.pn_data_t) error {
 +      err := PnError(C.pn_data_error(data))
 +      if err != nil {
 +              err = fmt.Errorf("%s: %s", prefix, err.Error())
 +      }
 +      return err
 +}
 +
 +/*
 +Marshal encodes a Go value as AMQP data in buffer.
 +If buffer is nil, or is not large enough, a new buffer  is created.
 +
 +Returns the buffer used for encoding with len() adjusted to the actual size 
of data.
 +
 +Go types are encoded as follows
 +
 + 
+-------------------------------------+--------------------------------------------+
 + |Go type                              |AMQP type                             
      |
 + 
+-------------------------------------+--------------------------------------------+
 + |bool                                 |bool                                  
      |
 + 
+-------------------------------------+--------------------------------------------+
 + |int8, int16, int32, int64 (int)      |byte, short, int, long (int or long)  
      |
 + 
+-------------------------------------+--------------------------------------------+
 + |uint8, uint16, uint32, uint64 (uint) |ubyte, ushort, uint, ulong (uint or 
ulong)  |
 + 
+-------------------------------------+--------------------------------------------+
 + |float32, float64                     |float, double.                        
      |
 + 
+-------------------------------------+--------------------------------------------+
 + |string                               |string                                
      |
 + 
+-------------------------------------+--------------------------------------------+
 + |[]byte, Binary                       |binary                                
      |
 + 
+-------------------------------------+--------------------------------------------+
 + |Symbol                               |symbol                                
      |
 + 
+-------------------------------------+--------------------------------------------+
 + |interface{}                          |the contained type                    
      |
 + 
+-------------------------------------+--------------------------------------------+
 + |nil                                  |null                                  
      |
 + 
+-------------------------------------+--------------------------------------------+
 + |map[K]T                              |map with K and T converted as above   
      |
 + 
+-------------------------------------+--------------------------------------------+
 + |Map                                  |map, may have mixed types for keys, 
values  |
 + 
+-------------------------------------+--------------------------------------------+
 + |[]T                                  |list with T converted as above        
      |
 + 
+-------------------------------------+--------------------------------------------+
 + |List                                 |list, may have mixed types  values    
      |
 + 
+-------------------------------------+--------------------------------------------+
 +
 +The following Go types cannot be marshaled: uintptr, function, interface, 
channel
 +
 +TODO
 +
 +Go types: array, slice, struct, complex64/128.
 +
 +AMQP types: decimal32/64/128, char, timestamp, uuid, array, multi-section 
message bodies.
 +
 +Described types.
 +
 +*/
 +func Marshal(v interface{}, buffer []byte) (outbuf []byte, err error) {
 +      defer doRecover(&err)
 +      data := C.pn_data(0)
 +      defer C.pn_data_free(data)
 +      marshal(v, data)
 +      encode := func(buf []byte) ([]byte, error) {
 +              n := int(C.pn_data_encode(data, cPtr(buf), cLen(buf)))
 +              switch {
 +              case n == int(C.PN_OVERFLOW):
 +                      return buf, overflow
 +              case n < 0:
 +                      return buf, dataError("marshal error", data)
 +              default:
 +                      return buf[:n], nil
 +              }
 +      }
 +      return encodeGrow(buffer, encode)
 +}
 +
 +const minEncode = 256
 +
 +// overflow is returned when an encoding function can't fit data in the 
buffer.
 +var overflow = fmt.Errorf("buffer too small")
 +
 +// encodeFn encodes into buffer[0:len(buffer)].
 +// Returns buffer with length adjusted for data encoded.
 +// If buffer too small, returns overflow as error.
 +type encodeFn func(buffer []byte) ([]byte, error)
 +
 +// encodeGrow calls encode() into buffer, if it returns overflow grows the 
buffer.
 +// Returns the final buffer.
 +func encodeGrow(buffer []byte, encode encodeFn) ([]byte, error) {
 +      if buffer == nil || len(buffer) == 0 {
 +              buffer = make([]byte, minEncode)
 +      }
 +      var err error
 +      for buffer, err = encode(buffer); err == overflow; buffer, err = 
encode(buffer) {
 +              buffer = make([]byte, 2*len(buffer))
 +      }
 +      return buffer, err
 +}
 +
 +func marshal(v interface{}, data *C.pn_data_t) {
 +      switch v := v.(type) {
 +      case nil:
 +              C.pn_data_put_null(data)
 +      case bool:
 +              C.pn_data_put_bool(data, C.bool(v))
 +      case int8:
 +              C.pn_data_put_byte(data, C.int8_t(v))
 +      case int16:
 +              C.pn_data_put_short(data, C.int16_t(v))
 +      case int32:
 +              C.pn_data_put_int(data, C.int32_t(v))
 +      case int64:
 +              C.pn_data_put_long(data, C.int64_t(v))
 +      case int:
 +              if unsafe.Sizeof(0) == 8 {
 +                      C.pn_data_put_long(data, C.int64_t(v))
 +              } else {
 +                      C.pn_data_put_int(data, C.int32_t(v))
 +              }
 +      case uint8:
 +              C.pn_data_put_ubyte(data, C.uint8_t(v))
 +      case uint16:
 +              C.pn_data_put_ushort(data, C.uint16_t(v))
 +      case uint32:
 +              C.pn_data_put_uint(data, C.uint32_t(v))
 +      case uint64:
 +              C.pn_data_put_ulong(data, C.uint64_t(v))
 +      case uint:
 +              if unsafe.Sizeof(0) == 8 {
 +                      C.pn_data_put_ulong(data, C.uint64_t(v))
 +              } else {
 +                      C.pn_data_put_uint(data, C.uint32_t(v))
 +              }
 +      case float32:
 +              C.pn_data_put_float(data, C.float(v))
 +      case float64:
 +              C.pn_data_put_double(data, C.double(v))
 +      case string:
 +              C.pn_data_put_string(data, pnBytes([]byte(v)))
 +      case []byte:
 +              C.pn_data_put_binary(data, pnBytes(v))
 +      case Binary:
 +              C.pn_data_put_binary(data, pnBytes([]byte(v)))
 +      case Symbol:
 +              C.pn_data_put_symbol(data, pnBytes([]byte(v)))
 +      case Map: // Special map type
 +              C.pn_data_put_map(data)
 +              C.pn_data_enter(data)
 +              for key, val := range v {
 +                      marshal(key, data)
 +                      marshal(val, data)
 +              }
 +              C.pn_data_exit(data)
 +      default:
 +              switch reflect.TypeOf(v).Kind() {
 +              case reflect.Map:
 +                      putMap(data, v)
 +              case reflect.Slice:
 +                      putList(data, v)
 +              default:
 +                      panic(fmt.Errorf("cannot marshal %s to AMQP", 
reflect.TypeOf(v)))
 +              }
 +      }
 +      err := dataError("marshal", data)
 +      if err != nil {
 +              panic(err)
 +      }
 +      return
 +}
 +
 +func clearMarshal(v interface{}, data *C.pn_data_t) {
 +      C.pn_data_clear(data)
 +      marshal(v, data)
 +}
 +
 +func putMap(data *C.pn_data_t, v interface{}) {
 +      mapValue := reflect.ValueOf(v)
 +      C.pn_data_put_map(data)
 +      C.pn_data_enter(data)
 +      for _, key := range mapValue.MapKeys() {
 +              marshal(key.Interface(), data)
 +              marshal(mapValue.MapIndex(key).Interface(), data)
 +      }
 +      C.pn_data_exit(data)
 +}
 +
 +func putList(data *C.pn_data_t, v interface{}) {
 +      listValue := reflect.ValueOf(v)
 +      C.pn_data_put_list(data)
 +      C.pn_data_enter(data)
 +      for i := 0; i < listValue.Len(); i++ {
 +              marshal(listValue.Index(i).Interface(), data)
 +      }
 +      C.pn_data_exit(data)
 +}
 +
 +// Encoder encodes AMQP values to an io.Writer
 +type Encoder struct {
 +      writer io.Writer
 +      buffer []byte
 +}
 +
 +// New encoder returns a new encoder that writes to w.
 +func NewEncoder(w io.Writer) *Encoder {
 +      return &Encoder{w, make([]byte, minEncode)}
 +}
 +
 +func (e *Encoder) Encode(v interface{}) (err error) {
 +      e.buffer, err = Marshal(v, e.buffer)
 +      if err == nil {
 +              e.writer.Write(e.buffer)
 +      }
 +      return err
 +}
 +
 +func replace(data *C.pn_data_t, v interface{}) {
 +      C.pn_data_clear(data)
 +      marshal(v, data)
 +}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/c87499a3/amqp/message.go
----------------------------------------------------------------------
diff --cc amqp/message.go
index e36c6f2,0000000..1d1287f
mode 100644,000000..100644
--- a/amqp/message.go
+++ b/amqp/message.go
@@@ -1,346 -1,0 +1,346 @@@
 +/*
 +Licensed to the Apache Software Foundation (ASF) under one
 +or more contributor license agreements.  See the NOTICE file
 +distributed with this work for additional information
 +regarding copyright ownership.  The ASF licenses this file
 +to you under the Apache License, Version 2.0 (the
 +"License"); you may not use this file except in compliance
 +with the License.  You may obtain a copy of the License at
 +
 +  http://www.apache.org/licenses/LICENSE-2.0
 +
 +Unless required by applicable law or agreed to in writing,
 +software distributed under the License is distributed on an
 +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 +KIND, either express or implied.  See the License for the
 +specific language governing permissions and limitations
 +under the License.
 +*/
 +
 +package amqp
 +
++// #include "codec_shim.h"
 +// #include <proton/types.h>
 +// #include <proton/message.h>
- // #include <proton/codec.h>
 +// #include <stdlib.h>
 +//
 +// /* Helper for setting message string fields */
 +// typedef int (*set_fn)(pn_message_t*, const char*);
 +// int msg_set_str(pn_message_t* m, char* s, set_fn set) {
 +//     int result = set(m, s);
 +//     free(s);
 +//     return result;
 +// }
 +//
 +import "C"
 +
 +import (
 +      "fmt"
 +      "runtime"
 +      "time"
 +      "unsafe"
 +)
 +
 +// Message is the interface to an AMQP message.
 +type Message interface {
 +      // Durable indicates that any parties taking responsibility
 +      // for the message must durably store the content.
 +      Durable() bool
 +      SetDurable(bool)
 +
 +      // Priority impacts ordering guarantees. Within a
 +      // given ordered context, higher priority messages may jump ahead of
 +      // lower priority messages.
 +      Priority() uint8
 +      SetPriority(uint8)
 +
 +      // TTL or Time To Live, a message it may be dropped after this duration
 +      TTL() time.Duration
 +      SetTTL(time.Duration)
 +
 +      // FirstAcquirer indicates
 +      // that the recipient of the message is the first recipient to acquire
 +      // the message, i.e. there have been no failed delivery attempts to
 +      // other acquirers. Note that this does not mean the message has not
 +      // been delivered to, but not acquired, by other recipients.
 +      FirstAcquirer() bool
 +      SetFirstAcquirer(bool)
 +
 +      // DeliveryCount tracks how many attempts have been made to
 +      // delivery a message.
 +      DeliveryCount() uint32
 +      SetDeliveryCount(uint32)
 +
 +      // MessageId provides a unique identifier for a message.
 +      // it can be an a string, an unsigned long, a uuid or a
 +      // binary value.
 +      MessageId() interface{}
 +      SetMessageId(interface{})
 +
 +      UserId() string
 +      SetUserId(string)
 +
 +      Address() string
 +      SetAddress(string)
 +
 +      Subject() string
 +      SetSubject(string)
 +
 +      ReplyTo() string
 +      SetReplyTo(string)
 +
 +      // CorrelationId is set on correlated request and response messages. It 
can be
 +      // an a string, an unsigned long, a uuid or a binary value.
 +      CorrelationId() interface{}
 +      SetCorrelationId(interface{})
 +
 +      ContentType() string
 +      SetContentType(string)
 +
 +      ContentEncoding() string
 +      SetContentEncoding(string)
 +
 +      // ExpiryTime indicates an absoulte time when the message may be 
dropped.
 +      // A Zero time (i.e. t.isZero() == true) indicates a message never 
expires.
 +      ExpiryTime() time.Time
 +      SetExpiryTime(time.Time)
 +
 +      CreationTime() time.Time
 +      SetCreationTime(time.Time)
 +
 +      GroupId() string
 +      SetGroupId(string)
 +
 +      GroupSequence() int32
 +      SetGroupSequence(int32)
 +
 +      ReplyToGroupId() string
 +      SetReplyToGroupId(string)
 +
 +      // Instructions - AMQP delivery instructions.
 +      Instructions() map[string]interface{}
 +      SetInstructions(v map[string]interface{})
 +
 +      // Annotations - AMQP annotations.
 +      Annotations() map[string]interface{}
 +      SetAnnotations(v map[string]interface{})
 +
 +      // Properties - Application properties.
 +      Properties() map[string]interface{}
 +      SetProperties(v map[string]interface{})
 +
 +      // Inferred indicates how the message content
 +      // is encoded into AMQP sections. If inferred is true then binary and
 +      // list values in the body of the message will be encoded as AMQP DATA
 +      // and AMQP SEQUENCE sections, respectively. If inferred is false,
 +      // then all values in the body of the message will be encoded as AMQP
 +      // VALUE sections regardless of their type.
 +      Inferred() bool
 +      SetInferred(bool)
 +
 +      // Marshal a Go value into the message body. See amqp.Marshal() for 
details.
 +      Marshal(interface{})
 +
 +      // Unmarshal the message body into the value pointed to by v. See 
amqp.Unmarshal() for details.
 +      Unmarshal(interface{})
 +
 +      // Body value resulting from the default unmarshalling of message body 
as interface{}
 +      Body() interface{}
 +
 +      // Encode encodes the message as AMQP data. If buffer is non-nil and is 
large enough
 +      // the message is encoded into it, otherwise a new buffer is created.
 +      // Returns the buffer containing the message.
 +      Encode(buffer []byte) ([]byte, error)
 +
 +      // Decode data into this message. Overwrites an existing message 
content.
 +      Decode(buffer []byte) error
 +
 +      // Clear the message contents.
 +      Clear()
 +
 +      // Copy the contents of another message to this one.
 +      Copy(m Message) error
 +}
 +
 +type message struct{ pn *C.pn_message_t }
 +
 +func freeMessage(m *message) {
 +      C.pn_message_free(m.pn)
 +      m.pn = nil
 +}
 +
 +// NewMessage creates a new message instance.
 +func NewMessage() Message {
 +      m := &message{C.pn_message()}
 +      runtime.SetFinalizer(m, freeMessage)
 +      return m
 +}
 +
 +// NewMessageWith creates a message with value as the body. Equivalent to
 +//     m := NewMessage(); m.Marshal(body)
 +func NewMessageWith(value interface{}) Message {
 +      m := NewMessage()
 +      m.Marshal(value)
 +      return m
 +}
 +
 +func (m *message) Clear() { C.pn_message_clear(m.pn) }
 +
 +func (m *message) Copy(x Message) error {
 +      if data, err := x.Encode(nil); err == nil {
 +              return m.Decode(data)
 +      } else {
 +              return err
 +      }
 +}
 +
 +// ==== message get functions
 +
 +func rewindGet(data *C.pn_data_t) (v interface{}) {
 +      C.pn_data_rewind(data)
 +      C.pn_data_next(data)
 +      unmarshal(&v, data)
 +      return v
 +}
 +
 +func rewindMap(data *C.pn_data_t) (v map[string]interface{}) {
 +      C.pn_data_rewind(data)
 +      C.pn_data_next(data)
 +      unmarshal(&v, data)
 +      return v
 +}
 +
 +func (m *message) Inferred() bool  { return 
bool(C.pn_message_is_inferred(m.pn)) }
 +func (m *message) Durable() bool   { return 
bool(C.pn_message_is_durable(m.pn)) }
 +func (m *message) Priority() uint8 { return 
uint8(C.pn_message_get_priority(m.pn)) }
 +func (m *message) TTL() time.Duration {
 +      return time.Duration(C.pn_message_get_ttl(m.pn)) * time.Millisecond
 +}
 +func (m *message) FirstAcquirer() bool        { return 
bool(C.pn_message_is_first_acquirer(m.pn)) }
 +func (m *message) DeliveryCount() uint32      { return 
uint32(C.pn_message_get_delivery_count(m.pn)) }
 +func (m *message) MessageId() interface{}     { return 
rewindGet(C.pn_message_id(m.pn)) }
 +func (m *message) UserId() string             { return 
goString(C.pn_message_get_user_id(m.pn)) }
 +func (m *message) Address() string            { return 
C.GoString(C.pn_message_get_address(m.pn)) }
 +func (m *message) Subject() string            { return 
C.GoString(C.pn_message_get_subject(m.pn)) }
 +func (m *message) ReplyTo() string            { return 
C.GoString(C.pn_message_get_reply_to(m.pn)) }
 +func (m *message) CorrelationId() interface{} { return 
rewindGet(C.pn_message_correlation_id(m.pn)) }
 +func (m *message) ContentType() string        { return 
C.GoString(C.pn_message_get_content_type(m.pn)) }
 +func (m *message) ContentEncoding() string    { return 
C.GoString(C.pn_message_get_content_encoding(m.pn)) }
 +
 +func (m *message) ExpiryTime() time.Time {
 +      return time.Unix(0, 
int64(time.Millisecond*time.Duration(C.pn_message_get_expiry_time(m.pn))))
 +}
 +func (m *message) CreationTime() time.Time {
 +      return time.Unix(0, 
int64(time.Millisecond)*int64(C.pn_message_get_creation_time(m.pn)))
 +}
 +func (m *message) GroupId() string        { return 
C.GoString(C.pn_message_get_group_id(m.pn)) }
 +func (m *message) GroupSequence() int32   { return 
int32(C.pn_message_get_group_sequence(m.pn)) }
 +func (m *message) ReplyToGroupId() string { return 
C.GoString(C.pn_message_get_reply_to_group_id(m.pn)) }
 +
 +func (m *message) Instructions() map[string]interface{} {
 +      return rewindMap(C.pn_message_instructions(m.pn))
 +}
 +func (m *message) Annotations() map[string]interface{} {
 +      return rewindMap(C.pn_message_annotations(m.pn))
 +}
 +func (m *message) Properties() map[string]interface{} {
 +      return rewindMap(C.pn_message_properties(m.pn))
 +}
 +
 +// ==== message set methods
 +
 +func setData(v interface{}, data *C.pn_data_t) {
 +      C.pn_data_clear(data)
 +      marshal(v, data)
 +}
 +
 +func dataString(data *C.pn_data_t) string {
 +      str := C.pn_string(C.CString(""))
 +      defer C.pn_free(unsafe.Pointer(str))
 +      C.pn_inspect(unsafe.Pointer(data), str)
 +      return C.GoString(C.pn_string_get(str))
 +}
 +
 +func (m *message) SetInferred(b bool)  { C.pn_message_set_inferred(m.pn, 
C.bool(m.Inferred())) }
 +func (m *message) SetDurable(b bool)   { C.pn_message_set_durable(m.pn, 
C.bool(b)) }
 +func (m *message) SetPriority(b uint8) { C.pn_message_set_priority(m.pn, 
C.uint8_t(b)) }
 +func (m *message) SetTTL(d time.Duration) {
 +      C.pn_message_set_ttl(m.pn, C.pn_millis_t(d/time.Millisecond))
 +}
 +func (m *message) SetFirstAcquirer(b bool)     { 
C.pn_message_set_first_acquirer(m.pn, C.bool(b)) }
 +func (m *message) SetDeliveryCount(c uint32)   { 
C.pn_message_set_delivery_count(m.pn, C.uint32_t(c)) }
 +func (m *message) SetMessageId(id interface{}) { setData(id, 
C.pn_message_id(m.pn)) }
 +func (m *message) SetUserId(s string)          { 
C.pn_message_set_user_id(m.pn, pnBytes(([]byte)(s))) }
 +func (m *message) SetAddress(s string) {
 +      C.msg_set_str(m.pn, C.CString(s), C.set_fn(C.pn_message_set_address))
 +}
 +func (m *message) SetSubject(s string) {
 +      C.msg_set_str(m.pn, C.CString(s), C.set_fn(C.pn_message_set_subject))
 +}
 +func (m *message) SetReplyTo(s string) {
 +      C.msg_set_str(m.pn, C.CString(s), C.set_fn(C.pn_message_set_reply_to))
 +}
 +func (m *message) SetCorrelationId(c interface{}) { setData(c, 
C.pn_message_correlation_id(m.pn)) }
 +func (m *message) SetContentType(s string) {
 +      C.msg_set_str(m.pn, C.CString(s), 
C.set_fn(C.pn_message_set_content_type))
 +}
 +func (m *message) SetContentEncoding(s string) {
 +      C.msg_set_str(m.pn, C.CString(s), 
C.set_fn(C.pn_message_set_content_encoding))
 +}
 +func (m *message) SetExpiryTime(t time.Time)   { 
C.pn_message_set_expiry_time(m.pn, pnTime(t)) }
 +func (m *message) SetCreationTime(t time.Time) { 
C.pn_message_set_creation_time(m.pn, pnTime(t)) }
 +func (m *message) SetGroupId(s string) {
 +      C.msg_set_str(m.pn, C.CString(s), C.set_fn(C.pn_message_set_group_id))
 +}
 +func (m *message) SetGroupSequence(s int32) {
 +      C.pn_message_set_group_sequence(m.pn, C.pn_sequence_t(s))
 +}
 +func (m *message) SetReplyToGroupId(s string) {
 +      C.msg_set_str(m.pn, C.CString(s), 
C.set_fn(C.pn_message_set_reply_to_group_id))
 +}
 +
 +func (m *message) SetInstructions(v map[string]interface{}) {
 +      setData(v, C.pn_message_instructions(m.pn))
 +}
 +func (m *message) SetAnnotations(v map[string]interface{}) { setData(v, 
C.pn_message_annotations(m.pn)) }
 +func (m *message) SetProperties(v map[string]interface{})  { setData(v, 
C.pn_message_properties(m.pn)) }
 +
 +// Marshal/Unmarshal body
 +func (m *message) Marshal(v interface{})   { clearMarshal(v, 
C.pn_message_body(m.pn)) }
 +func (m *message) Unmarshal(v interface{}) { rewindUnmarshal(v, 
C.pn_message_body(m.pn)) }
 +func (m *message) Body() (v interface{})   { m.Unmarshal(&v); return }
 +
 +func (m *message) Decode(data []byte) error {
 +      m.Clear()
 +      if len(data) == 0 {
 +              return fmt.Errorf("empty buffer for decode")
 +      }
 +      if C.pn_message_decode(m.pn, cPtr(data), cLen(data)) < 0 {
 +              return fmt.Errorf("decoding message: %s", 
PnError(C.pn_message_error(m.pn)))
 +      }
 +      return nil
 +}
 +
 +func DecodeMessage(data []byte) (m Message, err error) {
 +      m = NewMessage()
 +      err = m.Decode(data)
 +      return
 +}
 +
 +func (m *message) Encode(buffer []byte) ([]byte, error) {
 +      encode := func(buf []byte) ([]byte, error) {
 +              len := cLen(buf)
 +              result := C.pn_message_encode(m.pn, cPtr(buf), &len)
 +              switch {
 +              case result == C.PN_OVERFLOW:
 +                      return buf, overflow
 +              case result < 0:
 +                      return buf, fmt.Errorf("cannot encode message: %s", 
PnErrorCode(result))
 +              default:
 +                      return buf[:len], nil
 +              }
 +      }
 +      return encodeGrow(buffer, encode)
 +}
 +
 +// TODO aconway 2015-09-14: Multi-section messages.

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/c87499a3/amqp/types.go
----------------------------------------------------------------------
diff --cc amqp/types.go
index 697d896,0000000..d927cc5
mode 100644,000000..100644
--- a/amqp/types.go
+++ b/amqp/types.go
@@@ -1,201 -1,0 +1,196 @@@
 +/*
 +Licensed to the Apache Software Foundation (ASF) under one
 +or more contributor license agreements.  See the NOTICE file
 +distributed with this work for additional information
 +regarding copyright ownership.  The ASF licenses this file
 +to you under the Apache License, Version 2.0 (the
 +"License"); you may not use this file except in compliance
 +with the License.  You may obtain a copy of the License at
 +
 +  http://www.apache.org/licenses/LICENSE-2.0
 +
 +Unless required by applicable law or agreed to in writing,
 +software distributed under the License is distributed on an
 +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 +KIND, either express or implied.  See the License for the
 +specific language governing permissions and limitations
 +under the License.
 +*/
 +
 +package amqp
 +
- // #include <proton/codec.h>
++//#include "codec_shim.h"
 +import "C"
 +
 +import (
 +      "bytes"
 +      "fmt"
 +      "reflect"
 +      "time"
 +      "unsafe"
 +)
 +
- // Older proton versions don't define C.PN_INVALID, so define it here.
- // In C it is pn_type_t(-1), in Go use the bitwise NOT operator to get the 
same value.
- const pnInvalid = C.pn_type_t(^0)
- 
 +func (t C.pn_type_t) String() string {
 +      switch C.pn_type_t(t) {
 +      case C.PN_NULL:
 +              return "null"
 +      case C.PN_BOOL:
 +              return "bool"
 +      case C.PN_UBYTE:
 +              return "ubyte"
 +      case C.PN_BYTE:
 +              return "byte"
 +      case C.PN_USHORT:
 +              return "ushort"
 +      case C.PN_SHORT:
 +              return "short"
 +      case C.PN_CHAR:
 +              return "char"
 +      case C.PN_UINT:
 +              return "uint"
 +      case C.PN_INT:
 +              return "int"
 +      case C.PN_ULONG:
 +              return "ulong"
 +      case C.PN_LONG:
 +              return "long"
 +      case C.PN_TIMESTAMP:
 +              return "timestamp"
 +      case C.PN_FLOAT:
 +              return "float"
 +      case C.PN_DOUBLE:
 +              return "double"
 +      case C.PN_DECIMAL32:
 +              return "decimal32"
 +      case C.PN_DECIMAL64:
 +              return "decimal64"
 +      case C.PN_DECIMAL128:
 +              return "decimal128"
 +      case C.PN_UUID:
 +              return "uuid"
 +      case C.PN_BINARY:
 +              return "binary"
 +      case C.PN_STRING:
 +              return "string"
 +      case C.PN_SYMBOL:
 +              return "symbol"
 +      case C.PN_DESCRIBED:
 +              return "described"
 +      case C.PN_ARRAY:
 +              return "array"
 +      case C.PN_LIST:
 +              return "list"
 +      case C.PN_MAP:
 +              return "map"
++      case C.PN_INVALID:
++              return "no-data"
 +      default:
-               if t == pnInvalid {
-                       return "no-data"
-               }
 +              return fmt.Sprintf("unknown-type(%d)", t)
 +      }
 +}
 +
 +// Go types
 +var (
 +      bytesType = reflect.TypeOf([]byte{})
 +      valueType = reflect.TypeOf(reflect.Value{})
 +)
 +
 +// TODO aconway 2015-04-08: can't handle AMQP maps with key types that are 
not valid Go map keys.
 +
 +// Map is a generic map that can have mixed key and value types and so can 
represent any AMQP map
 +type Map map[interface{}]interface{}
 +
 +// List is a generic list that can hold mixed values and can represent any 
AMQP list.
 +//
 +type List []interface{}
 +
 +// Symbol is a string that is encoded as an AMQP symbol
 +type Symbol string
 +
 +func (s Symbol) GoString() string { return fmt.Sprintf("s\"%s\"", s) }
 +
 +// Binary is a string that is encoded as an AMQP binary.
 +// It is a string rather than a byte[] because byte[] is not hashable and 
can't be used as
 +// a map key, AMQP frequently uses binary types as map keys. It can convert 
to and from []byte
 +type Binary string
 +
 +func (b Binary) GoString() string { return fmt.Sprintf("b\"%s\"", b) }
 +
 +// GoString for Map prints values with their types, useful for debugging.
 +func (m Map) GoString() string {
 +      out := &bytes.Buffer{}
 +      fmt.Fprintf(out, "%T{", m)
 +      i := len(m)
 +      for k, v := range m {
 +              fmt.Fprintf(out, "%T(%#v): %T(%#v)", k, k, v, v)
 +              i--
 +              if i > 0 {
 +                      fmt.Fprint(out, ", ")
 +              }
 +      }
 +      fmt.Fprint(out, "}")
 +      return out.String()
 +}
 +
 +// GoString for List prints values with their types, useful for debugging.
 +func (l List) GoString() string {
 +      out := &bytes.Buffer{}
 +      fmt.Fprintf(out, "%T{", l)
 +      for i := 0; i < len(l); i++ {
 +              fmt.Fprintf(out, "%T(%#v)", l[i], l[i])
 +              if i == len(l)-1 {
 +                      fmt.Fprint(out, ", ")
 +              }
 +      }
 +      fmt.Fprint(out, "}")
 +      return out.String()
 +}
 +
 +// pnTime converts Go time.Time to Proton millisecond Unix time.
 +func pnTime(t time.Time) C.pn_timestamp_t {
 +      secs := t.Unix()
 +      // Note: sub-second accuracy is not guaraunteed if the Unix time in
 +      // nanoseconds cannot be represented by an int64 (sometime around year 
2260)
 +      msecs := (t.UnixNano() % int64(time.Second)) / int64(time.Millisecond)
 +      return C.pn_timestamp_t(secs*1000 + msecs)
 +}
 +
 +// goTime converts a pn_timestamp_t to a Go time.Time.
 +func goTime(t C.pn_timestamp_t) time.Time {
 +      secs := int64(t) / 1000
 +      nsecs := (int64(t) % 1000) * int64(time.Millisecond)
 +      return time.Unix(secs, nsecs)
 +}
 +
 +func goBytes(cBytes C.pn_bytes_t) (bytes []byte) {
 +      if cBytes.start != nil {
 +              bytes = C.GoBytes(unsafe.Pointer(cBytes.start), 
C.int(cBytes.size))
 +      }
 +      return
 +}
 +
 +func goString(cBytes C.pn_bytes_t) (str string) {
 +      if cBytes.start != nil {
 +              str = C.GoStringN(cBytes.start, C.int(cBytes.size))
 +      }
 +      return
 +}
 +
 +func pnBytes(b []byte) C.pn_bytes_t {
 +      if len(b) == 0 {
 +              return C.pn_bytes_t{0, nil}
 +      } else {
 +              return C.pn_bytes_t{C.size_t(len(b)), 
(*C.char)(unsafe.Pointer(&b[0]))}
 +      }
 +}
 +
 +func cPtr(b []byte) *C.char {
 +      if len(b) == 0 {
 +              return nil
 +      }
 +      return (*C.char)(unsafe.Pointer(&b[0]))
 +}
 +
 +func cLen(b []byte) C.size_t {
 +      return C.size_t(len(b))
 +}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/c87499a3/amqp/unmarshal.go
----------------------------------------------------------------------
diff --cc amqp/unmarshal.go
index 05ecb8d,0000000..6942174
mode 100644,000000..100644
--- a/amqp/unmarshal.go
+++ b/amqp/unmarshal.go
@@@ -1,561 -1,0 +1,561 @@@
 +/*
 +Licensed to the Apache Software Foundation (ASF) under one
 +oor more contributor license agreements.  See the NOTICE file
 +distributed with this work for additional information
 +regarding copyright ownership.  The ASF licenses this file
 +to you under the Apache License, Version 2.0 (the
 +"License"); you may not use this file except in compliance
 +with the License.  You may obtain a copy of the License at
 +
 +  http://www.apache.org/licenses/LICENSE-2.0
 +
 +Unless required by applicable law or agreed to in writing,
 +software distributed under the License is distributed on an
 +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 +KIND, either express or implied.  See the License for the
 +specific language governing permissions and limitations
 +under the License.
 +*/
 +
 +package amqp
 +
- // #include <proton/codec.h>
++// #include "codec_shim.h"
 +import "C"
 +
 +import (
 +      "bytes"
 +      "fmt"
 +      "io"
 +      "reflect"
 +      "unsafe"
 +)
 +
 +const minDecode = 1024
 +
 +// Error returned if AMQP data cannot be unmarshaled as the desired Go type.
 +type UnmarshalError struct {
 +      // The name of the AMQP type.
 +      AMQPType string
 +      // The Go type.
 +      GoType reflect.Type
 +}
 +
 +func newUnmarshalError(pnType C.pn_type_t, v interface{}) *UnmarshalError {
 +      return &UnmarshalError{C.pn_type_t(pnType).String(), reflect.TypeOf(v)}
 +}
 +
 +func (e UnmarshalError) Error() string {
 +      if e.GoType.Kind() != reflect.Ptr {
 +              return fmt.Sprintf("cannot unmarshal to type %s, not a 
pointer", e.GoType)
 +      } else {
 +              return fmt.Sprintf("cannot unmarshal AMQP %s to %s", 
e.AMQPType, e.GoType)
 +      }
 +}
 +
 +func doRecover(err *error) {
 +      r := recover()
 +      switch r := r.(type) {
 +      case nil:
 +      case *UnmarshalError:
 +              *err = r
 +      default:
 +              panic(r)
 +      }
 +}
 +
 +//
 +// Decoding from a pn_data_t
 +//
 +// NOTE: we use panic() to signal a decoding error, simplifies decoding logic.
 +// We recover() at the highest possible level - i.e. in the exported 
Unmarshal or Decode.
 +//
 +
 +// Decoder decodes AMQP values from an io.Reader.
 +//
 +type Decoder struct {
 +      reader io.Reader
 +      buffer bytes.Buffer
 +}
 +
 +// NewDecoder returns a new decoder that reads from r.
 +//
 +// The decoder has it's own buffer and may read more data than required for 
the
 +// AMQP values requested.  Use Buffered to see if there is data left in the
 +// buffer.
 +//
 +func NewDecoder(r io.Reader) *Decoder {
 +      return &Decoder{r, bytes.Buffer{}}
 +}
 +
 +// Buffered returns a reader of the data remaining in the Decoder's buffer. 
The
 +// reader is valid until the next call to Decode.
 +//
 +func (d *Decoder) Buffered() io.Reader {
 +      return bytes.NewReader(d.buffer.Bytes())
 +}
 +
 +// Decode reads the next AMQP value from the Reader and stores it in the 
value pointed to by v.
 +//
 +// See the documentation for Unmarshal for details about the conversion of 
AMQP into a Go value.
 +//
 +func (d *Decoder) Decode(v interface{}) (err error) {
 +      defer doRecover(&err)
 +      data := C.pn_data(0)
 +      defer C.pn_data_free(data)
 +      var n int
 +      for n == 0 {
 +              n, err = decode(data, d.buffer.Bytes())
 +              if err != nil {
 +                      return err
 +              }
 +              if n == 0 { // n == 0 means not enough data, read more
 +                      err = d.more()
 +              } else {
 +                      unmarshal(v, data)
 +              }
 +      }
 +      d.buffer.Next(n)
 +      return
 +}
 +
 +/*
 +Unmarshal decodes AMQP-encoded bytes and stores the result in the value 
pointed to by v.
 +Types are converted as follows:
 +
 + 
+---------------------------+----------------------------------------------------------------------+
 + |To Go types                |From AMQP types                                 
                      |
 + 
+===========================+======================================================================+
 + |bool                       |bool                                            
                      |
 + 
+---------------------------+----------------------------------------------------------------------+
 + |int, int8, int16,          |Equivalent or smaller signed integer type: 
byte, short, int, long.    |
 + |int32, int64               |                                                
                      |
 + 
+---------------------------+----------------------------------------------------------------------+
 + |uint, uint8, uint16,       |Equivalent or smaller unsigned integer type: 
ubyte, ushort, uint,     |
 + |uint32, uint64 types       |ulong                                           
                      |
 + 
+---------------------------+----------------------------------------------------------------------+
 + |float32, float64           |Equivalent or smaller float or double.          
                      |
 + 
+---------------------------+----------------------------------------------------------------------+
 + |string, []byte             |string, symbol or binary.                       
                      |
 + 
+---------------------------+----------------------------------------------------------------------+
 + |Symbol                     |symbol                                          
                      |
 + 
+---------------------------+----------------------------------------------------------------------+
 + |map[K]T                    |map, provided all keys and values can unmarshal 
to types K, T         |
 + 
+---------------------------+----------------------------------------------------------------------+
 + |Map                        |map, any AMQP map                               
                      |
 + 
+---------------------------+----------------------------------------------------------------------+
 + |interface{}                |Any AMQP value can be unmarshaled to an 
interface{} as follows:       |
 + |                           
+------------------------+---------------------------------------------+
 + |                           |AMQP Type               |Go Type in interface{} 
                      |
 + |                           
+========================+=============================================+
 + |                           |bool                    |bool                   
                      |
 + |                           
+------------------------+---------------------------------------------+
 + |                           |byte,short,int,long     |int8,int16,int32,int64 
                      |
 + |                           
+------------------------+---------------------------------------------+
 + |                           |ubyte,ushort,uint,ulong 
|uint8,uint16,uint32,uint64                   |
 + |                           
+------------------------+---------------------------------------------+
 + |                           |float, double           |float32, float64       
                      |
 + |                           
+------------------------+---------------------------------------------+
 + |                           |string                  |string                 
                      |
 + |                           
+------------------------+---------------------------------------------+
 + |                           |symbol                  |Symbol                 
                      |
 + |                           
+------------------------+---------------------------------------------+
 + |                           |binary                  |Binary                 
                      |
 + |                           
+------------------------+---------------------------------------------+
 + |                           |nulll                   |nil                    
                      |
 + |                           
+------------------------+---------------------------------------------+
 + |                           |map                     |Map                    
                      |
 + |                           
+------------------------+---------------------------------------------+
 + |                           |list                    |List                   
                      |
 + 
+---------------------------+------------------------+---------------------------------------------+
 +
 +The following Go types cannot be unmarshaled: uintptr, function, interface, 
channel.
 +
 +TODO
 +
 +Go types: array, struct.
 +
 +AMQP types: decimal32/64/128, char (round trip), timestamp, uuid, array, 
multi-section message bodies.
 +
 +AMQP maps with mixed/unhashable key types need an alternate representation.
 +
 +Described types.
 +*/
 +func Unmarshal(bytes []byte, v interface{}) (n int, err error) {
 +      defer doRecover(&err)
 +
 +      data := C.pn_data(0)
 +      defer C.pn_data_free(data)
 +      n, err = decode(data, bytes)
 +      if err != nil {
 +              return 0, err
 +      }
 +      if n == 0 {
 +              return 0, fmt.Errorf("not enough data")
 +      } else {
 +              unmarshal(v, data)
 +      }
 +      return n, nil
 +}
 +
 +// more reads more data when we can't parse a complete AMQP type
 +func (d *Decoder) more() error {
 +      var readSize int64 = minDecode
 +      if int64(d.buffer.Len()) > readSize { // Grow by doubling
 +              readSize = int64(d.buffer.Len())
 +      }
 +      var n int64
 +      n, err := d.buffer.ReadFrom(io.LimitReader(d.reader, readSize))
 +      if n == 0 && err == nil { // ReadFrom won't report io.EOF, just returns 0
 +              err = io.EOF
 +      }
 +      return err
 +}
 +
 +// Unmarshal from data into value pointed at by v.
 +func unmarshal(v interface{}, data *C.pn_data_t) {
 +      pnType := C.pn_data_type(data)
 +      switch v := v.(type) {
 +      case *bool:
 +              switch pnType {
 +              case C.PN_BOOL:
 +                      *v = bool(C.pn_data_get_bool(data))
 +              default:
 +                      panic(newUnmarshalError(pnType, v))
 +              }
 +      case *int8:
 +              switch pnType {
 +              case C.PN_CHAR:
 +                      *v = int8(C.pn_data_get_char(data))
 +              case C.PN_BYTE:
 +                      *v = int8(C.pn_data_get_byte(data))
 +              default:
 +                      panic(newUnmarshalError(pnType, v))
 +              }
 +      case *uint8:
 +              switch pnType {
 +              case C.PN_CHAR:
 +                      *v = uint8(C.pn_data_get_char(data))
 +              case C.PN_UBYTE:
 +                      *v = uint8(C.pn_data_get_ubyte(data))
 +              default:
 +                      panic(newUnmarshalError(pnType, v))
 +              }
 +      case *int16:
 +              switch pnType {
 +              case C.PN_CHAR:
 +                      *v = int16(C.pn_data_get_char(data))
 +              case C.PN_BYTE:
 +                      *v = int16(C.pn_data_get_byte(data))
 +              case C.PN_SHORT:
 +                      *v = int16(C.pn_data_get_short(data))
 +              default:
 +                      panic(newUnmarshalError(pnType, v))
 +              }
 +      case *uint16:
 +              switch pnType {
 +              case C.PN_CHAR:
 +                      *v = uint16(C.pn_data_get_char(data))
 +              case C.PN_UBYTE:
 +                      *v = uint16(C.pn_data_get_ubyte(data))
 +              case C.PN_USHORT:
 +                      *v = uint16(C.pn_data_get_ushort(data))
 +              default:
 +                      panic(newUnmarshalError(pnType, v))
 +              }
 +      case *int32:
 +              switch pnType {
 +              case C.PN_CHAR:
 +                      *v = int32(C.pn_data_get_char(data))
 +              case C.PN_BYTE:
 +                      *v = int32(C.pn_data_get_byte(data))
 +              case C.PN_SHORT:
 +                      *v = int32(C.pn_data_get_short(data))
 +              case C.PN_INT:
 +                      *v = int32(C.pn_data_get_int(data))
 +              default:
 +                      panic(newUnmarshalError(pnType, v))
 +              }
 +      case *uint32:
 +              switch pnType {
 +              case C.PN_CHAR:
 +                      *v = uint32(C.pn_data_get_char(data))
 +              case C.PN_UBYTE:
 +                      *v = uint32(C.pn_data_get_ubyte(data))
 +              case C.PN_USHORT:
 +                      *v = uint32(C.pn_data_get_ushort(data))
 +              case C.PN_UINT:
 +                      *v = uint32(C.pn_data_get_uint(data))
 +              default:
 +                      panic(newUnmarshalError(pnType, v))
 +              }
 +
 +      case *int64:
 +              switch pnType {
 +              case C.PN_CHAR:
 +                      *v = int64(C.pn_data_get_char(data))
 +              case C.PN_BYTE:
 +                      *v = int64(C.pn_data_get_byte(data))
 +              case C.PN_SHORT:
 +                      *v = int64(C.pn_data_get_short(data))
 +              case C.PN_INT:
 +                      *v = int64(C.pn_data_get_int(data))
 +              case C.PN_LONG:
 +                      *v = int64(C.pn_data_get_long(data))
 +              default:
 +                      panic(newUnmarshalError(pnType, v))
 +              }
 +
 +      case *uint64:
 +              switch pnType {
 +              case C.PN_CHAR:
 +                      *v = uint64(C.pn_data_get_char(data))
 +              case C.PN_UBYTE:
 +                      *v = uint64(C.pn_data_get_ubyte(data))
 +              case C.PN_USHORT:
 +                      *v = uint64(C.pn_data_get_ushort(data))
 +              case C.PN_ULONG:
 +                      *v = uint64(C.pn_data_get_ulong(data))
 +              default:
 +                      panic(newUnmarshalError(pnType, v))
 +              }
 +
 +      case *int:
 +              switch pnType {
 +              case C.PN_CHAR:
 +                      *v = int(C.pn_data_get_char(data))
 +              case C.PN_BYTE:
 +                      *v = int(C.pn_data_get_byte(data))
 +              case C.PN_SHORT:
 +                      *v = int(C.pn_data_get_short(data))
 +              case C.PN_INT:
 +                      *v = int(C.pn_data_get_int(data))
 +              case C.PN_LONG:
 +                      if unsafe.Sizeof(0) == 8 {
 +                              *v = int(C.pn_data_get_long(data))
 +                      } else {
 +                              panic(newUnmarshalError(pnType, v))
 +                      }
 +              default:
 +                      panic(newUnmarshalError(pnType, v))
 +              }
 +
 +      case *uint:
 +              switch pnType {
 +              case C.PN_CHAR:
 +                      *v = uint(C.pn_data_get_char(data))
 +              case C.PN_UBYTE:
 +                      *v = uint(C.pn_data_get_ubyte(data))
 +              case C.PN_USHORT:
 +                      *v = uint(C.pn_data_get_ushort(data))
 +              case C.PN_UINT:
 +                      *v = uint(C.pn_data_get_uint(data))
 +              case C.PN_ULONG:
 +                      if unsafe.Sizeof(0) == 8 {
 +                              *v = uint(C.pn_data_get_ulong(data))
 +                      } else {
 +                              panic(newUnmarshalError(pnType, v))
 +                      }
 +              default:
 +                      panic(newUnmarshalError(pnType, v))
 +              }
 +
 +      case *float32:
 +              switch pnType {
 +              case C.PN_FLOAT:
 +                      *v = float32(C.pn_data_get_float(data))
 +              default:
 +                      panic(newUnmarshalError(pnType, v))
 +              }
 +
 +      case *float64:
 +              switch pnType {
 +              case C.PN_FLOAT:
 +                      *v = float64(C.pn_data_get_float(data))
 +              case C.PN_DOUBLE:
 +                      *v = float64(C.pn_data_get_double(data))
 +              default:
 +                      panic(newUnmarshalError(pnType, v))
 +              }
 +
 +      case *string:
 +              switch pnType {
 +              case C.PN_STRING:
 +                      *v = goString(C.pn_data_get_string(data))
 +              case C.PN_SYMBOL:
 +                      *v = goString(C.pn_data_get_symbol(data))
 +              case C.PN_BINARY:
 +                      *v = goString(C.pn_data_get_binary(data))
 +              default:
 +                      panic(newUnmarshalError(pnType, v))
 +              }
 +
 +      case *[]byte:
 +              switch pnType {
 +              case C.PN_STRING:
 +                      *v = goBytes(C.pn_data_get_string(data))
 +              case C.PN_SYMBOL:
 +                      *v = goBytes(C.pn_data_get_symbol(data))
 +              case C.PN_BINARY:
 +                      *v = goBytes(C.pn_data_get_binary(data))
 +              default:
 +                      panic(newUnmarshalError(pnType, v))
 +              }
 +
 +      case *Binary:
 +              switch pnType {
 +              case C.PN_BINARY:
 +                      *v = Binary(goBytes(C.pn_data_get_binary(data)))
 +              default:
 +                      panic(newUnmarshalError(pnType, v))
 +              }
 +
 +      case *Symbol:
 +              switch pnType {
 +              case C.PN_SYMBOL:
 +                      *v = Symbol(goBytes(C.pn_data_get_symbol(data)))
 +              default:
 +                      panic(newUnmarshalError(pnType, v))
 +              }
 +
 +      case *interface{}:
 +              getInterface(data, v)
 +
 +      default:
 +              if reflect.TypeOf(v).Kind() != reflect.Ptr {
 +                      panic(newUnmarshalError(pnType, v))
 +              }
 +              switch reflect.TypeOf(v).Elem().Kind() {
 +              case reflect.Map:
 +                      getMap(data, v)
 +              case reflect.Slice:
 +                      getList(data, v)
 +              default:
 +                      panic(newUnmarshalError(pnType, v))
 +              }
 +      }
 +      err := dataError("unmarshaling", data)
 +      if err != nil {
 +              panic(err)
 +      }
 +      return
 +}
 +
 +func rewindUnmarshal(v interface{}, data *C.pn_data_t) {
 +      C.pn_data_rewind(data)
 +      C.pn_data_next(data)
 +      unmarshal(v, data)
 +}
 +
 +// Getting into an interface is driven completely by the AMQP type, since the 
interface{}
 +// target is type-neutral.
 +func getInterface(data *C.pn_data_t, v *interface{}) {
 +      pnType := C.pn_data_type(data)
 +      switch pnType {
-       case C.PN_NULL, pnInvalid: // No data.
++      case C.PN_NULL, C.PN_INVALID: // No data.
 +              *v = nil
 +      case C.PN_BOOL:
 +              *v = bool(C.pn_data_get_bool(data))
 +      case C.PN_UBYTE:
 +              *v = uint8(C.pn_data_get_ubyte(data))
 +      case C.PN_BYTE:
 +              *v = int8(C.pn_data_get_byte(data))
 +      case C.PN_USHORT:
 +              *v = uint16(C.pn_data_get_ushort(data))
 +      case C.PN_SHORT:
 +              *v = int16(C.pn_data_get_short(data))
 +      case C.PN_UINT:
 +              *v = uint32(C.pn_data_get_uint(data))
 +      case C.PN_INT:
 +              *v = int32(C.pn_data_get_int(data))
 +      case C.PN_CHAR:
 +              *v = uint8(C.pn_data_get_char(data))
 +      case C.PN_ULONG:
 +              *v = uint64(C.pn_data_get_ulong(data))
 +      case C.PN_LONG:
 +              *v = int64(C.pn_data_get_long(data))
 +      case C.PN_FLOAT:
 +              *v = float32(C.pn_data_get_float(data))
 +      case C.PN_DOUBLE:
 +              *v = float64(C.pn_data_get_double(data))
 +      case C.PN_BINARY:
 +              *v = Binary(goBytes(C.pn_data_get_binary(data)))
 +      case C.PN_STRING:
 +              *v = goString(C.pn_data_get_string(data))
 +      case C.PN_SYMBOL:
 +              *v = Symbol(goString(C.pn_data_get_symbol(data)))
 +      case C.PN_MAP:
 +              m := make(Map)
 +              unmarshal(&m, data)
 +              *v = m
 +      case C.PN_LIST:
 +              l := make(List, 0)
 +              unmarshal(&l, data)
 +              *v = l
 +      default:
 +              panic(newUnmarshalError(pnType, v))
 +      }
 +}
 +
 +// get into map pointed at by v
 +func getMap(data *C.pn_data_t, v interface{}) {
 +      mapValue := reflect.ValueOf(v).Elem()
 +      mapValue.Set(reflect.MakeMap(mapValue.Type())) // Clear the map
 +      switch pnType := C.pn_data_type(data); pnType {
 +      case C.PN_MAP:
 +              count := int(C.pn_data_get_map(data))
 +              if bool(C.pn_data_enter(data)) {
 +                      defer C.pn_data_exit(data)
 +                      for i := 0; i < count/2; i++ {
 +                              if bool(C.pn_data_next(data)) {
 +                                      key := 
reflect.New(mapValue.Type().Key())
 +                                      unmarshal(key.Interface(), data)
 +                                      if bool(C.pn_data_next(data)) {
 +                                              val := 
reflect.New(mapValue.Type().Elem())
 +                                              unmarshal(val.Interface(), data)
 +                                              
mapValue.SetMapIndex(key.Elem(), val.Elem())
 +                                      }
 +                              }
 +                      }
 +              }
-       case pnInvalid: // Leave the map empty
++      case C.PN_INVALID: // Leave the map empty
 +      default:
 +              panic(newUnmarshalError(pnType, v))
 +      }
 +}
 +
 +func getList(data *C.pn_data_t, v interface{}) {
 +      pnType := C.pn_data_type(data)
 +      if pnType != C.PN_LIST {
 +              panic(newUnmarshalError(pnType, v))
 +      }
 +      count := int(C.pn_data_get_list(data))
 +      listValue := reflect.MakeSlice(reflect.TypeOf(v).Elem(), count, count)
 +      if bool(C.pn_data_enter(data)) {
 +              for i := 0; i < count; i++ {
 +                      if bool(C.pn_data_next(data)) {
 +                              val := reflect.New(listValue.Type().Elem())
 +                              unmarshal(val.Interface(), data)
 +                              listValue.Index(i).Set(val.Elem())
 +                      }
 +              }
 +              C.pn_data_exit(data)
 +      }
 +      reflect.ValueOf(v).Elem().Set(listValue)
 +}
 +
 +// decode from bytes.
 +// Return bytes decoded or 0 if we could not decode a complete object.
 +//
 +func decode(data *C.pn_data_t, bytes []byte) (int, error) {
 +      if len(bytes) == 0 {
 +              return 0, nil
 +      }
 +      n := int(C.pn_data_decode(data, cPtr(bytes), cLen(bytes)))
 +      if n == int(C.PN_UNDERFLOW) {
 +              C.pn_error_clear(C.pn_data_error(data))
 +              return 0, nil
 +      } else if n <= 0 {
 +              return 0, fmt.Errorf("unmarshal %s", PnErrorCode(n))
 +      }
 +      return n, nil
 +}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/c87499a3/electron/connection.go
----------------------------------------------------------------------
diff --cc electron/connection.go
index 8a9e6cd,0000000..386875d
mode 100644,000000..100644
--- a/electron/connection.go
+++ b/electron/connection.go
@@@ -1,238 -1,0 +1,240 @@@
 +/*
 +Licensed to the Apache Software Foundation (ASF) under one
 +or more contributor license agreements.  See the NOTICE file
 +distributed with this work for additional information
 +regarding copyright ownership.  The ASF licenses this file
 +to you under the Apache License, Version 2.0 (the
 +"License"); you may not use this file except in compliance
 +with the License.  You may obtain a copy of the License at
 +
 +  http://www.apache.org/licenses/LICENSE-2.0
 +
 +Unless required by applicable law or agreed to in writing,
 +software distributed under the License is distributed on an
 +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 +KIND, either express or implied.  See the License for the
 +specific language governing permissions and limitations
 +under the License.
 +*/
 +
 +package electron
 +
 +// #include <proton/disposition.h>
 +import "C"
 +
 +import (
++      "fmt"
 +      "net"
 +      "qpid.apache.org/proton"
 +      "sync"
 +      "time"
 +)
 +
 +// Connection is an AMQP connection, created by a Container.
 +type Connection interface {
 +      Endpoint
 +
 +      // Sender opens a new sender on the DefaultSession.
 +      Sender(...LinkOption) (Sender, error)
 +
 +      // Receiver opens a new Receiver on the DefaultSession().
 +      Receiver(...LinkOption) (Receiver, error)
 +
 +      // DefaultSession() returns a default session for the connection. It is 
opened
 +      // on the first call to DefaultSession and returned on subsequent calls.
 +      DefaultSession() (Session, error)
 +
 +      // Session opens a new session.
 +      Session(...SessionOption) (Session, error)
 +
 +      // Container for the connection.
 +      Container() Container
 +
 +      // Disconnect the connection abruptly with an error.
 +      Disconnect(error)
 +
 +      // Wait waits for the connection to be disconnected.
 +      Wait() error
 +
 +      // WaitTimeout is like Wait but returns Timeout if the timeout expires.
 +      WaitTimeout(time.Duration) error
 +
 +      // Incoming returns a channel for incoming endpoints opened by the 
remote end.
 +      //
 +      // To enable, pass AllowIncoming() when creating the Connection. 
Otherwise all
 +      // incoming endpoint requests are automatically rejected and Incoming()
 +      // returns nil.
 +      //
 +      // An Incoming value can be an *IncomingSession, *IncomingSender or
 +      // *IncomingReceiver.  You must call Accept() to open the endpoint or 
Reject()
 +      // to close it with an error. The specific Incoming types have 
additional
 +      // methods to configure the endpoint.
 +      //
 +      // Not receiving from Incoming() or not calling Accept/Reject will 
block the
 +      // electron event loop. Normally you would have a dedicated goroutine 
receive
 +      // from Incoming() and start new goroutines to serve each incoming 
endpoint.
 +      // The channel is closed when the Connection closes.
 +      //
 +      Incoming() <-chan Incoming
 +}
 +
 +// ConnectionOption can be passed when creating a connection to configure 
various options
 +type ConnectionOption func(*connection)
 +
 +// Server returns a ConnectionOption to put the connection in server mode.
 +//
 +// A server connection will do protocol negotiation to accept a incoming AMQP
 +// connection. Normally you would call this for a connection created by
 +// net.Listener.Accept()
 +//
 +func Server() ConnectionOption { return func(c *connection) { 
c.engine.Server() } }
 +
 +// AllowIncoming returns a ConnectionOption to enable incoming endpoint open 
requests.
 +// See Connection.Incoming()
 +func AllowIncoming() ConnectionOption {
 +      return func(c *connection) { c.incoming = make(chan Incoming) }
 +}
 +
 +type connection struct {
 +      endpoint
 +      defaultSessionOnce, closeOnce sync.Once
 +
 +      container   *container
 +      conn        net.Conn
 +      incoming    chan Incoming
 +      handler     *handler
 +      engine      *proton.Engine
 +      eConnection proton.Connection
 +
 +      defaultSession Session
 +}
 +
 +func newConnection(conn net.Conn, cont *container, setting 
...ConnectionOption) (*connection, error) {
 +      c := &connection{container: cont, conn: conn}
 +      c.handler = newHandler(c)
 +      var err error
 +      c.engine, err = proton.NewEngine(c.conn, c.handler.delegator)
 +      if err != nil {
 +              return nil, err
 +      }
 +      for _, set := range setting {
 +              set(c)
 +      }
 +      c.endpoint = makeEndpoint(c.engine.String())
 +      c.eConnection = c.engine.Connection()
 +      go c.run()
 +      return c, nil
 +}
 +
 +func (c *connection) run() {
 +      c.engine.Run()
 +      if c.incoming != nil {
 +              close(c.incoming)
 +      }
 +      c.closed(Closed)
 +}
 +
 +func (c *connection) Close(err error) { c.err.Set(err); c.engine.Close(err) }
 +
 +func (c *connection) Disconnect(err error) { c.err.Set(err); 
c.engine.Disconnect(err) }
 +
 +func (c *connection) Session(setting ...SessionOption) (Session, error) {
 +      var s Session
 +      err := c.engine.InjectWait(func() error {
 +              if c.Error() != nil {
 +                      return c.Error()
 +              }
 +              eSession, err := c.engine.Connection().Session()
 +              if err == nil {
 +                      eSession.Open()
 +                      if err == nil {
 +                              s = newSession(c, eSession, setting...)
 +                      }
 +              }
 +              return err
 +      })
 +      return s, err
 +}
 +
 +func (c *connection) Container() Container { return c.container }
 +
 +func (c *connection) DefaultSession() (s Session, err error) {
 +      c.defaultSessionOnce.Do(func() {
 +              c.defaultSession, err = c.Session()
 +      })
 +      if err == nil {
 +              err = c.Error()
 +      }
 +      return c.defaultSession, err
 +}
 +
 +func (c *connection) Sender(setting ...LinkOption) (Sender, error) {
 +      if s, err := c.DefaultSession(); err == nil {
 +              return s.Sender(setting...)
 +      } else {
 +              return nil, err
 +      }
 +}
 +
 +func (c *connection) Receiver(setting ...LinkOption) (Receiver, error) {
 +      if s, err := c.DefaultSession(); err == nil {
 +              return s.Receiver(setting...)
 +      } else {
 +              return nil, err
 +      }
 +}
 +
 +func (c *connection) Connection() Connection { return c }
 +
 +func (c *connection) Wait() error { return c.WaitTimeout(Forever) }
 +func (c *connection) WaitTimeout(timeout time.Duration) error {
 +      _, err := timedReceive(c.done, timeout)
 +      if err == Timeout {
 +              return Timeout
 +      }
 +      return c.Error()
 +}
 +
 +func (c *connection) Incoming() <-chan Incoming { return c.incoming }
 +
 +// Incoming is the interface for incoming requests to open an endpoint.
 +// Implementing types are IncomingSession, IncomingSender and 
IncomingReceiver.
 +type Incoming interface {
 +      // Accept and open the endpoint.
 +      Accept() Endpoint
 +
 +      // Reject the endpoint with an error
 +      Reject(error)
 +
 +      // wait for and call the accept function, call in proton goroutine.
 +      wait() error
 +      pEndpoint() proton.Endpoint
 +}
 +
 +type incoming struct {
 +      endpoint proton.Endpoint
 +      acceptCh chan func() error
 +}
 +
 +func makeIncoming(e proton.Endpoint) incoming {
 +      return incoming{endpoint: e, acceptCh: make(chan func() error)}
 +}
 +
++func (in *incoming) String() string   { return fmt.Sprintf("%s: %s", 
in.endpoint.Type(), in.endpoint) }
 +func (in *incoming) Reject(err error) { in.acceptCh <- func() error { return 
err } }
 +
 +// Call in proton goroutine, wait for and call the accept function fr
 +func (in *incoming) wait() error { return (<-in.acceptCh)() }
 +
 +func (in *incoming) pEndpoint() proton.Endpoint { return in.endpoint }
 +
 +// Called in app goroutine to send an accept function to proton and return 
the resulting endpoint.
 +func (in *incoming) accept(f func() Endpoint) Endpoint {
 +      done := make(chan Endpoint)
 +      in.acceptCh <- func() error {
 +              ep := f()
 +              done <- ep
 +              return nil
 +      }
 +      return <-done
 +}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/c87499a3/electron/endpoint.go
----------------------------------------------------------------------
diff --cc electron/endpoint.go
index 8cbeadb,0000000..2b1f62d
mode 100644,000000..100644
--- a/electron/endpoint.go
+++ b/electron/endpoint.go
@@@ -1,94 -1,0 +1,99 @@@
 +/*
 +Licensed to the Apache Software Foundation (ASF) under one
 +or more contributor license agreements.  See the NOTICE file
 +distributed with this work for additional information
 +regarding copyright ownership.  The ASF licenses this file
 +to you under the Apache License, Version 2.0 (the
 +"License"); you may not use this file except in compliance
 +with the License.  You may obtain a copy of the License at
 +
 +  http://www.apache.org/licenses/LICENSE-2.0
 +
 +Unless required by applicable law or agreed to in writing,
 +software distributed under the License is distributed on an
 +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 +KIND, either express or implied.  See the License for the
 +specific language governing permissions and limitations
 +under the License.
 +*/
 +
 +package electron
 +
 +import (
 +      "io"
 +      "qpid.apache.org/proton"
 +)
 +
 +// Closed is an alias for io.EOF. It is returned as an error when an endpoint
 +// was closed cleanly.
 +var Closed = io.EOF
 +
 +// Endpoint is the common interface for Connection, Session, Link, Sender and 
Receiver.
 +//
 +// Endpoints can be created locally or by the remote peer. You must Open() an
 +// endpoint before you can use it. Some endpoints have additional Set*() 
methods
 +// that must be called before Open() to take effect, see Connection, Session,
 +// Link, Sender and Receiver for details.
 +//
 +type Endpoint interface {
 +      // Close an endpoint and signal an error to the remote end if error != 
nil.
 +      Close(error)
 +
 +      // String is a human readable identifier, useful for debugging and 
logging.
 +      String() string
 +
 +      // Error returns nil if the endpoint is open, otherwise returns an 
error.
 +      // Error() == Closed means the endpoint was closed without error.
 +      Error() error
 +
 +      // Connection containing the endpoint
 +      Connection() Connection
 +
 +      // Done returns a channel that will close when the endpoint closes.
 +      // Error() will contain the reason.
 +      Done() <-chan struct{}
 +}
 +
 +// DEVELOPER NOTES
 +//
 +// An electron.Endpoint corresponds to a proton.Endpoint, which can be 
invalidated
 +//
 +type endpoint struct {
 +      err  proton.ErrorHolder
 +      str  string // Must be set by the value that embeds endpoint.
 +      done chan struct{}
 +}
 +
 +func makeEndpoint(s string) endpoint { return endpoint{str: s, done: 
make(chan struct{})} }
 +
 +// Called in handler on a Closed event. Marks the endpoint as closed and the 
corresponding
 +// proton.Endpoint pointer as invalid. Injected functions should check 
Error() to ensure
 +// the pointer has not been invalidated.
 +//
 +// Returns the error stored on the endpoint, which may not be different to 
err if there was
 +// already a n error
 +func (e *endpoint) closed(err error) error {
-       e.err.Set(err)
-       e.err.Set(Closed)
-       close(e.done)
++      select {
++      case <-e.done:
++              // Already closed
++      default:
++              e.err.Set(err)
++              e.err.Set(Closed)
++              close(e.done)
++      }
 +      return e.err.Get()
 +}
 +
 +func (e *endpoint) String() string { return e.str }
 +
 +func (e *endpoint) Error() error { return e.err.Get() }
 +
 +func (e *endpoint) Done() <-chan struct{} { return e.done }
 +
 +// Call in proton goroutine to initiate closing an endpoint locally
 +// handler will complete the close when remote end closes.
 +func localClose(ep proton.Endpoint, err error) {
 +      if ep.State().LocalActive() {
 +              proton.CloseError(ep, err)
 +      }
 +}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/c87499a3/electron/sender.go
----------------------------------------------------------------------
diff --cc electron/sender.go
index 573e9da,0000000..834eb75
mode 100644,000000..100644
--- a/electron/sender.go
+++ b/electron/sender.go
@@@ -1,274 -1,0 +1,273 @@@
 +/*
 +Licensed to the Apache Software Foundation (ASF) under one
 +or more contributor license agreements.  See the NOTICE file
 +distributed with this work for additional information
 +regarding copyright ownership.  The ASF licenses this file
 +to you under the Apache License, Version 2.0 (the
 +"License"); you may not use this file except in compliance
 +with the License.  You may obtain a copy of the License at
 +
 +  http://www.apache.org/licenses/LICENSE-2.0
 +
 +Unless required by applicable law or agreed to in writing,
 +software distributed under the License is distributed on an
 +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 +KIND, either express or implied.  See the License for the
 +specific language governing permissions and limitations
 +under the License.
 +*/
 +
 +package electron
 +
 +// #include <proton/disposition.h>
 +import "C"
 +
 +import (
++      "fmt"
 +      "qpid.apache.org/amqp"
 +      "qpid.apache.org/proton"
 +      "time"
 +)
 +
 +// Sender is a Link that sends messages.
 +//
 +// The result of sending a message is provided by an Outcome value.
 +//
 +// A sender can buffer messages up to the credit limit provided by the remote 
receiver.
 +// Send* methods will block if the buffer is full until there is space.
 +// Send*Timeout methods will give up after the timeout and set Timeout as 
Outcome.Error.
 +//
 +type Sender interface {
 +      Link
 +
 +      // SendSync sends a message and blocks until the message is 
acknowledged by the remote receiver.
 +      // Returns an Outcome, which may contain an error if the message could 
not be sent.
 +      SendSync(m amqp.Message) Outcome
 +
 +      // SendWaitable puts a message in the send buffer and returns a channel 
that
 +      // you can use to wait for the Outcome of just that message. The 
channel is
 +      // buffered so you can receive from it whenever you want without 
blocking anything.
 +      SendWaitable(m amqp.Message) <-chan Outcome
 +
 +      // SendForget buffers a message for sending and returns, with no 
notification of the outcome.
 +      SendForget(m amqp.Message)
 +
 +      // SendAsync puts a message in the send buffer and returns immediately. 
 An
 +      // Outcome with Value = value will be sent to the ack channel when the 
remote
 +      // receiver has acknowledged the message or if there is an error.
 +      //
 +      // You can use the same ack channel for many calls to SendAsync(), 
possibly on
 +      // many Senders. The channel will receive the outcomes in the order they
 +      // become available. The channel should be buffered and/or served by 
dedicated
 +      // goroutines to avoid blocking the connection.
 +      //
 +      // If ack == nil no Outcome is sent.
 +      SendAsync(m amqp.Message, ack chan<- Outcome, value interface{})
 +
 +      SendAsyncTimeout(m amqp.Message, ack chan<- Outcome, value interface{}, 
timeout time.Duration)
 +
 +      SendWaitableTimeout(m amqp.Message, timeout time.Duration) <-chan 
Outcome
 +
 +      SendForgetTimeout(m amqp.Message, timeout time.Duration)
 +
 +      SendSyncTimeout(m amqp.Message, timeout time.Duration) Outcome
 +}
 +
 +// Outcome provides information about the outcome of sending a message.
 +type Outcome struct {
 +      // Status of the message: was it sent, how was it acknowledged.
 +      Status SentStatus
 +      // Error is a local error if Status is Unsent or Unacknowledged, a 
remote error otherwise.
 +      Error error
 +      // Value provided by the application in SendAsync()
 +      Value interface{}
 +}
 +
 +// SentStatus indicates the status of a sent message.
 +type SentStatus int
 +
 +const (
 +      // Message was never sent
 +      Unsent SentStatus = iota
 +      // Message was sent but never acknowledged. It may or may not have been 
received.
 +      Unacknowledged
-       // Message was sent pre-settled, no remote outcome is available.
-       Presettled
-       // Message was accepted by the receiver
++      // Message was accepted by the receiver (or was sent pre-settled, 
accept is assumed)
 +      Accepted
 +      // Message was rejected as invalid by the receiver
 +      Rejected
 +      // Message was not processed by the receiver but may be valid for a 
different receiver
 +      Released
 +      // Receiver responded with an unrecognized status.
 +      Unknown
 +)
 +
 +// String human readable name for SentStatus.
 +func (s SentStatus) String() string {
 +      switch s {
 +      case Unsent:
 +              return "unsent"
 +      case Unacknowledged:
 +              return "unacknowledged"
 +      case Accepted:
 +              return "accepted"
 +      case Rejected:
 +              return "rejected"
 +      case Released:
 +              return "released"
 +      case Unknown:
 +              return "unknown"
 +      default:
-               return "invalid"
++              return fmt.Sprintf("invalid(%d)", s)
 +      }
 +}
 +
 +// Convert proton delivery state code to SentStatus value
 +func sentStatus(d uint64) SentStatus {
 +      switch d {
 +      case proton.Accepted:
 +              return Accepted
 +      case proton.Rejected:
 +              return Rejected
 +      case proton.Released, proton.Modified:
 +              return Released
 +      default:
 +              return Unknown
 +      }
 +}
 +
 +// Sender implementation, held by handler.
 +type sender struct {
 +      link
 +      credit chan struct{} // Signal available credit.
 +}
 +
 +func (s *sender) SendAsyncTimeout(m amqp.Message, ack chan<- Outcome, v 
interface{}, t time.Duration) {
 +      // wait for credit
 +      if _, err := timedReceive(s.credit, t); err != nil {
 +              if err == Closed && s.Error != nil {
 +                      err = s.Error()
 +              }
 +              ack <- Outcome{Unsent, err, v}
 +              return
 +      }
 +      // Send a message in handler goroutine
 +      err := s.engine().Inject(func() {
 +              if s.Error() != nil {
 +                      if ack != nil {
 +                              ack <- Outcome{Unsent, s.Error(), v}
 +                      }
 +                      return
 +              }
 +              if delivery, err := s.eLink.Send(m); err == nil {
 +                      if ack != nil { // We must report an outcome
 +                              if s.SndSettle() == SndSettled {
 +                                      delivery.Settle() // Pre-settle if 
required
-                                       ack <- Outcome{Presettled, nil, v}
++                                      ack <- Outcome{Accepted, nil, v}
 +                              } else {
 +                                      s.handler().sentMessages[delivery] = 
sentMessage{ack, v}
 +                              }
 +                      } else { // ack == nil, can't report outcome
 +                              if s.SndSettle() != SndUnsettled { // 
Pre-settle unless we are forced not to.
 +                                      delivery.Settle()
 +                              }
 +                      }
 +              } else { // err != nil
 +                      if ack != nil {
 +                              ack <- Outcome{Unsent, err, v}
 +                      }
 +              }
 +              if s.eLink.Credit() > 0 { // Signal there is still credit
 +                      s.sendable()
 +              }
 +      })
 +      if err != nil && ack != nil {
 +              ack <- Outcome{Unsent, err, v}
 +      }
 +}
 +
 +// Set credit flag if not already set. Non-blocking, any goroutine
 +func (s *sender) sendable() {
 +      select { // Non-blocking
 +      case s.credit <- struct{}{}:
 +      default:
 +      }
 +}
 +
 +func (s *sender) SendWaitableTimeout(m amqp.Message, t time.Duration) <-chan 
Outcome {
 +      out := make(chan Outcome, 1)
 +      s.SendAsyncTimeout(m, out, nil, t)
 +      return out
 +}
 +
 +func (s *sender) SendForgetTimeout(m amqp.Message, t time.Duration) {
 +      s.SendAsyncTimeout(m, nil, nil, t)
 +}
 +
 +func (s *sender) SendSyncTimeout(m amqp.Message, t time.Duration) Outcome {
 +      deadline := time.Now().Add(t)
 +      ack := s.SendWaitableTimeout(m, t)
 +      t = deadline.Sub(time.Now()) // Adjust for time already spent.
 +      if t < 0 {
 +              t = 0
 +      }
 +      if out, err := timedReceive(ack, t); err == nil {
 +              return out.(Outcome)
 +      } else {
 +              if err == Closed && s.Error() != nil {
 +                      err = s.Error()
 +              }
 +              return Outcome{Unacknowledged, err, nil}
 +      }
 +}
 +
 +func (s *sender) SendAsync(m amqp.Message, ack chan<- Outcome, v interface{}) 
{
 +      s.SendAsyncTimeout(m, ack, v, Forever)
 +}
 +
 +func (s *sender) SendWaitable(m amqp.Message) <-chan Outcome {
 +      return s.SendWaitableTimeout(m, Forever)
 +}
 +
 +func (s *sender) SendForget(m amqp.Message) {
 +      s.SendForgetTimeout(m, Forever)
 +}
 +
 +func (s *sender) SendSync(m amqp.Message) Outcome {
 +      return <-s.SendWaitable(m)
 +}
 +
 +// handler goroutine
 +func (s *sender) closed(err error) {
 +      s.link.closed(err)
 +      close(s.credit)
 +}
 +
 +func newSender(l link) *sender {
 +      s := &sender{link: l, credit: make(chan struct{}, 1)}
 +      s.handler().addLink(s.eLink, s)
 +      s.link.open()
 +      return s
 +}
 +
 +// sentMessage records a sent message on the handler.
 +type sentMessage struct {
 +      ack   chan<- Outcome
 +      value interface{}
 +}
 +
 +// IncomingSender is sent on the Connection.Incoming() channel when there is
 +// an incoming request to open a sender link.
 +type IncomingSender struct {
 +      incomingLink
 +}
 +
 +// Accept accepts an incoming sender endpoint
 +func (in *IncomingSender) Accept() Endpoint {
 +      return in.accept(func() Endpoint { return newSender(in.link) })
 +}
 +
 +// Call in injected functions to check if the sender is valid.
 +func (s *sender) valid() bool {
 +      s2, ok := s.handler().links[s.eLink].(*sender)
 +      return ok && s2 == s
 +}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/c87499a3/proton/engine.go
----------------------------------------------------------------------
diff --cc proton/engine.go
index 2e67ef7,0000000..13d44b8
mode 100644,000000..100644
--- a/proton/engine.go
+++ b/proton/engine.go
@@@ -1,403 -1,0 +1,397 @@@
 +/*
 +Licensed to the Apache Software Foundation (ASF) under one
 +or more contributor license agreements.  See the NOTICE file
 +distributed with this work for additional information
 +regarding copyright ownership.  The ASF licenses this file
 +to you under the Apache License, Version 2.0 (the
 +"License"); you may not use this file except in compliance
 +with the License.  You may obtain a copy of the License at
 +
 +  http://www.apache.org/licenses/LICENSE-2.0
 +
 +Unless required by applicable law or agreed to in writing,
 +software distributed under the License is distributed on an
 +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 +KIND, either express or implied.  See the License for the
 +specific language governing permissions and limitations
 +under the License.
 +*/
 +
 +package proton
 +
 +// #include <proton/connection.h>
 +// #include <proton/event.h>
++// #include <proton/error.h>
 +// #include <proton/handlers.h>
 +// #include <proton/session.h>
 +// #include <proton/transport.h>
 +// #include <memory.h>
 +// #include <stdlib.h>
 +//
 +// PN_HANDLE(REMOTE_ADDR)
 +import "C"
 +
 +import (
 +      "fmt"
 +      "io"
 +      "net"
 +      "sync"
 +      "unsafe"
 +)
 +
 +// Injecter allows functions to be "injected" into the event-processing loop, 
to
 +// be called in the same goroutine as event handlers.
 +type Injecter interface {
 +      // Inject a function into the engine goroutine.
 +      //
 +      // f() will be called in the same goroutine as event handlers, so it 
can safely
 +      // use values belonging to event handlers without synchronization. f() 
should
 +      // not block, no further events or injected functions can be processed 
until
 +      // f() returns.
 +      //
 +      // Returns a non-nil error if the function could not be injected and 
will
 +      // never be called. Otherwise the function will eventually be called.
 +      //
 +      // Note that proton values (Link, Session, Connection etc.) that 
existed when
 +      // Inject(f) was called may have become invalid by the time f() is 
executed.
 +      // Handlers should handle keep track of Closed events to ensure proton 
values
 +      // are not used after they become invalid. One technique is to have map 
from
 +      // proton values to application values. Check that the map has the 
correct
 +      // proton/application value pair at the start of the injected function 
and
 +      // delete the value from the map when handling a Closed event.
 +      Inject(f func()) error
 +
 +      // InjectWait is like Inject but does not return till f() has completed.
 +      // If f() cannot be injected it returns the error from Inject(), 
otherwise
 +      // it returns the error from f()
 +      InjectWait(f func() error) error
 +}
 +
 +// bufferChan manages a pair of ping-pong buffers to pass bytes through a 
channel.
 +type bufferChan struct {
 +      buffers    chan []byte
 +      buf1, buf2 []byte
 +}
 +
 +func newBufferChan(size int) *bufferChan {
 +      return &bufferChan{make(chan []byte), make([]byte, size), make([]byte, 
size)}
 +}
 +
 +func (b *bufferChan) buffer() []byte {
 +      b.buf1, b.buf2 = b.buf2, b.buf1 // Alternate buffers.
 +      return b.buf1[:cap(b.buf1)]
 +}
 +
 +// Engine reads from a net.Conn, decodes AMQP events and calls the appropriate
 +// Handler functions sequentially in a single goroutine. Actions taken by
 +// Handler functions (such as sending messages) are encoded and written to the
 +// net.Conn. You can create multiple Engines to handle multiple connections
 +// concurrently.
 +//
 +// You implement the EventHandler and/or MessagingHandler interfaces and 
provide
 +// those values to NewEngine(). Their HandleEvent method will be called in the
 +// event-handling goroutine.
 +//
 +// Handlers can pass values from an event (Connections, Links, Deliveries 
etc.) to
 +// other goroutines, store them, or use them as map indexes. Effectively they 
are
 +// just pointers.  Other goroutines cannot call their methods directly but 
they can
 +// can create a function closure to call such methods and pass it to 
Engine.Inject()
 +// to have it evaluated in the engine goroutine.
 +//
 +// You are responsible for ensuring you don't use an event value after it is
 +// invalid. The handler methods will tell you when a value is no longer 
valid. For
 +// example after a LinkClosed event, that link is no longer valid. If you do
 +// Link.Close() yourself (in a handler or injected function) the link remains 
valid
 +// until the corresponing LinkClosed event is received by the handler.
 +//
 +// Engine.Close() will take care of cleaning up any remaining values when you 
are
 +// done with the Engine. All values associated with a engine become invalid 
when you
 +// call Engine.Close()
 +//
 +// The qpid.apache.org/proton/concurrent package will do all this for you, so 
it
 +// may be a better choice for some applications.
 +//
 +type Engine struct {
 +      // Error is set on exit from Run() if there was an error.
 +      err    ErrorHolder
 +      inject chan func()
 +
 +      conn       net.Conn
 +      connection Connection
 +      transport  Transport
 +      collector  *C.pn_collector_t
 +      read       *bufferChan    // Read buffers channel.
 +      write      *bufferChan    // Write buffers channel.
 +      handlers   []EventHandler // Handlers for proton events.
 +      running    chan struct{}  // This channel will be closed when the 
goroutines are done.
 +      closeOnce  sync.Once
 +}
 +
 +const bufferSize = 4096
 +
 +// NewEngine initializes a engine with a connection and handlers. To start it 
running:
 +//    eng := NewEngine(...)
 +//    go run eng.Run()
 +// The goroutine will exit when the engine is closed or disconnected.
 +// You can check for errors on Engine.Error.
 +//
 +func NewEngine(conn net.Conn, handlers ...EventHandler) (*Engine, error) {
 +      // Save the connection ID for Connection.String()
 +      eng := &Engine{
 +              inject:     make(chan func()),
 +              conn:       conn,
 +              transport:  Transport{C.pn_transport()},
 +              connection: Connection{C.pn_connection()},
 +              collector:  C.pn_collector(),
 +              handlers:   handlers,
 +              read:       newBufferChan(bufferSize),
 +              write:      newBufferChan(bufferSize),
 +              running:    make(chan struct{}),
 +      }
 +      if eng.transport.IsNil() || eng.connection.IsNil() || eng.collector == 
nil {
 +              return nil, fmt.Errorf("failed to allocate engine")
 +      }
 +
 +      // TODO aconway 2015-06-25: connection settings for user, password, 
container etc.
 +      // before transport.Bind() Set up connection before Engine, allow 
Engine or Reactor
 +      // to run connection.
 +
 +      // Unique container-id by default.
 +      eng.connection.SetContainer(UUID4().String())
 +      pnErr := eng.transport.Bind(eng.connection)
 +      if pnErr != 0 {
 +              return nil, fmt.Errorf("cannot setup engine: %s", 
PnErrorCode(pnErr))
 +      }
 +      C.pn_connection_collect(eng.connection.pn, eng.collector)
 +      eng.connection.Open()
 +      return eng, nil
 +}
 +
 +func (eng *Engine) String() string {
 +      return fmt.Sprintf("%s-%s", eng.conn.LocalAddr(), eng.conn.RemoteAddr())
 +}
 +
 +func (eng *Engine) Id() string {
 +      return fmt.Sprintf("%eng", &eng)
 +}
 +
 +func (eng *Engine) Error() error {
 +      return eng.err.Get()
 +}
 +
 +// Inject a function into the Engine's event loop.
 +//
 +// f() will be called in the same event-processing goroutine that calls 
Handler
 +// methods. f() can safely call methods on values that belong to this engine
 +// (Sessions, Links etc)
 +//
 +// The injected function has no parameters or return values. It is normally a
 +// closure and can use channels to communicate with the injecting goroutine if
 +// necessary.
 +//
 +// Returns a non-nil error if the engine is closed before the function could 
be
 +// injected.
 +func (eng *Engine) Inject(f func()) error {
 +      select {
 +      case eng.inject <- f:
 +              return nil
 +      case <-eng.running:
 +              return eng.Error()
 +      }
 +}
 +
 +// InjectWait is like Inject but does not return till f() has completed or the
 +// engine is closed, and returns an error value from f()
 +func (eng *Engine) InjectWait(f func() error) error {
 +      done := make(chan error)
 +      defer close(done)
 +      err := eng.Inject(func() { done <- f() })
 +      if err != nil {
 +              return err
 +      }
 +      select {
 +      case <-eng.running:
 +              return eng.Error()
 +      case err := <-done:
 +              return err
 +      }
 +}
 +
 +// Server puts the Engine in server mode, meaning it will auto-detect 
security settings on
 +// the incoming connnection such as use of SASL and SSL.
 +// Must be called before Run()
 +//
 +func (eng *Engine) Server() { eng.transport.SetServer() }
 +
 +// Close the engine's connection, returns when the engine has exited.
 +func (eng *Engine) Close(err error) {
 +      eng.err.Set(err)
 +      eng.Inject(func() {
 +              CloseError(eng.connection, err)
 +      })
 +      <-eng.running
 +}
 +
 +// Disconnect the engine's connection without and AMQP close, returns when 
the engine has exited.
 +func (eng *Engine) Disconnect(err error) {
 +      eng.err.Set(err)
 +      eng.conn.Close()
 +      <-eng.running
 +}
 +
 +// Run the engine. Engine.Run() will exit when the engine is closed or
 +// disconnected.  You can check for errors after exit with Engine.Error().
 +//
 +func (eng *Engine) Run() error {
 +      wait := sync.WaitGroup{}
 +      wait.Add(2) // Read and write goroutines
 +
 +      readErr := make(chan error, 1) // Don't block
 +      go func() {                    // Read goroutine
 +              defer wait.Done()
 +              for {
 +                      rbuf := eng.read.buffer()
 +                      n, err := eng.conn.Read(rbuf)
 +                      if n > 0 {
 +                              eng.read.buffers <- rbuf[:n]
 +                      }
 +                      if err != nil {
 +                              readErr <- err
 +                              close(readErr)
 +                              close(eng.read.buffers)
 +                              return
 +                      }
 +              }
 +      }()
 +
 +      writeErr := make(chan error, 1) // Don't block
 +      go func() {                     // Write goroutine
 +              defer wait.Done()
 +              for {
 +                      wbuf, ok := <-eng.write.buffers
 +                      if !ok {
 +                              return
 +                      }
 +                      _, err := eng.conn.Write(wbuf)
 +                      if err != nil {
 +                              writeErr <- err
 +                              close(writeErr)
 +                              return
 +                      }
 +              }
 +      }()
 +
 +      wbuf := eng.write.buffer()[:0]
 +
 +      for eng.err.Get() == nil {
 +              if len(wbuf) == 0 {
 +                      eng.pop(&wbuf)
 +              }
 +              // Don't set wchan unless there is something to write.
 +              var wchan chan []byte
 +              if len(wbuf) > 0 {
 +                      wchan = eng.write.buffers
 +              }
 +
 +              select {
 +              case buf, ok := <-eng.read.buffers: // Read a buffer
 +                      if ok {
 +                              eng.push(buf)
 +                      }
 +              case wchan <- wbuf: // Write a buffer
 +                      wbuf = eng.write.buffer()[:0]
 +              case f, ok := <-eng.inject: // Function injected from another 
goroutine
 +                      if ok {
 +                              f()
 +                      }
 +              case err := <-readErr:
 +                      eng.netError(err)
 +              case err := <-writeErr:
 +                      eng.netError(err)
 +              }
 +              eng.process()
 +      }
 +      close(eng.write.buffers)
 +      eng.conn.Close() // Make sure connection is closed
 +      wait.Wait()
 +      close(eng.running) // Signal goroutines have exited and Error is set.
 +
-       // Execute any injected functions for side effects on application data 
structures.
-       inject := eng.inject
-       eng.inject = nil // Further calls to Inject() will return an error.
-       for f := range inject {
-               f()
-       }
- 
 +      if !eng.connection.IsNil() {
 +              eng.connection.Free()
 +      }
 +      if !eng.transport.IsNil() {
 +              eng.transport.Free()
 +      }
 +      if eng.collector != nil {
 +              C.pn_collector_free(eng.collector)
 +      }
 +      for _, h := range eng.handlers {
 +              switch h := h.(type) {
 +              case cHandler:
 +                      C.pn_handler_free(h.pn)
 +              }
 +      }
 +      return eng.err.Get()
 +}
 +
 +func (eng *Engine) netError(err error) {
 +      eng.err.Set(err)
 +      eng.transport.CloseHead()
 +      eng.transport.CloseTail()
 +}
 +
 +func minInt(a, b int) int {
 +      if a < b {
 +              return a
 +      } else {
 +              return b
 +      }
 +}
 +
 +func (eng *Engine) pop(buf *[]byte) {
 +      pending := int(eng.transport.Pending())
 +      switch {
 +      case pending == int(C.PN_EOS):
 +              *buf = (*buf)[:]
 +              return
 +      case pending < 0:
 +              panic(fmt.Errorf("%s", PnErrorCode(pending)))
 +      }
 +      size := minInt(pending, cap(*buf))
 +      *buf = (*buf)[:size]
 +      if size == 0 {
 +              return
 +      }
 +      C.memcpy(unsafe.Pointer(&(*buf)[0]), eng.transport.Head(), 
C.size_t(size))
 +      assert(size > 0)
 +      eng.transport.Pop(uint(size))
 +}
 +
 +func (eng *Engine) push(buf []byte) {
 +      buf2 := buf
 +      for len(buf2) > 0 {
 +              n := eng.transport.Push(buf2)
 +              if n <= 0 {
 +                      panic(fmt.Errorf("error in transport: %s", 
PnErrorCode(n)))
 +              }
 +              buf2 = buf2[n:]
 +      }
 +}
 +
 +func (eng *Engine) handle(e Event) {
 +      for _, h := range eng.handlers {
 +              h.HandleEvent(e)
 +      }
 +      if e.Type() == ETransportClosed {
 +              eng.err.Set(io.EOF)
 +      }
 +}
 +
 +func (eng *Engine) process() {
 +      for ce := C.pn_collector_peek(eng.collector); ce != nil; ce = 
C.pn_collector_peek(eng.collector) {
 +              eng.handle(makeEvent(ce, eng))
 +              C.pn_collector_pop(eng.collector)
 +      }
 +}
 +
 +func (eng *Engine) Connection() Connection { return eng.connection }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to