Merge branch 'master' into go1 - updated to work with proton C 0.10
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/c87499a3 Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/c87499a3 Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/c87499a3 Branch: refs/heads/go1 Commit: c87499a3bcd44c6793c9b5eaa381144a72e06e23 Parents: dfb5b06 bf7e193 Author: Alan Conway <[email protected]> Authored: Wed Dec 30 16:07:26 2015 -0500 Committer: Alan Conway <[email protected]> Committed: Wed Dec 30 16:07:26 2015 -0500 ---------------------------------------------------------------------- amqp/codec_shim.h | 33 +++++++++++++++++++++++++++++++++ amqp/marshal.go | 2 +- amqp/message.go | 2 +- amqp/types.go | 11 +++-------- amqp/unmarshal.go | 6 +++--- electron/connection.go | 2 ++ electron/endpoint.go | 11 ++++++++--- electron/sender.go | 9 ++++----- proton/engine.go | 8 +------- proton/wrappers.go | 5 ++--- proton/wrappers_gen.go | 2 ++ 11 files changed, 60 insertions(+), 31 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/c87499a3/amqp/codec_shim.h ---------------------------------------------------------------------- diff --cc amqp/codec_shim.h index 0000000,0000000..b2f9f1c new file mode 100644 --- /dev/null +++ b/amqp/codec_shim.h @@@ -1,0 -1,0 +1,33 @@@ ++#ifndef CODEC_SHIM_H ++#define CODEC_SHIM_H ++/* ++ * Licensed to the Apache Software Foundation (ASF) under one ++ * or more contributor license agreements. See the NOTICE file ++ * distributed with this work for additional information ++ * regarding copyright ownership. The ASF licenses this file ++ * to you under the Apache License, Version 2.0 (the ++ * "License"); you may not use this file except in compliance ++ * with the License. You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, ++ * software distributed under the License is distributed on an ++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY ++ * KIND, either express or implied. See the License for the ++ * specific language governing permissions and limitations ++ * under the License. ++ */ ++ ++/** Stubs to allow the go binding to work with multiple versions of proton. */ ++ ++#include <proton/codec.h> ++#include <proton/version.h> ++ ++#if PN_VERSION_MAJOR == 0 && PN_VERSION_MINOR <= 10 ++ ++#define PN_INVALID ((pn_type_t)-1) ++ ++#endif ++ ++#endif // CODEC_SHIM_H http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/c87499a3/amqp/marshal.go ---------------------------------------------------------------------- diff --cc amqp/marshal.go index 9930e13,0000000..66e14d8 mode 100644,000000..100644 --- a/amqp/marshal.go +++ b/amqp/marshal.go @@@ -1,250 -1,0 +1,250 @@@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ + +package amqp + - // #include <proton/codec.h> ++//#include "codec_shim.h" +import "C" + +import ( + "fmt" + "io" + "reflect" + "unsafe" +) + +func dataError(prefix string, data *C.pn_data_t) error { + err := PnError(C.pn_data_error(data)) + if err != nil { + err = fmt.Errorf("%s: %s", prefix, err.Error()) + } + return err +} + +/* +Marshal encodes a Go value as AMQP data in buffer. +If buffer is nil, or is not large enough, a new buffer is created. + +Returns the buffer used for encoding with len() adjusted to the actual size of data. + +Go types are encoded as follows + + +-------------------------------------+--------------------------------------------+ + |Go type |AMQP type | + +-------------------------------------+--------------------------------------------+ + |bool |bool | + +-------------------------------------+--------------------------------------------+ + |int8, int16, int32, int64 (int) |byte, short, int, long (int or long) | + +-------------------------------------+--------------------------------------------+ + |uint8, uint16, uint32, uint64 (uint) |ubyte, ushort, uint, ulong (uint or ulong) | + +-------------------------------------+--------------------------------------------+ + |float32, float64 |float, double. | + +-------------------------------------+--------------------------------------------+ + |string |string | + +-------------------------------------+--------------------------------------------+ + |[]byte, Binary |binary | + +-------------------------------------+--------------------------------------------+ + |Symbol |symbol | + +-------------------------------------+--------------------------------------------+ + |interface{} |the contained type | + +-------------------------------------+--------------------------------------------+ + |nil |null | + +-------------------------------------+--------------------------------------------+ + |map[K]T |map with K and T converted as above | + +-------------------------------------+--------------------------------------------+ + |Map |map, may have mixed types for keys, values | + +-------------------------------------+--------------------------------------------+ + |[]T |list with T converted as above | + +-------------------------------------+--------------------------------------------+ + |List |list, may have mixed types values | + +-------------------------------------+--------------------------------------------+ + +The following Go types cannot be marshaled: uintptr, function, interface, channel + +TODO + +Go types: array, slice, struct, complex64/128. + +AMQP types: decimal32/64/128, char, timestamp, uuid, array, multi-section message bodies. + +Described types. + +*/ +func Marshal(v interface{}, buffer []byte) (outbuf []byte, err error) { + defer doRecover(&err) + data := C.pn_data(0) + defer C.pn_data_free(data) + marshal(v, data) + encode := func(buf []byte) ([]byte, error) { + n := int(C.pn_data_encode(data, cPtr(buf), cLen(buf))) + switch { + case n == int(C.PN_OVERFLOW): + return buf, overflow + case n < 0: + return buf, dataError("marshal error", data) + default: + return buf[:n], nil + } + } + return encodeGrow(buffer, encode) +} + +const minEncode = 256 + +// overflow is returned when an encoding function can't fit data in the buffer. +var overflow = fmt.Errorf("buffer too small") + +// encodeFn encodes into buffer[0:len(buffer)]. +// Returns buffer with length adjusted for data encoded. +// If buffer too small, returns overflow as error. +type encodeFn func(buffer []byte) ([]byte, error) + +// encodeGrow calls encode() into buffer, if it returns overflow grows the buffer. +// Returns the final buffer. +func encodeGrow(buffer []byte, encode encodeFn) ([]byte, error) { + if buffer == nil || len(buffer) == 0 { + buffer = make([]byte, minEncode) + } + var err error + for buffer, err = encode(buffer); err == overflow; buffer, err = encode(buffer) { + buffer = make([]byte, 2*len(buffer)) + } + return buffer, err +} + +func marshal(v interface{}, data *C.pn_data_t) { + switch v := v.(type) { + case nil: + C.pn_data_put_null(data) + case bool: + C.pn_data_put_bool(data, C.bool(v)) + case int8: + C.pn_data_put_byte(data, C.int8_t(v)) + case int16: + C.pn_data_put_short(data, C.int16_t(v)) + case int32: + C.pn_data_put_int(data, C.int32_t(v)) + case int64: + C.pn_data_put_long(data, C.int64_t(v)) + case int: + if unsafe.Sizeof(0) == 8 { + C.pn_data_put_long(data, C.int64_t(v)) + } else { + C.pn_data_put_int(data, C.int32_t(v)) + } + case uint8: + C.pn_data_put_ubyte(data, C.uint8_t(v)) + case uint16: + C.pn_data_put_ushort(data, C.uint16_t(v)) + case uint32: + C.pn_data_put_uint(data, C.uint32_t(v)) + case uint64: + C.pn_data_put_ulong(data, C.uint64_t(v)) + case uint: + if unsafe.Sizeof(0) == 8 { + C.pn_data_put_ulong(data, C.uint64_t(v)) + } else { + C.pn_data_put_uint(data, C.uint32_t(v)) + } + case float32: + C.pn_data_put_float(data, C.float(v)) + case float64: + C.pn_data_put_double(data, C.double(v)) + case string: + C.pn_data_put_string(data, pnBytes([]byte(v))) + case []byte: + C.pn_data_put_binary(data, pnBytes(v)) + case Binary: + C.pn_data_put_binary(data, pnBytes([]byte(v))) + case Symbol: + C.pn_data_put_symbol(data, pnBytes([]byte(v))) + case Map: // Special map type + C.pn_data_put_map(data) + C.pn_data_enter(data) + for key, val := range v { + marshal(key, data) + marshal(val, data) + } + C.pn_data_exit(data) + default: + switch reflect.TypeOf(v).Kind() { + case reflect.Map: + putMap(data, v) + case reflect.Slice: + putList(data, v) + default: + panic(fmt.Errorf("cannot marshal %s to AMQP", reflect.TypeOf(v))) + } + } + err := dataError("marshal", data) + if err != nil { + panic(err) + } + return +} + +func clearMarshal(v interface{}, data *C.pn_data_t) { + C.pn_data_clear(data) + marshal(v, data) +} + +func putMap(data *C.pn_data_t, v interface{}) { + mapValue := reflect.ValueOf(v) + C.pn_data_put_map(data) + C.pn_data_enter(data) + for _, key := range mapValue.MapKeys() { + marshal(key.Interface(), data) + marshal(mapValue.MapIndex(key).Interface(), data) + } + C.pn_data_exit(data) +} + +func putList(data *C.pn_data_t, v interface{}) { + listValue := reflect.ValueOf(v) + C.pn_data_put_list(data) + C.pn_data_enter(data) + for i := 0; i < listValue.Len(); i++ { + marshal(listValue.Index(i).Interface(), data) + } + C.pn_data_exit(data) +} + +// Encoder encodes AMQP values to an io.Writer +type Encoder struct { + writer io.Writer + buffer []byte +} + +// New encoder returns a new encoder that writes to w. +func NewEncoder(w io.Writer) *Encoder { + return &Encoder{w, make([]byte, minEncode)} +} + +func (e *Encoder) Encode(v interface{}) (err error) { + e.buffer, err = Marshal(v, e.buffer) + if err == nil { + e.writer.Write(e.buffer) + } + return err +} + +func replace(data *C.pn_data_t, v interface{}) { + C.pn_data_clear(data) + marshal(v, data) +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/c87499a3/amqp/message.go ---------------------------------------------------------------------- diff --cc amqp/message.go index e36c6f2,0000000..1d1287f mode 100644,000000..100644 --- a/amqp/message.go +++ b/amqp/message.go @@@ -1,346 -1,0 +1,346 @@@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ + +package amqp + ++// #include "codec_shim.h" +// #include <proton/types.h> +// #include <proton/message.h> - // #include <proton/codec.h> +// #include <stdlib.h> +// +// /* Helper for setting message string fields */ +// typedef int (*set_fn)(pn_message_t*, const char*); +// int msg_set_str(pn_message_t* m, char* s, set_fn set) { +// int result = set(m, s); +// free(s); +// return result; +// } +// +import "C" + +import ( + "fmt" + "runtime" + "time" + "unsafe" +) + +// Message is the interface to an AMQP message. +type Message interface { + // Durable indicates that any parties taking responsibility + // for the message must durably store the content. + Durable() bool + SetDurable(bool) + + // Priority impacts ordering guarantees. Within a + // given ordered context, higher priority messages may jump ahead of + // lower priority messages. + Priority() uint8 + SetPriority(uint8) + + // TTL or Time To Live, a message it may be dropped after this duration + TTL() time.Duration + SetTTL(time.Duration) + + // FirstAcquirer indicates + // that the recipient of the message is the first recipient to acquire + // the message, i.e. there have been no failed delivery attempts to + // other acquirers. Note that this does not mean the message has not + // been delivered to, but not acquired, by other recipients. + FirstAcquirer() bool + SetFirstAcquirer(bool) + + // DeliveryCount tracks how many attempts have been made to + // delivery a message. + DeliveryCount() uint32 + SetDeliveryCount(uint32) + + // MessageId provides a unique identifier for a message. + // it can be an a string, an unsigned long, a uuid or a + // binary value. + MessageId() interface{} + SetMessageId(interface{}) + + UserId() string + SetUserId(string) + + Address() string + SetAddress(string) + + Subject() string + SetSubject(string) + + ReplyTo() string + SetReplyTo(string) + + // CorrelationId is set on correlated request and response messages. It can be + // an a string, an unsigned long, a uuid or a binary value. + CorrelationId() interface{} + SetCorrelationId(interface{}) + + ContentType() string + SetContentType(string) + + ContentEncoding() string + SetContentEncoding(string) + + // ExpiryTime indicates an absoulte time when the message may be dropped. + // A Zero time (i.e. t.isZero() == true) indicates a message never expires. + ExpiryTime() time.Time + SetExpiryTime(time.Time) + + CreationTime() time.Time + SetCreationTime(time.Time) + + GroupId() string + SetGroupId(string) + + GroupSequence() int32 + SetGroupSequence(int32) + + ReplyToGroupId() string + SetReplyToGroupId(string) + + // Instructions - AMQP delivery instructions. + Instructions() map[string]interface{} + SetInstructions(v map[string]interface{}) + + // Annotations - AMQP annotations. + Annotations() map[string]interface{} + SetAnnotations(v map[string]interface{}) + + // Properties - Application properties. + Properties() map[string]interface{} + SetProperties(v map[string]interface{}) + + // Inferred indicates how the message content + // is encoded into AMQP sections. If inferred is true then binary and + // list values in the body of the message will be encoded as AMQP DATA + // and AMQP SEQUENCE sections, respectively. If inferred is false, + // then all values in the body of the message will be encoded as AMQP + // VALUE sections regardless of their type. + Inferred() bool + SetInferred(bool) + + // Marshal a Go value into the message body. See amqp.Marshal() for details. + Marshal(interface{}) + + // Unmarshal the message body into the value pointed to by v. See amqp.Unmarshal() for details. + Unmarshal(interface{}) + + // Body value resulting from the default unmarshalling of message body as interface{} + Body() interface{} + + // Encode encodes the message as AMQP data. If buffer is non-nil and is large enough + // the message is encoded into it, otherwise a new buffer is created. + // Returns the buffer containing the message. + Encode(buffer []byte) ([]byte, error) + + // Decode data into this message. Overwrites an existing message content. + Decode(buffer []byte) error + + // Clear the message contents. + Clear() + + // Copy the contents of another message to this one. + Copy(m Message) error +} + +type message struct{ pn *C.pn_message_t } + +func freeMessage(m *message) { + C.pn_message_free(m.pn) + m.pn = nil +} + +// NewMessage creates a new message instance. +func NewMessage() Message { + m := &message{C.pn_message()} + runtime.SetFinalizer(m, freeMessage) + return m +} + +// NewMessageWith creates a message with value as the body. Equivalent to +// m := NewMessage(); m.Marshal(body) +func NewMessageWith(value interface{}) Message { + m := NewMessage() + m.Marshal(value) + return m +} + +func (m *message) Clear() { C.pn_message_clear(m.pn) } + +func (m *message) Copy(x Message) error { + if data, err := x.Encode(nil); err == nil { + return m.Decode(data) + } else { + return err + } +} + +// ==== message get functions + +func rewindGet(data *C.pn_data_t) (v interface{}) { + C.pn_data_rewind(data) + C.pn_data_next(data) + unmarshal(&v, data) + return v +} + +func rewindMap(data *C.pn_data_t) (v map[string]interface{}) { + C.pn_data_rewind(data) + C.pn_data_next(data) + unmarshal(&v, data) + return v +} + +func (m *message) Inferred() bool { return bool(C.pn_message_is_inferred(m.pn)) } +func (m *message) Durable() bool { return bool(C.pn_message_is_durable(m.pn)) } +func (m *message) Priority() uint8 { return uint8(C.pn_message_get_priority(m.pn)) } +func (m *message) TTL() time.Duration { + return time.Duration(C.pn_message_get_ttl(m.pn)) * time.Millisecond +} +func (m *message) FirstAcquirer() bool { return bool(C.pn_message_is_first_acquirer(m.pn)) } +func (m *message) DeliveryCount() uint32 { return uint32(C.pn_message_get_delivery_count(m.pn)) } +func (m *message) MessageId() interface{} { return rewindGet(C.pn_message_id(m.pn)) } +func (m *message) UserId() string { return goString(C.pn_message_get_user_id(m.pn)) } +func (m *message) Address() string { return C.GoString(C.pn_message_get_address(m.pn)) } +func (m *message) Subject() string { return C.GoString(C.pn_message_get_subject(m.pn)) } +func (m *message) ReplyTo() string { return C.GoString(C.pn_message_get_reply_to(m.pn)) } +func (m *message) CorrelationId() interface{} { return rewindGet(C.pn_message_correlation_id(m.pn)) } +func (m *message) ContentType() string { return C.GoString(C.pn_message_get_content_type(m.pn)) } +func (m *message) ContentEncoding() string { return C.GoString(C.pn_message_get_content_encoding(m.pn)) } + +func (m *message) ExpiryTime() time.Time { + return time.Unix(0, int64(time.Millisecond*time.Duration(C.pn_message_get_expiry_time(m.pn)))) +} +func (m *message) CreationTime() time.Time { + return time.Unix(0, int64(time.Millisecond)*int64(C.pn_message_get_creation_time(m.pn))) +} +func (m *message) GroupId() string { return C.GoString(C.pn_message_get_group_id(m.pn)) } +func (m *message) GroupSequence() int32 { return int32(C.pn_message_get_group_sequence(m.pn)) } +func (m *message) ReplyToGroupId() string { return C.GoString(C.pn_message_get_reply_to_group_id(m.pn)) } + +func (m *message) Instructions() map[string]interface{} { + return rewindMap(C.pn_message_instructions(m.pn)) +} +func (m *message) Annotations() map[string]interface{} { + return rewindMap(C.pn_message_annotations(m.pn)) +} +func (m *message) Properties() map[string]interface{} { + return rewindMap(C.pn_message_properties(m.pn)) +} + +// ==== message set methods + +func setData(v interface{}, data *C.pn_data_t) { + C.pn_data_clear(data) + marshal(v, data) +} + +func dataString(data *C.pn_data_t) string { + str := C.pn_string(C.CString("")) + defer C.pn_free(unsafe.Pointer(str)) + C.pn_inspect(unsafe.Pointer(data), str) + return C.GoString(C.pn_string_get(str)) +} + +func (m *message) SetInferred(b bool) { C.pn_message_set_inferred(m.pn, C.bool(m.Inferred())) } +func (m *message) SetDurable(b bool) { C.pn_message_set_durable(m.pn, C.bool(b)) } +func (m *message) SetPriority(b uint8) { C.pn_message_set_priority(m.pn, C.uint8_t(b)) } +func (m *message) SetTTL(d time.Duration) { + C.pn_message_set_ttl(m.pn, C.pn_millis_t(d/time.Millisecond)) +} +func (m *message) SetFirstAcquirer(b bool) { C.pn_message_set_first_acquirer(m.pn, C.bool(b)) } +func (m *message) SetDeliveryCount(c uint32) { C.pn_message_set_delivery_count(m.pn, C.uint32_t(c)) } +func (m *message) SetMessageId(id interface{}) { setData(id, C.pn_message_id(m.pn)) } +func (m *message) SetUserId(s string) { C.pn_message_set_user_id(m.pn, pnBytes(([]byte)(s))) } +func (m *message) SetAddress(s string) { + C.msg_set_str(m.pn, C.CString(s), C.set_fn(C.pn_message_set_address)) +} +func (m *message) SetSubject(s string) { + C.msg_set_str(m.pn, C.CString(s), C.set_fn(C.pn_message_set_subject)) +} +func (m *message) SetReplyTo(s string) { + C.msg_set_str(m.pn, C.CString(s), C.set_fn(C.pn_message_set_reply_to)) +} +func (m *message) SetCorrelationId(c interface{}) { setData(c, C.pn_message_correlation_id(m.pn)) } +func (m *message) SetContentType(s string) { + C.msg_set_str(m.pn, C.CString(s), C.set_fn(C.pn_message_set_content_type)) +} +func (m *message) SetContentEncoding(s string) { + C.msg_set_str(m.pn, C.CString(s), C.set_fn(C.pn_message_set_content_encoding)) +} +func (m *message) SetExpiryTime(t time.Time) { C.pn_message_set_expiry_time(m.pn, pnTime(t)) } +func (m *message) SetCreationTime(t time.Time) { C.pn_message_set_creation_time(m.pn, pnTime(t)) } +func (m *message) SetGroupId(s string) { + C.msg_set_str(m.pn, C.CString(s), C.set_fn(C.pn_message_set_group_id)) +} +func (m *message) SetGroupSequence(s int32) { + C.pn_message_set_group_sequence(m.pn, C.pn_sequence_t(s)) +} +func (m *message) SetReplyToGroupId(s string) { + C.msg_set_str(m.pn, C.CString(s), C.set_fn(C.pn_message_set_reply_to_group_id)) +} + +func (m *message) SetInstructions(v map[string]interface{}) { + setData(v, C.pn_message_instructions(m.pn)) +} +func (m *message) SetAnnotations(v map[string]interface{}) { setData(v, C.pn_message_annotations(m.pn)) } +func (m *message) SetProperties(v map[string]interface{}) { setData(v, C.pn_message_properties(m.pn)) } + +// Marshal/Unmarshal body +func (m *message) Marshal(v interface{}) { clearMarshal(v, C.pn_message_body(m.pn)) } +func (m *message) Unmarshal(v interface{}) { rewindUnmarshal(v, C.pn_message_body(m.pn)) } +func (m *message) Body() (v interface{}) { m.Unmarshal(&v); return } + +func (m *message) Decode(data []byte) error { + m.Clear() + if len(data) == 0 { + return fmt.Errorf("empty buffer for decode") + } + if C.pn_message_decode(m.pn, cPtr(data), cLen(data)) < 0 { + return fmt.Errorf("decoding message: %s", PnError(C.pn_message_error(m.pn))) + } + return nil +} + +func DecodeMessage(data []byte) (m Message, err error) { + m = NewMessage() + err = m.Decode(data) + return +} + +func (m *message) Encode(buffer []byte) ([]byte, error) { + encode := func(buf []byte) ([]byte, error) { + len := cLen(buf) + result := C.pn_message_encode(m.pn, cPtr(buf), &len) + switch { + case result == C.PN_OVERFLOW: + return buf, overflow + case result < 0: + return buf, fmt.Errorf("cannot encode message: %s", PnErrorCode(result)) + default: + return buf[:len], nil + } + } + return encodeGrow(buffer, encode) +} + +// TODO aconway 2015-09-14: Multi-section messages. http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/c87499a3/amqp/types.go ---------------------------------------------------------------------- diff --cc amqp/types.go index 697d896,0000000..d927cc5 mode 100644,000000..100644 --- a/amqp/types.go +++ b/amqp/types.go @@@ -1,201 -1,0 +1,196 @@@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ + +package amqp + - // #include <proton/codec.h> ++//#include "codec_shim.h" +import "C" + +import ( + "bytes" + "fmt" + "reflect" + "time" + "unsafe" +) + - // Older proton versions don't define C.PN_INVALID, so define it here. - // In C it is pn_type_t(-1), in Go use the bitwise NOT operator to get the same value. - const pnInvalid = C.pn_type_t(^0) - +func (t C.pn_type_t) String() string { + switch C.pn_type_t(t) { + case C.PN_NULL: + return "null" + case C.PN_BOOL: + return "bool" + case C.PN_UBYTE: + return "ubyte" + case C.PN_BYTE: + return "byte" + case C.PN_USHORT: + return "ushort" + case C.PN_SHORT: + return "short" + case C.PN_CHAR: + return "char" + case C.PN_UINT: + return "uint" + case C.PN_INT: + return "int" + case C.PN_ULONG: + return "ulong" + case C.PN_LONG: + return "long" + case C.PN_TIMESTAMP: + return "timestamp" + case C.PN_FLOAT: + return "float" + case C.PN_DOUBLE: + return "double" + case C.PN_DECIMAL32: + return "decimal32" + case C.PN_DECIMAL64: + return "decimal64" + case C.PN_DECIMAL128: + return "decimal128" + case C.PN_UUID: + return "uuid" + case C.PN_BINARY: + return "binary" + case C.PN_STRING: + return "string" + case C.PN_SYMBOL: + return "symbol" + case C.PN_DESCRIBED: + return "described" + case C.PN_ARRAY: + return "array" + case C.PN_LIST: + return "list" + case C.PN_MAP: + return "map" ++ case C.PN_INVALID: ++ return "no-data" + default: - if t == pnInvalid { - return "no-data" - } + return fmt.Sprintf("unknown-type(%d)", t) + } +} + +// Go types +var ( + bytesType = reflect.TypeOf([]byte{}) + valueType = reflect.TypeOf(reflect.Value{}) +) + +// TODO aconway 2015-04-08: can't handle AMQP maps with key types that are not valid Go map keys. + +// Map is a generic map that can have mixed key and value types and so can represent any AMQP map +type Map map[interface{}]interface{} + +// List is a generic list that can hold mixed values and can represent any AMQP list. +// +type List []interface{} + +// Symbol is a string that is encoded as an AMQP symbol +type Symbol string + +func (s Symbol) GoString() string { return fmt.Sprintf("s\"%s\"", s) } + +// Binary is a string that is encoded as an AMQP binary. +// It is a string rather than a byte[] because byte[] is not hashable and can't be used as +// a map key, AMQP frequently uses binary types as map keys. It can convert to and from []byte +type Binary string + +func (b Binary) GoString() string { return fmt.Sprintf("b\"%s\"", b) } + +// GoString for Map prints values with their types, useful for debugging. +func (m Map) GoString() string { + out := &bytes.Buffer{} + fmt.Fprintf(out, "%T{", m) + i := len(m) + for k, v := range m { + fmt.Fprintf(out, "%T(%#v): %T(%#v)", k, k, v, v) + i-- + if i > 0 { + fmt.Fprint(out, ", ") + } + } + fmt.Fprint(out, "}") + return out.String() +} + +// GoString for List prints values with their types, useful for debugging. +func (l List) GoString() string { + out := &bytes.Buffer{} + fmt.Fprintf(out, "%T{", l) + for i := 0; i < len(l); i++ { + fmt.Fprintf(out, "%T(%#v)", l[i], l[i]) + if i == len(l)-1 { + fmt.Fprint(out, ", ") + } + } + fmt.Fprint(out, "}") + return out.String() +} + +// pnTime converts Go time.Time to Proton millisecond Unix time. +func pnTime(t time.Time) C.pn_timestamp_t { + secs := t.Unix() + // Note: sub-second accuracy is not guaraunteed if the Unix time in + // nanoseconds cannot be represented by an int64 (sometime around year 2260) + msecs := (t.UnixNano() % int64(time.Second)) / int64(time.Millisecond) + return C.pn_timestamp_t(secs*1000 + msecs) +} + +// goTime converts a pn_timestamp_t to a Go time.Time. +func goTime(t C.pn_timestamp_t) time.Time { + secs := int64(t) / 1000 + nsecs := (int64(t) % 1000) * int64(time.Millisecond) + return time.Unix(secs, nsecs) +} + +func goBytes(cBytes C.pn_bytes_t) (bytes []byte) { + if cBytes.start != nil { + bytes = C.GoBytes(unsafe.Pointer(cBytes.start), C.int(cBytes.size)) + } + return +} + +func goString(cBytes C.pn_bytes_t) (str string) { + if cBytes.start != nil { + str = C.GoStringN(cBytes.start, C.int(cBytes.size)) + } + return +} + +func pnBytes(b []byte) C.pn_bytes_t { + if len(b) == 0 { + return C.pn_bytes_t{0, nil} + } else { + return C.pn_bytes_t{C.size_t(len(b)), (*C.char)(unsafe.Pointer(&b[0]))} + } +} + +func cPtr(b []byte) *C.char { + if len(b) == 0 { + return nil + } + return (*C.char)(unsafe.Pointer(&b[0])) +} + +func cLen(b []byte) C.size_t { + return C.size_t(len(b)) +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/c87499a3/amqp/unmarshal.go ---------------------------------------------------------------------- diff --cc amqp/unmarshal.go index 05ecb8d,0000000..6942174 mode 100644,000000..100644 --- a/amqp/unmarshal.go +++ b/amqp/unmarshal.go @@@ -1,561 -1,0 +1,561 @@@ +/* +Licensed to the Apache Software Foundation (ASF) under one +oor more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ + +package amqp + - // #include <proton/codec.h> ++// #include "codec_shim.h" +import "C" + +import ( + "bytes" + "fmt" + "io" + "reflect" + "unsafe" +) + +const minDecode = 1024 + +// Error returned if AMQP data cannot be unmarshaled as the desired Go type. +type UnmarshalError struct { + // The name of the AMQP type. + AMQPType string + // The Go type. + GoType reflect.Type +} + +func newUnmarshalError(pnType C.pn_type_t, v interface{}) *UnmarshalError { + return &UnmarshalError{C.pn_type_t(pnType).String(), reflect.TypeOf(v)} +} + +func (e UnmarshalError) Error() string { + if e.GoType.Kind() != reflect.Ptr { + return fmt.Sprintf("cannot unmarshal to type %s, not a pointer", e.GoType) + } else { + return fmt.Sprintf("cannot unmarshal AMQP %s to %s", e.AMQPType, e.GoType) + } +} + +func doRecover(err *error) { + r := recover() + switch r := r.(type) { + case nil: + case *UnmarshalError: + *err = r + default: + panic(r) + } +} + +// +// Decoding from a pn_data_t +// +// NOTE: we use panic() to signal a decoding error, simplifies decoding logic. +// We recover() at the highest possible level - i.e. in the exported Unmarshal or Decode. +// + +// Decoder decodes AMQP values from an io.Reader. +// +type Decoder struct { + reader io.Reader + buffer bytes.Buffer +} + +// NewDecoder returns a new decoder that reads from r. +// +// The decoder has it's own buffer and may read more data than required for the +// AMQP values requested. Use Buffered to see if there is data left in the +// buffer. +// +func NewDecoder(r io.Reader) *Decoder { + return &Decoder{r, bytes.Buffer{}} +} + +// Buffered returns a reader of the data remaining in the Decoder's buffer. The +// reader is valid until the next call to Decode. +// +func (d *Decoder) Buffered() io.Reader { + return bytes.NewReader(d.buffer.Bytes()) +} + +// Decode reads the next AMQP value from the Reader and stores it in the value pointed to by v. +// +// See the documentation for Unmarshal for details about the conversion of AMQP into a Go value. +// +func (d *Decoder) Decode(v interface{}) (err error) { + defer doRecover(&err) + data := C.pn_data(0) + defer C.pn_data_free(data) + var n int + for n == 0 { + n, err = decode(data, d.buffer.Bytes()) + if err != nil { + return err + } + if n == 0 { // n == 0 means not enough data, read more + err = d.more() + } else { + unmarshal(v, data) + } + } + d.buffer.Next(n) + return +} + +/* +Unmarshal decodes AMQP-encoded bytes and stores the result in the value pointed to by v. +Types are converted as follows: + + +---------------------------+----------------------------------------------------------------------+ + |To Go types |From AMQP types | + +===========================+======================================================================+ + |bool |bool | + +---------------------------+----------------------------------------------------------------------+ + |int, int8, int16, |Equivalent or smaller signed integer type: byte, short, int, long. | + |int32, int64 | | + +---------------------------+----------------------------------------------------------------------+ + |uint, uint8, uint16, |Equivalent or smaller unsigned integer type: ubyte, ushort, uint, | + |uint32, uint64 types |ulong | + +---------------------------+----------------------------------------------------------------------+ + |float32, float64 |Equivalent or smaller float or double. | + +---------------------------+----------------------------------------------------------------------+ + |string, []byte |string, symbol or binary. | + +---------------------------+----------------------------------------------------------------------+ + |Symbol |symbol | + +---------------------------+----------------------------------------------------------------------+ + |map[K]T |map, provided all keys and values can unmarshal to types K, T | + +---------------------------+----------------------------------------------------------------------+ + |Map |map, any AMQP map | + +---------------------------+----------------------------------------------------------------------+ + |interface{} |Any AMQP value can be unmarshaled to an interface{} as follows: | + | +------------------------+---------------------------------------------+ + | |AMQP Type |Go Type in interface{} | + | +========================+=============================================+ + | |bool |bool | + | +------------------------+---------------------------------------------+ + | |byte,short,int,long |int8,int16,int32,int64 | + | +------------------------+---------------------------------------------+ + | |ubyte,ushort,uint,ulong |uint8,uint16,uint32,uint64 | + | +------------------------+---------------------------------------------+ + | |float, double |float32, float64 | + | +------------------------+---------------------------------------------+ + | |string |string | + | +------------------------+---------------------------------------------+ + | |symbol |Symbol | + | +------------------------+---------------------------------------------+ + | |binary |Binary | + | +------------------------+---------------------------------------------+ + | |nulll |nil | + | +------------------------+---------------------------------------------+ + | |map |Map | + | +------------------------+---------------------------------------------+ + | |list |List | + +---------------------------+------------------------+---------------------------------------------+ + +The following Go types cannot be unmarshaled: uintptr, function, interface, channel. + +TODO + +Go types: array, struct. + +AMQP types: decimal32/64/128, char (round trip), timestamp, uuid, array, multi-section message bodies. + +AMQP maps with mixed/unhashable key types need an alternate representation. + +Described types. +*/ +func Unmarshal(bytes []byte, v interface{}) (n int, err error) { + defer doRecover(&err) + + data := C.pn_data(0) + defer C.pn_data_free(data) + n, err = decode(data, bytes) + if err != nil { + return 0, err + } + if n == 0 { + return 0, fmt.Errorf("not enough data") + } else { + unmarshal(v, data) + } + return n, nil +} + +// more reads more data when we can't parse a complete AMQP type +func (d *Decoder) more() error { + var readSize int64 = minDecode + if int64(d.buffer.Len()) > readSize { // Grow by doubling + readSize = int64(d.buffer.Len()) + } + var n int64 + n, err := d.buffer.ReadFrom(io.LimitReader(d.reader, readSize)) + if n == 0 && err == nil { // ReadFrom won't report io.EOF, just returns 0 + err = io.EOF + } + return err +} + +// Unmarshal from data into value pointed at by v. +func unmarshal(v interface{}, data *C.pn_data_t) { + pnType := C.pn_data_type(data) + switch v := v.(type) { + case *bool: + switch pnType { + case C.PN_BOOL: + *v = bool(C.pn_data_get_bool(data)) + default: + panic(newUnmarshalError(pnType, v)) + } + case *int8: + switch pnType { + case C.PN_CHAR: + *v = int8(C.pn_data_get_char(data)) + case C.PN_BYTE: + *v = int8(C.pn_data_get_byte(data)) + default: + panic(newUnmarshalError(pnType, v)) + } + case *uint8: + switch pnType { + case C.PN_CHAR: + *v = uint8(C.pn_data_get_char(data)) + case C.PN_UBYTE: + *v = uint8(C.pn_data_get_ubyte(data)) + default: + panic(newUnmarshalError(pnType, v)) + } + case *int16: + switch pnType { + case C.PN_CHAR: + *v = int16(C.pn_data_get_char(data)) + case C.PN_BYTE: + *v = int16(C.pn_data_get_byte(data)) + case C.PN_SHORT: + *v = int16(C.pn_data_get_short(data)) + default: + panic(newUnmarshalError(pnType, v)) + } + case *uint16: + switch pnType { + case C.PN_CHAR: + *v = uint16(C.pn_data_get_char(data)) + case C.PN_UBYTE: + *v = uint16(C.pn_data_get_ubyte(data)) + case C.PN_USHORT: + *v = uint16(C.pn_data_get_ushort(data)) + default: + panic(newUnmarshalError(pnType, v)) + } + case *int32: + switch pnType { + case C.PN_CHAR: + *v = int32(C.pn_data_get_char(data)) + case C.PN_BYTE: + *v = int32(C.pn_data_get_byte(data)) + case C.PN_SHORT: + *v = int32(C.pn_data_get_short(data)) + case C.PN_INT: + *v = int32(C.pn_data_get_int(data)) + default: + panic(newUnmarshalError(pnType, v)) + } + case *uint32: + switch pnType { + case C.PN_CHAR: + *v = uint32(C.pn_data_get_char(data)) + case C.PN_UBYTE: + *v = uint32(C.pn_data_get_ubyte(data)) + case C.PN_USHORT: + *v = uint32(C.pn_data_get_ushort(data)) + case C.PN_UINT: + *v = uint32(C.pn_data_get_uint(data)) + default: + panic(newUnmarshalError(pnType, v)) + } + + case *int64: + switch pnType { + case C.PN_CHAR: + *v = int64(C.pn_data_get_char(data)) + case C.PN_BYTE: + *v = int64(C.pn_data_get_byte(data)) + case C.PN_SHORT: + *v = int64(C.pn_data_get_short(data)) + case C.PN_INT: + *v = int64(C.pn_data_get_int(data)) + case C.PN_LONG: + *v = int64(C.pn_data_get_long(data)) + default: + panic(newUnmarshalError(pnType, v)) + } + + case *uint64: + switch pnType { + case C.PN_CHAR: + *v = uint64(C.pn_data_get_char(data)) + case C.PN_UBYTE: + *v = uint64(C.pn_data_get_ubyte(data)) + case C.PN_USHORT: + *v = uint64(C.pn_data_get_ushort(data)) + case C.PN_ULONG: + *v = uint64(C.pn_data_get_ulong(data)) + default: + panic(newUnmarshalError(pnType, v)) + } + + case *int: + switch pnType { + case C.PN_CHAR: + *v = int(C.pn_data_get_char(data)) + case C.PN_BYTE: + *v = int(C.pn_data_get_byte(data)) + case C.PN_SHORT: + *v = int(C.pn_data_get_short(data)) + case C.PN_INT: + *v = int(C.pn_data_get_int(data)) + case C.PN_LONG: + if unsafe.Sizeof(0) == 8 { + *v = int(C.pn_data_get_long(data)) + } else { + panic(newUnmarshalError(pnType, v)) + } + default: + panic(newUnmarshalError(pnType, v)) + } + + case *uint: + switch pnType { + case C.PN_CHAR: + *v = uint(C.pn_data_get_char(data)) + case C.PN_UBYTE: + *v = uint(C.pn_data_get_ubyte(data)) + case C.PN_USHORT: + *v = uint(C.pn_data_get_ushort(data)) + case C.PN_UINT: + *v = uint(C.pn_data_get_uint(data)) + case C.PN_ULONG: + if unsafe.Sizeof(0) == 8 { + *v = uint(C.pn_data_get_ulong(data)) + } else { + panic(newUnmarshalError(pnType, v)) + } + default: + panic(newUnmarshalError(pnType, v)) + } + + case *float32: + switch pnType { + case C.PN_FLOAT: + *v = float32(C.pn_data_get_float(data)) + default: + panic(newUnmarshalError(pnType, v)) + } + + case *float64: + switch pnType { + case C.PN_FLOAT: + *v = float64(C.pn_data_get_float(data)) + case C.PN_DOUBLE: + *v = float64(C.pn_data_get_double(data)) + default: + panic(newUnmarshalError(pnType, v)) + } + + case *string: + switch pnType { + case C.PN_STRING: + *v = goString(C.pn_data_get_string(data)) + case C.PN_SYMBOL: + *v = goString(C.pn_data_get_symbol(data)) + case C.PN_BINARY: + *v = goString(C.pn_data_get_binary(data)) + default: + panic(newUnmarshalError(pnType, v)) + } + + case *[]byte: + switch pnType { + case C.PN_STRING: + *v = goBytes(C.pn_data_get_string(data)) + case C.PN_SYMBOL: + *v = goBytes(C.pn_data_get_symbol(data)) + case C.PN_BINARY: + *v = goBytes(C.pn_data_get_binary(data)) + default: + panic(newUnmarshalError(pnType, v)) + } + + case *Binary: + switch pnType { + case C.PN_BINARY: + *v = Binary(goBytes(C.pn_data_get_binary(data))) + default: + panic(newUnmarshalError(pnType, v)) + } + + case *Symbol: + switch pnType { + case C.PN_SYMBOL: + *v = Symbol(goBytes(C.pn_data_get_symbol(data))) + default: + panic(newUnmarshalError(pnType, v)) + } + + case *interface{}: + getInterface(data, v) + + default: + if reflect.TypeOf(v).Kind() != reflect.Ptr { + panic(newUnmarshalError(pnType, v)) + } + switch reflect.TypeOf(v).Elem().Kind() { + case reflect.Map: + getMap(data, v) + case reflect.Slice: + getList(data, v) + default: + panic(newUnmarshalError(pnType, v)) + } + } + err := dataError("unmarshaling", data) + if err != nil { + panic(err) + } + return +} + +func rewindUnmarshal(v interface{}, data *C.pn_data_t) { + C.pn_data_rewind(data) + C.pn_data_next(data) + unmarshal(v, data) +} + +// Getting into an interface is driven completely by the AMQP type, since the interface{} +// target is type-neutral. +func getInterface(data *C.pn_data_t, v *interface{}) { + pnType := C.pn_data_type(data) + switch pnType { - case C.PN_NULL, pnInvalid: // No data. ++ case C.PN_NULL, C.PN_INVALID: // No data. + *v = nil + case C.PN_BOOL: + *v = bool(C.pn_data_get_bool(data)) + case C.PN_UBYTE: + *v = uint8(C.pn_data_get_ubyte(data)) + case C.PN_BYTE: + *v = int8(C.pn_data_get_byte(data)) + case C.PN_USHORT: + *v = uint16(C.pn_data_get_ushort(data)) + case C.PN_SHORT: + *v = int16(C.pn_data_get_short(data)) + case C.PN_UINT: + *v = uint32(C.pn_data_get_uint(data)) + case C.PN_INT: + *v = int32(C.pn_data_get_int(data)) + case C.PN_CHAR: + *v = uint8(C.pn_data_get_char(data)) + case C.PN_ULONG: + *v = uint64(C.pn_data_get_ulong(data)) + case C.PN_LONG: + *v = int64(C.pn_data_get_long(data)) + case C.PN_FLOAT: + *v = float32(C.pn_data_get_float(data)) + case C.PN_DOUBLE: + *v = float64(C.pn_data_get_double(data)) + case C.PN_BINARY: + *v = Binary(goBytes(C.pn_data_get_binary(data))) + case C.PN_STRING: + *v = goString(C.pn_data_get_string(data)) + case C.PN_SYMBOL: + *v = Symbol(goString(C.pn_data_get_symbol(data))) + case C.PN_MAP: + m := make(Map) + unmarshal(&m, data) + *v = m + case C.PN_LIST: + l := make(List, 0) + unmarshal(&l, data) + *v = l + default: + panic(newUnmarshalError(pnType, v)) + } +} + +// get into map pointed at by v +func getMap(data *C.pn_data_t, v interface{}) { + mapValue := reflect.ValueOf(v).Elem() + mapValue.Set(reflect.MakeMap(mapValue.Type())) // Clear the map + switch pnType := C.pn_data_type(data); pnType { + case C.PN_MAP: + count := int(C.pn_data_get_map(data)) + if bool(C.pn_data_enter(data)) { + defer C.pn_data_exit(data) + for i := 0; i < count/2; i++ { + if bool(C.pn_data_next(data)) { + key := reflect.New(mapValue.Type().Key()) + unmarshal(key.Interface(), data) + if bool(C.pn_data_next(data)) { + val := reflect.New(mapValue.Type().Elem()) + unmarshal(val.Interface(), data) + mapValue.SetMapIndex(key.Elem(), val.Elem()) + } + } + } + } - case pnInvalid: // Leave the map empty ++ case C.PN_INVALID: // Leave the map empty + default: + panic(newUnmarshalError(pnType, v)) + } +} + +func getList(data *C.pn_data_t, v interface{}) { + pnType := C.pn_data_type(data) + if pnType != C.PN_LIST { + panic(newUnmarshalError(pnType, v)) + } + count := int(C.pn_data_get_list(data)) + listValue := reflect.MakeSlice(reflect.TypeOf(v).Elem(), count, count) + if bool(C.pn_data_enter(data)) { + for i := 0; i < count; i++ { + if bool(C.pn_data_next(data)) { + val := reflect.New(listValue.Type().Elem()) + unmarshal(val.Interface(), data) + listValue.Index(i).Set(val.Elem()) + } + } + C.pn_data_exit(data) + } + reflect.ValueOf(v).Elem().Set(listValue) +} + +// decode from bytes. +// Return bytes decoded or 0 if we could not decode a complete object. +// +func decode(data *C.pn_data_t, bytes []byte) (int, error) { + if len(bytes) == 0 { + return 0, nil + } + n := int(C.pn_data_decode(data, cPtr(bytes), cLen(bytes))) + if n == int(C.PN_UNDERFLOW) { + C.pn_error_clear(C.pn_data_error(data)) + return 0, nil + } else if n <= 0 { + return 0, fmt.Errorf("unmarshal %s", PnErrorCode(n)) + } + return n, nil +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/c87499a3/electron/connection.go ---------------------------------------------------------------------- diff --cc electron/connection.go index 8a9e6cd,0000000..386875d mode 100644,000000..100644 --- a/electron/connection.go +++ b/electron/connection.go @@@ -1,238 -1,0 +1,240 @@@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ + +package electron + +// #include <proton/disposition.h> +import "C" + +import ( ++ "fmt" + "net" + "qpid.apache.org/proton" + "sync" + "time" +) + +// Connection is an AMQP connection, created by a Container. +type Connection interface { + Endpoint + + // Sender opens a new sender on the DefaultSession. + Sender(...LinkOption) (Sender, error) + + // Receiver opens a new Receiver on the DefaultSession(). + Receiver(...LinkOption) (Receiver, error) + + // DefaultSession() returns a default session for the connection. It is opened + // on the first call to DefaultSession and returned on subsequent calls. + DefaultSession() (Session, error) + + // Session opens a new session. + Session(...SessionOption) (Session, error) + + // Container for the connection. + Container() Container + + // Disconnect the connection abruptly with an error. + Disconnect(error) + + // Wait waits for the connection to be disconnected. + Wait() error + + // WaitTimeout is like Wait but returns Timeout if the timeout expires. + WaitTimeout(time.Duration) error + + // Incoming returns a channel for incoming endpoints opened by the remote end. + // + // To enable, pass AllowIncoming() when creating the Connection. Otherwise all + // incoming endpoint requests are automatically rejected and Incoming() + // returns nil. + // + // An Incoming value can be an *IncomingSession, *IncomingSender or + // *IncomingReceiver. You must call Accept() to open the endpoint or Reject() + // to close it with an error. The specific Incoming types have additional + // methods to configure the endpoint. + // + // Not receiving from Incoming() or not calling Accept/Reject will block the + // electron event loop. Normally you would have a dedicated goroutine receive + // from Incoming() and start new goroutines to serve each incoming endpoint. + // The channel is closed when the Connection closes. + // + Incoming() <-chan Incoming +} + +// ConnectionOption can be passed when creating a connection to configure various options +type ConnectionOption func(*connection) + +// Server returns a ConnectionOption to put the connection in server mode. +// +// A server connection will do protocol negotiation to accept a incoming AMQP +// connection. Normally you would call this for a connection created by +// net.Listener.Accept() +// +func Server() ConnectionOption { return func(c *connection) { c.engine.Server() } } + +// AllowIncoming returns a ConnectionOption to enable incoming endpoint open requests. +// See Connection.Incoming() +func AllowIncoming() ConnectionOption { + return func(c *connection) { c.incoming = make(chan Incoming) } +} + +type connection struct { + endpoint + defaultSessionOnce, closeOnce sync.Once + + container *container + conn net.Conn + incoming chan Incoming + handler *handler + engine *proton.Engine + eConnection proton.Connection + + defaultSession Session +} + +func newConnection(conn net.Conn, cont *container, setting ...ConnectionOption) (*connection, error) { + c := &connection{container: cont, conn: conn} + c.handler = newHandler(c) + var err error + c.engine, err = proton.NewEngine(c.conn, c.handler.delegator) + if err != nil { + return nil, err + } + for _, set := range setting { + set(c) + } + c.endpoint = makeEndpoint(c.engine.String()) + c.eConnection = c.engine.Connection() + go c.run() + return c, nil +} + +func (c *connection) run() { + c.engine.Run() + if c.incoming != nil { + close(c.incoming) + } + c.closed(Closed) +} + +func (c *connection) Close(err error) { c.err.Set(err); c.engine.Close(err) } + +func (c *connection) Disconnect(err error) { c.err.Set(err); c.engine.Disconnect(err) } + +func (c *connection) Session(setting ...SessionOption) (Session, error) { + var s Session + err := c.engine.InjectWait(func() error { + if c.Error() != nil { + return c.Error() + } + eSession, err := c.engine.Connection().Session() + if err == nil { + eSession.Open() + if err == nil { + s = newSession(c, eSession, setting...) + } + } + return err + }) + return s, err +} + +func (c *connection) Container() Container { return c.container } + +func (c *connection) DefaultSession() (s Session, err error) { + c.defaultSessionOnce.Do(func() { + c.defaultSession, err = c.Session() + }) + if err == nil { + err = c.Error() + } + return c.defaultSession, err +} + +func (c *connection) Sender(setting ...LinkOption) (Sender, error) { + if s, err := c.DefaultSession(); err == nil { + return s.Sender(setting...) + } else { + return nil, err + } +} + +func (c *connection) Receiver(setting ...LinkOption) (Receiver, error) { + if s, err := c.DefaultSession(); err == nil { + return s.Receiver(setting...) + } else { + return nil, err + } +} + +func (c *connection) Connection() Connection { return c } + +func (c *connection) Wait() error { return c.WaitTimeout(Forever) } +func (c *connection) WaitTimeout(timeout time.Duration) error { + _, err := timedReceive(c.done, timeout) + if err == Timeout { + return Timeout + } + return c.Error() +} + +func (c *connection) Incoming() <-chan Incoming { return c.incoming } + +// Incoming is the interface for incoming requests to open an endpoint. +// Implementing types are IncomingSession, IncomingSender and IncomingReceiver. +type Incoming interface { + // Accept and open the endpoint. + Accept() Endpoint + + // Reject the endpoint with an error + Reject(error) + + // wait for and call the accept function, call in proton goroutine. + wait() error + pEndpoint() proton.Endpoint +} + +type incoming struct { + endpoint proton.Endpoint + acceptCh chan func() error +} + +func makeIncoming(e proton.Endpoint) incoming { + return incoming{endpoint: e, acceptCh: make(chan func() error)} +} + ++func (in *incoming) String() string { return fmt.Sprintf("%s: %s", in.endpoint.Type(), in.endpoint) } +func (in *incoming) Reject(err error) { in.acceptCh <- func() error { return err } } + +// Call in proton goroutine, wait for and call the accept function fr +func (in *incoming) wait() error { return (<-in.acceptCh)() } + +func (in *incoming) pEndpoint() proton.Endpoint { return in.endpoint } + +// Called in app goroutine to send an accept function to proton and return the resulting endpoint. +func (in *incoming) accept(f func() Endpoint) Endpoint { + done := make(chan Endpoint) + in.acceptCh <- func() error { + ep := f() + done <- ep + return nil + } + return <-done +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/c87499a3/electron/endpoint.go ---------------------------------------------------------------------- diff --cc electron/endpoint.go index 8cbeadb,0000000..2b1f62d mode 100644,000000..100644 --- a/electron/endpoint.go +++ b/electron/endpoint.go @@@ -1,94 -1,0 +1,99 @@@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ + +package electron + +import ( + "io" + "qpid.apache.org/proton" +) + +// Closed is an alias for io.EOF. It is returned as an error when an endpoint +// was closed cleanly. +var Closed = io.EOF + +// Endpoint is the common interface for Connection, Session, Link, Sender and Receiver. +// +// Endpoints can be created locally or by the remote peer. You must Open() an +// endpoint before you can use it. Some endpoints have additional Set*() methods +// that must be called before Open() to take effect, see Connection, Session, +// Link, Sender and Receiver for details. +// +type Endpoint interface { + // Close an endpoint and signal an error to the remote end if error != nil. + Close(error) + + // String is a human readable identifier, useful for debugging and logging. + String() string + + // Error returns nil if the endpoint is open, otherwise returns an error. + // Error() == Closed means the endpoint was closed without error. + Error() error + + // Connection containing the endpoint + Connection() Connection + + // Done returns a channel that will close when the endpoint closes. + // Error() will contain the reason. + Done() <-chan struct{} +} + +// DEVELOPER NOTES +// +// An electron.Endpoint corresponds to a proton.Endpoint, which can be invalidated +// +type endpoint struct { + err proton.ErrorHolder + str string // Must be set by the value that embeds endpoint. + done chan struct{} +} + +func makeEndpoint(s string) endpoint { return endpoint{str: s, done: make(chan struct{})} } + +// Called in handler on a Closed event. Marks the endpoint as closed and the corresponding +// proton.Endpoint pointer as invalid. Injected functions should check Error() to ensure +// the pointer has not been invalidated. +// +// Returns the error stored on the endpoint, which may not be different to err if there was +// already a n error +func (e *endpoint) closed(err error) error { - e.err.Set(err) - e.err.Set(Closed) - close(e.done) ++ select { ++ case <-e.done: ++ // Already closed ++ default: ++ e.err.Set(err) ++ e.err.Set(Closed) ++ close(e.done) ++ } + return e.err.Get() +} + +func (e *endpoint) String() string { return e.str } + +func (e *endpoint) Error() error { return e.err.Get() } + +func (e *endpoint) Done() <-chan struct{} { return e.done } + +// Call in proton goroutine to initiate closing an endpoint locally +// handler will complete the close when remote end closes. +func localClose(ep proton.Endpoint, err error) { + if ep.State().LocalActive() { + proton.CloseError(ep, err) + } +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/c87499a3/electron/sender.go ---------------------------------------------------------------------- diff --cc electron/sender.go index 573e9da,0000000..834eb75 mode 100644,000000..100644 --- a/electron/sender.go +++ b/electron/sender.go @@@ -1,274 -1,0 +1,273 @@@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ + +package electron + +// #include <proton/disposition.h> +import "C" + +import ( ++ "fmt" + "qpid.apache.org/amqp" + "qpid.apache.org/proton" + "time" +) + +// Sender is a Link that sends messages. +// +// The result of sending a message is provided by an Outcome value. +// +// A sender can buffer messages up to the credit limit provided by the remote receiver. +// Send* methods will block if the buffer is full until there is space. +// Send*Timeout methods will give up after the timeout and set Timeout as Outcome.Error. +// +type Sender interface { + Link + + // SendSync sends a message and blocks until the message is acknowledged by the remote receiver. + // Returns an Outcome, which may contain an error if the message could not be sent. + SendSync(m amqp.Message) Outcome + + // SendWaitable puts a message in the send buffer and returns a channel that + // you can use to wait for the Outcome of just that message. The channel is + // buffered so you can receive from it whenever you want without blocking anything. + SendWaitable(m amqp.Message) <-chan Outcome + + // SendForget buffers a message for sending and returns, with no notification of the outcome. + SendForget(m amqp.Message) + + // SendAsync puts a message in the send buffer and returns immediately. An + // Outcome with Value = value will be sent to the ack channel when the remote + // receiver has acknowledged the message or if there is an error. + // + // You can use the same ack channel for many calls to SendAsync(), possibly on + // many Senders. The channel will receive the outcomes in the order they + // become available. The channel should be buffered and/or served by dedicated + // goroutines to avoid blocking the connection. + // + // If ack == nil no Outcome is sent. + SendAsync(m amqp.Message, ack chan<- Outcome, value interface{}) + + SendAsyncTimeout(m amqp.Message, ack chan<- Outcome, value interface{}, timeout time.Duration) + + SendWaitableTimeout(m amqp.Message, timeout time.Duration) <-chan Outcome + + SendForgetTimeout(m amqp.Message, timeout time.Duration) + + SendSyncTimeout(m amqp.Message, timeout time.Duration) Outcome +} + +// Outcome provides information about the outcome of sending a message. +type Outcome struct { + // Status of the message: was it sent, how was it acknowledged. + Status SentStatus + // Error is a local error if Status is Unsent or Unacknowledged, a remote error otherwise. + Error error + // Value provided by the application in SendAsync() + Value interface{} +} + +// SentStatus indicates the status of a sent message. +type SentStatus int + +const ( + // Message was never sent + Unsent SentStatus = iota + // Message was sent but never acknowledged. It may or may not have been received. + Unacknowledged - // Message was sent pre-settled, no remote outcome is available. - Presettled - // Message was accepted by the receiver ++ // Message was accepted by the receiver (or was sent pre-settled, accept is assumed) + Accepted + // Message was rejected as invalid by the receiver + Rejected + // Message was not processed by the receiver but may be valid for a different receiver + Released + // Receiver responded with an unrecognized status. + Unknown +) + +// String human readable name for SentStatus. +func (s SentStatus) String() string { + switch s { + case Unsent: + return "unsent" + case Unacknowledged: + return "unacknowledged" + case Accepted: + return "accepted" + case Rejected: + return "rejected" + case Released: + return "released" + case Unknown: + return "unknown" + default: - return "invalid" ++ return fmt.Sprintf("invalid(%d)", s) + } +} + +// Convert proton delivery state code to SentStatus value +func sentStatus(d uint64) SentStatus { + switch d { + case proton.Accepted: + return Accepted + case proton.Rejected: + return Rejected + case proton.Released, proton.Modified: + return Released + default: + return Unknown + } +} + +// Sender implementation, held by handler. +type sender struct { + link + credit chan struct{} // Signal available credit. +} + +func (s *sender) SendAsyncTimeout(m amqp.Message, ack chan<- Outcome, v interface{}, t time.Duration) { + // wait for credit + if _, err := timedReceive(s.credit, t); err != nil { + if err == Closed && s.Error != nil { + err = s.Error() + } + ack <- Outcome{Unsent, err, v} + return + } + // Send a message in handler goroutine + err := s.engine().Inject(func() { + if s.Error() != nil { + if ack != nil { + ack <- Outcome{Unsent, s.Error(), v} + } + return + } + if delivery, err := s.eLink.Send(m); err == nil { + if ack != nil { // We must report an outcome + if s.SndSettle() == SndSettled { + delivery.Settle() // Pre-settle if required - ack <- Outcome{Presettled, nil, v} ++ ack <- Outcome{Accepted, nil, v} + } else { + s.handler().sentMessages[delivery] = sentMessage{ack, v} + } + } else { // ack == nil, can't report outcome + if s.SndSettle() != SndUnsettled { // Pre-settle unless we are forced not to. + delivery.Settle() + } + } + } else { // err != nil + if ack != nil { + ack <- Outcome{Unsent, err, v} + } + } + if s.eLink.Credit() > 0 { // Signal there is still credit + s.sendable() + } + }) + if err != nil && ack != nil { + ack <- Outcome{Unsent, err, v} + } +} + +// Set credit flag if not already set. Non-blocking, any goroutine +func (s *sender) sendable() { + select { // Non-blocking + case s.credit <- struct{}{}: + default: + } +} + +func (s *sender) SendWaitableTimeout(m amqp.Message, t time.Duration) <-chan Outcome { + out := make(chan Outcome, 1) + s.SendAsyncTimeout(m, out, nil, t) + return out +} + +func (s *sender) SendForgetTimeout(m amqp.Message, t time.Duration) { + s.SendAsyncTimeout(m, nil, nil, t) +} + +func (s *sender) SendSyncTimeout(m amqp.Message, t time.Duration) Outcome { + deadline := time.Now().Add(t) + ack := s.SendWaitableTimeout(m, t) + t = deadline.Sub(time.Now()) // Adjust for time already spent. + if t < 0 { + t = 0 + } + if out, err := timedReceive(ack, t); err == nil { + return out.(Outcome) + } else { + if err == Closed && s.Error() != nil { + err = s.Error() + } + return Outcome{Unacknowledged, err, nil} + } +} + +func (s *sender) SendAsync(m amqp.Message, ack chan<- Outcome, v interface{}) { + s.SendAsyncTimeout(m, ack, v, Forever) +} + +func (s *sender) SendWaitable(m amqp.Message) <-chan Outcome { + return s.SendWaitableTimeout(m, Forever) +} + +func (s *sender) SendForget(m amqp.Message) { + s.SendForgetTimeout(m, Forever) +} + +func (s *sender) SendSync(m amqp.Message) Outcome { + return <-s.SendWaitable(m) +} + +// handler goroutine +func (s *sender) closed(err error) { + s.link.closed(err) + close(s.credit) +} + +func newSender(l link) *sender { + s := &sender{link: l, credit: make(chan struct{}, 1)} + s.handler().addLink(s.eLink, s) + s.link.open() + return s +} + +// sentMessage records a sent message on the handler. +type sentMessage struct { + ack chan<- Outcome + value interface{} +} + +// IncomingSender is sent on the Connection.Incoming() channel when there is +// an incoming request to open a sender link. +type IncomingSender struct { + incomingLink +} + +// Accept accepts an incoming sender endpoint +func (in *IncomingSender) Accept() Endpoint { + return in.accept(func() Endpoint { return newSender(in.link) }) +} + +// Call in injected functions to check if the sender is valid. +func (s *sender) valid() bool { + s2, ok := s.handler().links[s.eLink].(*sender) + return ok && s2 == s +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/c87499a3/proton/engine.go ---------------------------------------------------------------------- diff --cc proton/engine.go index 2e67ef7,0000000..13d44b8 mode 100644,000000..100644 --- a/proton/engine.go +++ b/proton/engine.go @@@ -1,403 -1,0 +1,397 @@@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ + +package proton + +// #include <proton/connection.h> +// #include <proton/event.h> ++// #include <proton/error.h> +// #include <proton/handlers.h> +// #include <proton/session.h> +// #include <proton/transport.h> +// #include <memory.h> +// #include <stdlib.h> +// +// PN_HANDLE(REMOTE_ADDR) +import "C" + +import ( + "fmt" + "io" + "net" + "sync" + "unsafe" +) + +// Injecter allows functions to be "injected" into the event-processing loop, to +// be called in the same goroutine as event handlers. +type Injecter interface { + // Inject a function into the engine goroutine. + // + // f() will be called in the same goroutine as event handlers, so it can safely + // use values belonging to event handlers without synchronization. f() should + // not block, no further events or injected functions can be processed until + // f() returns. + // + // Returns a non-nil error if the function could not be injected and will + // never be called. Otherwise the function will eventually be called. + // + // Note that proton values (Link, Session, Connection etc.) that existed when + // Inject(f) was called may have become invalid by the time f() is executed. + // Handlers should handle keep track of Closed events to ensure proton values + // are not used after they become invalid. One technique is to have map from + // proton values to application values. Check that the map has the correct + // proton/application value pair at the start of the injected function and + // delete the value from the map when handling a Closed event. + Inject(f func()) error + + // InjectWait is like Inject but does not return till f() has completed. + // If f() cannot be injected it returns the error from Inject(), otherwise + // it returns the error from f() + InjectWait(f func() error) error +} + +// bufferChan manages a pair of ping-pong buffers to pass bytes through a channel. +type bufferChan struct { + buffers chan []byte + buf1, buf2 []byte +} + +func newBufferChan(size int) *bufferChan { + return &bufferChan{make(chan []byte), make([]byte, size), make([]byte, size)} +} + +func (b *bufferChan) buffer() []byte { + b.buf1, b.buf2 = b.buf2, b.buf1 // Alternate buffers. + return b.buf1[:cap(b.buf1)] +} + +// Engine reads from a net.Conn, decodes AMQP events and calls the appropriate +// Handler functions sequentially in a single goroutine. Actions taken by +// Handler functions (such as sending messages) are encoded and written to the +// net.Conn. You can create multiple Engines to handle multiple connections +// concurrently. +// +// You implement the EventHandler and/or MessagingHandler interfaces and provide +// those values to NewEngine(). Their HandleEvent method will be called in the +// event-handling goroutine. +// +// Handlers can pass values from an event (Connections, Links, Deliveries etc.) to +// other goroutines, store them, or use them as map indexes. Effectively they are +// just pointers. Other goroutines cannot call their methods directly but they can +// can create a function closure to call such methods and pass it to Engine.Inject() +// to have it evaluated in the engine goroutine. +// +// You are responsible for ensuring you don't use an event value after it is +// invalid. The handler methods will tell you when a value is no longer valid. For +// example after a LinkClosed event, that link is no longer valid. If you do +// Link.Close() yourself (in a handler or injected function) the link remains valid +// until the corresponing LinkClosed event is received by the handler. +// +// Engine.Close() will take care of cleaning up any remaining values when you are +// done with the Engine. All values associated with a engine become invalid when you +// call Engine.Close() +// +// The qpid.apache.org/proton/concurrent package will do all this for you, so it +// may be a better choice for some applications. +// +type Engine struct { + // Error is set on exit from Run() if there was an error. + err ErrorHolder + inject chan func() + + conn net.Conn + connection Connection + transport Transport + collector *C.pn_collector_t + read *bufferChan // Read buffers channel. + write *bufferChan // Write buffers channel. + handlers []EventHandler // Handlers for proton events. + running chan struct{} // This channel will be closed when the goroutines are done. + closeOnce sync.Once +} + +const bufferSize = 4096 + +// NewEngine initializes a engine with a connection and handlers. To start it running: +// eng := NewEngine(...) +// go run eng.Run() +// The goroutine will exit when the engine is closed or disconnected. +// You can check for errors on Engine.Error. +// +func NewEngine(conn net.Conn, handlers ...EventHandler) (*Engine, error) { + // Save the connection ID for Connection.String() + eng := &Engine{ + inject: make(chan func()), + conn: conn, + transport: Transport{C.pn_transport()}, + connection: Connection{C.pn_connection()}, + collector: C.pn_collector(), + handlers: handlers, + read: newBufferChan(bufferSize), + write: newBufferChan(bufferSize), + running: make(chan struct{}), + } + if eng.transport.IsNil() || eng.connection.IsNil() || eng.collector == nil { + return nil, fmt.Errorf("failed to allocate engine") + } + + // TODO aconway 2015-06-25: connection settings for user, password, container etc. + // before transport.Bind() Set up connection before Engine, allow Engine or Reactor + // to run connection. + + // Unique container-id by default. + eng.connection.SetContainer(UUID4().String()) + pnErr := eng.transport.Bind(eng.connection) + if pnErr != 0 { + return nil, fmt.Errorf("cannot setup engine: %s", PnErrorCode(pnErr)) + } + C.pn_connection_collect(eng.connection.pn, eng.collector) + eng.connection.Open() + return eng, nil +} + +func (eng *Engine) String() string { + return fmt.Sprintf("%s-%s", eng.conn.LocalAddr(), eng.conn.RemoteAddr()) +} + +func (eng *Engine) Id() string { + return fmt.Sprintf("%eng", &eng) +} + +func (eng *Engine) Error() error { + return eng.err.Get() +} + +// Inject a function into the Engine's event loop. +// +// f() will be called in the same event-processing goroutine that calls Handler +// methods. f() can safely call methods on values that belong to this engine +// (Sessions, Links etc) +// +// The injected function has no parameters or return values. It is normally a +// closure and can use channels to communicate with the injecting goroutine if +// necessary. +// +// Returns a non-nil error if the engine is closed before the function could be +// injected. +func (eng *Engine) Inject(f func()) error { + select { + case eng.inject <- f: + return nil + case <-eng.running: + return eng.Error() + } +} + +// InjectWait is like Inject but does not return till f() has completed or the +// engine is closed, and returns an error value from f() +func (eng *Engine) InjectWait(f func() error) error { + done := make(chan error) + defer close(done) + err := eng.Inject(func() { done <- f() }) + if err != nil { + return err + } + select { + case <-eng.running: + return eng.Error() + case err := <-done: + return err + } +} + +// Server puts the Engine in server mode, meaning it will auto-detect security settings on +// the incoming connnection such as use of SASL and SSL. +// Must be called before Run() +// +func (eng *Engine) Server() { eng.transport.SetServer() } + +// Close the engine's connection, returns when the engine has exited. +func (eng *Engine) Close(err error) { + eng.err.Set(err) + eng.Inject(func() { + CloseError(eng.connection, err) + }) + <-eng.running +} + +// Disconnect the engine's connection without and AMQP close, returns when the engine has exited. +func (eng *Engine) Disconnect(err error) { + eng.err.Set(err) + eng.conn.Close() + <-eng.running +} + +// Run the engine. Engine.Run() will exit when the engine is closed or +// disconnected. You can check for errors after exit with Engine.Error(). +// +func (eng *Engine) Run() error { + wait := sync.WaitGroup{} + wait.Add(2) // Read and write goroutines + + readErr := make(chan error, 1) // Don't block + go func() { // Read goroutine + defer wait.Done() + for { + rbuf := eng.read.buffer() + n, err := eng.conn.Read(rbuf) + if n > 0 { + eng.read.buffers <- rbuf[:n] + } + if err != nil { + readErr <- err + close(readErr) + close(eng.read.buffers) + return + } + } + }() + + writeErr := make(chan error, 1) // Don't block + go func() { // Write goroutine + defer wait.Done() + for { + wbuf, ok := <-eng.write.buffers + if !ok { + return + } + _, err := eng.conn.Write(wbuf) + if err != nil { + writeErr <- err + close(writeErr) + return + } + } + }() + + wbuf := eng.write.buffer()[:0] + + for eng.err.Get() == nil { + if len(wbuf) == 0 { + eng.pop(&wbuf) + } + // Don't set wchan unless there is something to write. + var wchan chan []byte + if len(wbuf) > 0 { + wchan = eng.write.buffers + } + + select { + case buf, ok := <-eng.read.buffers: // Read a buffer + if ok { + eng.push(buf) + } + case wchan <- wbuf: // Write a buffer + wbuf = eng.write.buffer()[:0] + case f, ok := <-eng.inject: // Function injected from another goroutine + if ok { + f() + } + case err := <-readErr: + eng.netError(err) + case err := <-writeErr: + eng.netError(err) + } + eng.process() + } + close(eng.write.buffers) + eng.conn.Close() // Make sure connection is closed + wait.Wait() + close(eng.running) // Signal goroutines have exited and Error is set. + - // Execute any injected functions for side effects on application data structures. - inject := eng.inject - eng.inject = nil // Further calls to Inject() will return an error. - for f := range inject { - f() - } - + if !eng.connection.IsNil() { + eng.connection.Free() + } + if !eng.transport.IsNil() { + eng.transport.Free() + } + if eng.collector != nil { + C.pn_collector_free(eng.collector) + } + for _, h := range eng.handlers { + switch h := h.(type) { + case cHandler: + C.pn_handler_free(h.pn) + } + } + return eng.err.Get() +} + +func (eng *Engine) netError(err error) { + eng.err.Set(err) + eng.transport.CloseHead() + eng.transport.CloseTail() +} + +func minInt(a, b int) int { + if a < b { + return a + } else { + return b + } +} + +func (eng *Engine) pop(buf *[]byte) { + pending := int(eng.transport.Pending()) + switch { + case pending == int(C.PN_EOS): + *buf = (*buf)[:] + return + case pending < 0: + panic(fmt.Errorf("%s", PnErrorCode(pending))) + } + size := minInt(pending, cap(*buf)) + *buf = (*buf)[:size] + if size == 0 { + return + } + C.memcpy(unsafe.Pointer(&(*buf)[0]), eng.transport.Head(), C.size_t(size)) + assert(size > 0) + eng.transport.Pop(uint(size)) +} + +func (eng *Engine) push(buf []byte) { + buf2 := buf + for len(buf2) > 0 { + n := eng.transport.Push(buf2) + if n <= 0 { + panic(fmt.Errorf("error in transport: %s", PnErrorCode(n))) + } + buf2 = buf2[n:] + } +} + +func (eng *Engine) handle(e Event) { + for _, h := range eng.handlers { + h.HandleEvent(e) + } + if e.Type() == ETransportClosed { + eng.err.Set(io.EOF) + } +} + +func (eng *Engine) process() { + for ce := C.pn_collector_peek(eng.collector); ce != nil; ce = C.pn_collector_peek(eng.collector) { + eng.handle(makeEvent(ce, eng)) + C.pn_collector_pop(eng.collector) + } +} + +func (eng *Engine) Connection() Connection { return eng.connection } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
