Merge branch 'master' into go1 # Conflicts: # readme-branch.md
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/9e788b2d Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/9e788b2d Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/9e788b2d Branch: refs/heads/go1 Commit: 9e788b2d484a79154d9d05a93bd6447730ab7738 Parents: e1a83ee 1cfa056 Author: Alan Conway <[email protected]> Authored: Mon Nov 23 12:36:33 2015 -0500 Committer: Alan Conway <[email protected]> Committed: Mon Nov 23 12:52:44 2015 -0500 ---------------------------------------------------------------------- amqp/error.go | 39 ++- amqp/interop | 1 - amqp/marshal.go | 10 +- amqp/message.go | 9 +- amqp/types.go | 6 +- amqp/unmarshal.go | 41 +-- amqp/url.go | 6 +- amqp/url_test.go | 2 +- electron/connection.go | 128 ++++++---- electron/container.go | 20 +- electron/doc.go | 14 +- electron/endpoint.go | 36 ++- electron/error.go | 35 +++ electron/handler.go | 97 ++++--- electron/link.go | 86 ++++--- electron/messaging_test.go | 182 +++++++------- electron/receiver.go | 60 +++-- electron/sender.go | 375 ++++++++++++--------------- electron/session.go | 73 +++--- electron/time.go | 13 +- internal/error.go | 118 --------- internal/flexchannel.go | 82 ------ internal/flexchannel_test.go | 89 ------- internal/safemap.go | 57 ----- internal/uuid.go | 70 ------ proton/engine.go | 27 +- proton/error.go | 104 ++++---- proton/handlers.go | 7 +- proton/interop_test.go | 290 --------------------- proton/marshal.go | 210 ---------------- proton/message.go | 29 ++- proton/types.go | 151 ----------- proton/unfinished.go | 53 ---- proton/unmarshal.go | 517 -------------------------------------- proton/url.go | 96 ------- proton/url_test.go | 51 ---- proton/uuid.go | 57 +++++ proton/wrappers.go | 39 +-- proton/wrappers_gen.go | 9 +- readme-go-get.md | 18 ++ 40 files changed, 873 insertions(+), 2434 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9e788b2d/amqp/error.go ---------------------------------------------------------------------- diff --cc amqp/error.go index 868dbf3,0000000..349fc41 mode 100644,000000..100644 --- a/amqp/error.go +++ b/amqp/error.go @@@ -1,66 -1,0 +1,103 @@@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ + +package amqp + ++// #include <proton/error.h> ++import "C" ++ +import ( + "fmt" + "reflect" +) + +// Error is an AMQP error condition. It has a name and a description. +// It implements the Go error interface so can be returned as an error value. +// +// You can pass amqp.Error to methods that pass an error to a remote endpoint, +// this gives you full control over what the remote endpoint will see. +// +// You can also pass any Go error to such functions, the remote peer +// will see the equivalent of MakeError(error) +// +type Error struct{ Name, Description string } + +// Error implements the Go error interface for AMQP error errors. 
- func (c Error) Error() string { return fmt.Sprintf("proton %s: %s", c.Name, c.Description) } ++func (c Error) Error() string { return fmt.Sprintf("%s: %s", c.Name, c.Description) } + +// Errorf makes a Error with name and formatted description as per fmt.Sprintf +func Errorf(name, format string, arg ...interface{}) Error { + return Error{name, fmt.Sprintf(format, arg...)} +} + +// MakeError makes an AMQP error from a go error using the Go error type as the name +// and the err.Error() string as the description. +func MakeError(err error) Error { + return Error{reflect.TypeOf(err).Name(), err.Error()} +} + +var ( + InternalError = "amqp:internal-error" + NotFound = "amqp:not-found" + UnauthorizedAccess = "amqp:unauthorized-access" + DecodeError = "amqp:decode-error" + ResourceLimit = "amqp:resource-limit" + NotAllowed = "amqp:not-allowed" + InvalidField = "amqp:invalid-field" + NotImplemented = "amqp:not-implemented" + ResourceLocked = "amqp:resource-locked" + PreerrorFailed = "amqp:preerror-failed" + ResourceDeleted = "amqp:resource-deleted" + IllegalState = "amqp:illegal-state" + FrameSizeTooSmall = "amqp:frame-size-too-small" +) ++ ++type PnErrorCode int ++ ++func (e PnErrorCode) String() string { ++ switch e { ++ case C.PN_EOS: ++ return "end-of-data" ++ case C.PN_ERR: ++ return "error" ++ case C.PN_OVERFLOW: ++ return "overflow" ++ case C.PN_UNDERFLOW: ++ return "underflow" ++ case C.PN_STATE_ERR: ++ return "bad-state" ++ case C.PN_ARG_ERR: ++ return "invalid-argument" ++ case C.PN_TIMEOUT: ++ return "timeout" ++ case C.PN_INTR: ++ return "interrupted" ++ case C.PN_INPROGRESS: ++ return "in-progress" ++ default: ++ return fmt.Sprintf("unknown-error(%d)", e) ++ } ++} ++ ++func PnError(e *C.pn_error_t) error { ++ if e == nil || C.pn_error_code(e) == 0 { ++ return nil ++ } ++ return fmt.Errorf("%s: %s", PnErrorCode(C.pn_error_code(e)), C.GoString(C.pn_error_text(e))) ++} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9e788b2d/amqp/marshal.go 
---------------------------------------------------------------------- diff --cc amqp/marshal.go index 666b4f6,0000000..9930e13 mode 100644,000000..100644 --- a/amqp/marshal.go +++ b/amqp/marshal.go @@@ -1,250 -1,0 +1,250 @@@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ + +package amqp + +// #include <proton/codec.h> +import "C" + +import ( ++ "fmt" + "io" - "qpid.apache.org/internal" + "reflect" + "unsafe" +) + +func dataError(prefix string, data *C.pn_data_t) error { - err := internal.PnError(unsafe.Pointer(C.pn_data_error(data))) ++ err := PnError(C.pn_data_error(data)) + if err != nil { - err = internal.Errorf("%s: %s", prefix, err.(internal.Error)) ++ err = fmt.Errorf("%s: %s", prefix, err.Error()) + } + return err +} + +/* +Marshal encodes a Go value as AMQP data in buffer. +If buffer is nil, or is not large enough, a new buffer is created. + +Returns the buffer used for encoding with len() adjusted to the actual size of data. 
+ +Go types are encoded as follows + + +-------------------------------------+--------------------------------------------+ + |Go type |AMQP type | + +-------------------------------------+--------------------------------------------+ + |bool |bool | + +-------------------------------------+--------------------------------------------+ + |int8, int16, int32, int64 (int) |byte, short, int, long (int or long) | + +-------------------------------------+--------------------------------------------+ + |uint8, uint16, uint32, uint64 (uint) |ubyte, ushort, uint, ulong (uint or ulong) | + +-------------------------------------+--------------------------------------------+ + |float32, float64 |float, double. | + +-------------------------------------+--------------------------------------------+ + |string |string | + +-------------------------------------+--------------------------------------------+ + |[]byte, Binary |binary | + +-------------------------------------+--------------------------------------------+ + |Symbol |symbol | + +-------------------------------------+--------------------------------------------+ + |interface{} |the contained type | + +-------------------------------------+--------------------------------------------+ + |nil |null | + +-------------------------------------+--------------------------------------------+ + |map[K]T |map with K and T converted as above | + +-------------------------------------+--------------------------------------------+ + |Map |map, may have mixed types for keys, values | + +-------------------------------------+--------------------------------------------+ + |[]T |list with T converted as above | + +-------------------------------------+--------------------------------------------+ + |List |list, may have mixed types values | + +-------------------------------------+--------------------------------------------+ + +The following Go types cannot be marshaled: uintptr, function, interface, channel + +TODO + +Go types: 
array, slice, struct, complex64/128. + +AMQP types: decimal32/64/128, char, timestamp, uuid, array, multi-section message bodies. + +Described types. + +*/ +func Marshal(v interface{}, buffer []byte) (outbuf []byte, err error) { + defer doRecover(&err) + data := C.pn_data(0) + defer C.pn_data_free(data) + marshal(v, data) + encode := func(buf []byte) ([]byte, error) { + n := int(C.pn_data_encode(data, cPtr(buf), cLen(buf))) + switch { + case n == int(C.PN_OVERFLOW): + return buf, overflow + case n < 0: + return buf, dataError("marshal error", data) + default: + return buf[:n], nil + } + } + return encodeGrow(buffer, encode) +} + +const minEncode = 256 + +// overflow is returned when an encoding function can't fit data in the buffer. - var overflow = internal.Errorf("buffer too small") ++var overflow = fmt.Errorf("buffer too small") + +// encodeFn encodes into buffer[0:len(buffer)]. +// Returns buffer with length adjusted for data encoded. +// If buffer too small, returns overflow as error. +type encodeFn func(buffer []byte) ([]byte, error) + +// encodeGrow calls encode() into buffer, if it returns overflow grows the buffer. +// Returns the final buffer. 
+func encodeGrow(buffer []byte, encode encodeFn) ([]byte, error) { + if buffer == nil || len(buffer) == 0 { + buffer = make([]byte, minEncode) + } + var err error + for buffer, err = encode(buffer); err == overflow; buffer, err = encode(buffer) { + buffer = make([]byte, 2*len(buffer)) + } + return buffer, err +} + +func marshal(v interface{}, data *C.pn_data_t) { + switch v := v.(type) { + case nil: + C.pn_data_put_null(data) + case bool: + C.pn_data_put_bool(data, C.bool(v)) + case int8: + C.pn_data_put_byte(data, C.int8_t(v)) + case int16: + C.pn_data_put_short(data, C.int16_t(v)) + case int32: + C.pn_data_put_int(data, C.int32_t(v)) + case int64: + C.pn_data_put_long(data, C.int64_t(v)) + case int: + if unsafe.Sizeof(0) == 8 { + C.pn_data_put_long(data, C.int64_t(v)) + } else { + C.pn_data_put_int(data, C.int32_t(v)) + } + case uint8: + C.pn_data_put_ubyte(data, C.uint8_t(v)) + case uint16: + C.pn_data_put_ushort(data, C.uint16_t(v)) + case uint32: + C.pn_data_put_uint(data, C.uint32_t(v)) + case uint64: + C.pn_data_put_ulong(data, C.uint64_t(v)) + case uint: + if unsafe.Sizeof(0) == 8 { + C.pn_data_put_ulong(data, C.uint64_t(v)) + } else { + C.pn_data_put_uint(data, C.uint32_t(v)) + } + case float32: + C.pn_data_put_float(data, C.float(v)) + case float64: + C.pn_data_put_double(data, C.double(v)) + case string: + C.pn_data_put_string(data, pnBytes([]byte(v))) + case []byte: + C.pn_data_put_binary(data, pnBytes(v)) + case Binary: + C.pn_data_put_binary(data, pnBytes([]byte(v))) + case Symbol: + C.pn_data_put_symbol(data, pnBytes([]byte(v))) + case Map: // Special map type + C.pn_data_put_map(data) + C.pn_data_enter(data) + for key, val := range v { + marshal(key, data) + marshal(val, data) + } + C.pn_data_exit(data) + default: + switch reflect.TypeOf(v).Kind() { + case reflect.Map: + putMap(data, v) + case reflect.Slice: + putList(data, v) + default: - panic(internal.Errorf("cannot marshal %s to AMQP", reflect.TypeOf(v))) ++ panic(fmt.Errorf("cannot marshal %s 
to AMQP", reflect.TypeOf(v))) + } + } + err := dataError("marshal", data) + if err != nil { + panic(err) + } + return +} + +func clearMarshal(v interface{}, data *C.pn_data_t) { + C.pn_data_clear(data) + marshal(v, data) +} + +func putMap(data *C.pn_data_t, v interface{}) { + mapValue := reflect.ValueOf(v) + C.pn_data_put_map(data) + C.pn_data_enter(data) + for _, key := range mapValue.MapKeys() { + marshal(key.Interface(), data) + marshal(mapValue.MapIndex(key).Interface(), data) + } + C.pn_data_exit(data) +} + +func putList(data *C.pn_data_t, v interface{}) { + listValue := reflect.ValueOf(v) + C.pn_data_put_list(data) + C.pn_data_enter(data) + for i := 0; i < listValue.Len(); i++ { + marshal(listValue.Index(i).Interface(), data) + } + C.pn_data_exit(data) +} + +// Encoder encodes AMQP values to an io.Writer +type Encoder struct { + writer io.Writer + buffer []byte +} + +// New encoder returns a new encoder that writes to w. +func NewEncoder(w io.Writer) *Encoder { + return &Encoder{w, make([]byte, minEncode)} +} + +func (e *Encoder) Encode(v interface{}) (err error) { + e.buffer, err = Marshal(v, e.buffer) + if err == nil { + e.writer.Write(e.buffer) + } + return err +} + +func replace(data *C.pn_data_t, v interface{}) { + C.pn_data_clear(data) + marshal(v, data) +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9e788b2d/amqp/message.go ---------------------------------------------------------------------- diff --cc amqp/message.go index 5ba4f4f,0000000..e36c6f2 mode 100644,000000..100644 --- a/amqp/message.go +++ b/amqp/message.go @@@ -1,347 -1,0 +1,346 @@@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. 
You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ + +package amqp + +// #include <proton/types.h> +// #include <proton/message.h> +// #include <proton/codec.h> +// #include <stdlib.h> +// +// /* Helper for setting message string fields */ +// typedef int (*set_fn)(pn_message_t*, const char*); +// int msg_set_str(pn_message_t* m, char* s, set_fn set) { +// int result = set(m, s); +// free(s); +// return result; +// } +// +import "C" + +import ( - "qpid.apache.org/internal" ++ "fmt" + "runtime" + "time" + "unsafe" +) + +// Message is the interface to an AMQP message. +type Message interface { + // Durable indicates that any parties taking responsibility + // for the message must durably store the content. + Durable() bool + SetDurable(bool) + + // Priority impacts ordering guarantees. Within a + // given ordered context, higher priority messages may jump ahead of + // lower priority messages. + Priority() uint8 + SetPriority(uint8) + + // TTL or Time To Live, a message it may be dropped after this duration + TTL() time.Duration + SetTTL(time.Duration) + + // FirstAcquirer indicates + // that the recipient of the message is the first recipient to acquire + // the message, i.e. there have been no failed delivery attempts to + // other acquirers. Note that this does not mean the message has not + // been delivered to, but not acquired, by other recipients. + FirstAcquirer() bool + SetFirstAcquirer(bool) + + // DeliveryCount tracks how many attempts have been made to + // delivery a message. + DeliveryCount() uint32 + SetDeliveryCount(uint32) + + // MessageId provides a unique identifier for a message. 
+ // it can be an a string, an unsigned long, a uuid or a + // binary value. + MessageId() interface{} + SetMessageId(interface{}) + + UserId() string + SetUserId(string) + + Address() string + SetAddress(string) + + Subject() string + SetSubject(string) + + ReplyTo() string + SetReplyTo(string) + + // CorrelationId is set on correlated request and response messages. It can be + // an a string, an unsigned long, a uuid or a binary value. + CorrelationId() interface{} + SetCorrelationId(interface{}) + + ContentType() string + SetContentType(string) + + ContentEncoding() string + SetContentEncoding(string) + + // ExpiryTime indicates an absoulte time when the message may be dropped. + // A Zero time (i.e. t.isZero() == true) indicates a message never expires. + ExpiryTime() time.Time + SetExpiryTime(time.Time) + + CreationTime() time.Time + SetCreationTime(time.Time) + + GroupId() string + SetGroupId(string) + + GroupSequence() int32 + SetGroupSequence(int32) + + ReplyToGroupId() string + SetReplyToGroupId(string) + + // Instructions - AMQP delivery instructions. + Instructions() map[string]interface{} + SetInstructions(v map[string]interface{}) + + // Annotations - AMQP annotations. + Annotations() map[string]interface{} + SetAnnotations(v map[string]interface{}) + + // Properties - Application properties. + Properties() map[string]interface{} + SetProperties(v map[string]interface{}) + + // Inferred indicates how the message content + // is encoded into AMQP sections. If inferred is true then binary and + // list values in the body of the message will be encoded as AMQP DATA + // and AMQP SEQUENCE sections, respectively. If inferred is false, + // then all values in the body of the message will be encoded as AMQP + // VALUE sections regardless of their type. + Inferred() bool + SetInferred(bool) + + // Marshal a Go value into the message body. See amqp.Marshal() for details. + Marshal(interface{}) + + // Unmarshal the message body into the value pointed to by v. 
See amqp.Unmarshal() for details. + Unmarshal(interface{}) + + // Body value resulting from the default unmarshalling of message body as interface{} + Body() interface{} + + // Encode encodes the message as AMQP data. If buffer is non-nil and is large enough + // the message is encoded into it, otherwise a new buffer is created. + // Returns the buffer containing the message. + Encode(buffer []byte) ([]byte, error) + + // Decode data into this message. Overwrites an existing message content. + Decode(buffer []byte) error + + // Clear the message contents. + Clear() + + // Copy the contents of another message to this one. + Copy(m Message) error +} + +type message struct{ pn *C.pn_message_t } + +func freeMessage(m *message) { + C.pn_message_free(m.pn) + m.pn = nil +} + +// NewMessage creates a new message instance. +func NewMessage() Message { + m := &message{C.pn_message()} + runtime.SetFinalizer(m, freeMessage) + return m +} + +// NewMessageWith creates a message with value as the body. 
Equivalent to +// m := NewMessage(); m.Marshal(body) +func NewMessageWith(value interface{}) Message { + m := NewMessage() + m.Marshal(value) + return m +} + +func (m *message) Clear() { C.pn_message_clear(m.pn) } + +func (m *message) Copy(x Message) error { + if data, err := x.Encode(nil); err == nil { + return m.Decode(data) + } else { + return err + } +} + +// ==== message get functions + +func rewindGet(data *C.pn_data_t) (v interface{}) { + C.pn_data_rewind(data) + C.pn_data_next(data) + unmarshal(&v, data) + return v +} + +func rewindMap(data *C.pn_data_t) (v map[string]interface{}) { + C.pn_data_rewind(data) + C.pn_data_next(data) + unmarshal(&v, data) + return v +} + +func (m *message) Inferred() bool { return bool(C.pn_message_is_inferred(m.pn)) } +func (m *message) Durable() bool { return bool(C.pn_message_is_durable(m.pn)) } +func (m *message) Priority() uint8 { return uint8(C.pn_message_get_priority(m.pn)) } +func (m *message) TTL() time.Duration { + return time.Duration(C.pn_message_get_ttl(m.pn)) * time.Millisecond +} +func (m *message) FirstAcquirer() bool { return bool(C.pn_message_is_first_acquirer(m.pn)) } +func (m *message) DeliveryCount() uint32 { return uint32(C.pn_message_get_delivery_count(m.pn)) } +func (m *message) MessageId() interface{} { return rewindGet(C.pn_message_id(m.pn)) } +func (m *message) UserId() string { return goString(C.pn_message_get_user_id(m.pn)) } +func (m *message) Address() string { return C.GoString(C.pn_message_get_address(m.pn)) } +func (m *message) Subject() string { return C.GoString(C.pn_message_get_subject(m.pn)) } +func (m *message) ReplyTo() string { return C.GoString(C.pn_message_get_reply_to(m.pn)) } +func (m *message) CorrelationId() interface{} { return rewindGet(C.pn_message_correlation_id(m.pn)) } +func (m *message) ContentType() string { return C.GoString(C.pn_message_get_content_type(m.pn)) } +func (m *message) ContentEncoding() string { return C.GoString(C.pn_message_get_content_encoding(m.pn)) } + 
+func (m *message) ExpiryTime() time.Time { + return time.Unix(0, int64(time.Millisecond*time.Duration(C.pn_message_get_expiry_time(m.pn)))) +} +func (m *message) CreationTime() time.Time { + return time.Unix(0, int64(time.Millisecond)*int64(C.pn_message_get_creation_time(m.pn))) +} +func (m *message) GroupId() string { return C.GoString(C.pn_message_get_group_id(m.pn)) } +func (m *message) GroupSequence() int32 { return int32(C.pn_message_get_group_sequence(m.pn)) } +func (m *message) ReplyToGroupId() string { return C.GoString(C.pn_message_get_reply_to_group_id(m.pn)) } + +func (m *message) Instructions() map[string]interface{} { + return rewindMap(C.pn_message_instructions(m.pn)) +} +func (m *message) Annotations() map[string]interface{} { + return rewindMap(C.pn_message_annotations(m.pn)) +} +func (m *message) Properties() map[string]interface{} { + return rewindMap(C.pn_message_properties(m.pn)) +} + +// ==== message set methods + +func setData(v interface{}, data *C.pn_data_t) { + C.pn_data_clear(data) + marshal(v, data) +} + +func dataString(data *C.pn_data_t) string { + str := C.pn_string(C.CString("")) + defer C.pn_free(unsafe.Pointer(str)) + C.pn_inspect(unsafe.Pointer(data), str) + return C.GoString(C.pn_string_get(str)) +} + +func (m *message) SetInferred(b bool) { C.pn_message_set_inferred(m.pn, C.bool(m.Inferred())) } +func (m *message) SetDurable(b bool) { C.pn_message_set_durable(m.pn, C.bool(b)) } +func (m *message) SetPriority(b uint8) { C.pn_message_set_priority(m.pn, C.uint8_t(b)) } +func (m *message) SetTTL(d time.Duration) { + C.pn_message_set_ttl(m.pn, C.pn_millis_t(d/time.Millisecond)) +} +func (m *message) SetFirstAcquirer(b bool) { C.pn_message_set_first_acquirer(m.pn, C.bool(b)) } +func (m *message) SetDeliveryCount(c uint32) { C.pn_message_set_delivery_count(m.pn, C.uint32_t(c)) } +func (m *message) SetMessageId(id interface{}) { setData(id, C.pn_message_id(m.pn)) } +func (m *message) SetUserId(s string) { C.pn_message_set_user_id(m.pn, 
pnBytes(([]byte)(s))) } +func (m *message) SetAddress(s string) { + C.msg_set_str(m.pn, C.CString(s), C.set_fn(C.pn_message_set_address)) +} +func (m *message) SetSubject(s string) { + C.msg_set_str(m.pn, C.CString(s), C.set_fn(C.pn_message_set_subject)) +} +func (m *message) SetReplyTo(s string) { + C.msg_set_str(m.pn, C.CString(s), C.set_fn(C.pn_message_set_reply_to)) +} +func (m *message) SetCorrelationId(c interface{}) { setData(c, C.pn_message_correlation_id(m.pn)) } +func (m *message) SetContentType(s string) { + C.msg_set_str(m.pn, C.CString(s), C.set_fn(C.pn_message_set_content_type)) +} +func (m *message) SetContentEncoding(s string) { + C.msg_set_str(m.pn, C.CString(s), C.set_fn(C.pn_message_set_content_encoding)) +} +func (m *message) SetExpiryTime(t time.Time) { C.pn_message_set_expiry_time(m.pn, pnTime(t)) } +func (m *message) SetCreationTime(t time.Time) { C.pn_message_set_creation_time(m.pn, pnTime(t)) } +func (m *message) SetGroupId(s string) { + C.msg_set_str(m.pn, C.CString(s), C.set_fn(C.pn_message_set_group_id)) +} +func (m *message) SetGroupSequence(s int32) { + C.pn_message_set_group_sequence(m.pn, C.pn_sequence_t(s)) +} +func (m *message) SetReplyToGroupId(s string) { + C.msg_set_str(m.pn, C.CString(s), C.set_fn(C.pn_message_set_reply_to_group_id)) +} + +func (m *message) SetInstructions(v map[string]interface{}) { + setData(v, C.pn_message_instructions(m.pn)) +} +func (m *message) SetAnnotations(v map[string]interface{}) { setData(v, C.pn_message_annotations(m.pn)) } +func (m *message) SetProperties(v map[string]interface{}) { setData(v, C.pn_message_properties(m.pn)) } + +// Marshal/Unmarshal body +func (m *message) Marshal(v interface{}) { clearMarshal(v, C.pn_message_body(m.pn)) } +func (m *message) Unmarshal(v interface{}) { rewindUnmarshal(v, C.pn_message_body(m.pn)) } +func (m *message) Body() (v interface{}) { m.Unmarshal(&v); return } + +func (m *message) Decode(data []byte) error { + m.Clear() + if len(data) == 0 { - return 
internal.Errorf("empty buffer for decode") ++ return fmt.Errorf("empty buffer for decode") + } + if C.pn_message_decode(m.pn, cPtr(data), cLen(data)) < 0 { - return internal.Errorf("decoding message: %s", - internal.PnError(unsafe.Pointer(C.pn_message_error(m.pn)))) ++ return fmt.Errorf("decoding message: %s", PnError(C.pn_message_error(m.pn))) + } + return nil +} + +func DecodeMessage(data []byte) (m Message, err error) { + m = NewMessage() + err = m.Decode(data) + return +} + +func (m *message) Encode(buffer []byte) ([]byte, error) { + encode := func(buf []byte) ([]byte, error) { + len := cLen(buf) + result := C.pn_message_encode(m.pn, cPtr(buf), &len) + switch { + case result == C.PN_OVERFLOW: + return buf, overflow + case result < 0: - return buf, internal.Errorf("cannot encode message: %s", internal.PnErrorCode(result)) ++ return buf, fmt.Errorf("cannot encode message: %s", PnErrorCode(result)) + default: + return buf[:len], nil + } + } + return encodeGrow(buffer, encode) +} + +// TODO aconway 2015-09-14: Multi-section messages. http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9e788b2d/amqp/types.go ---------------------------------------------------------------------- diff --cc amqp/types.go index 796da66,0000000..abcff25 mode 100644,000000..100644 --- a/amqp/types.go +++ b/amqp/types.go @@@ -1,199 -1,0 +1,203 @@@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. 
You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ + +package amqp + +// #include <proton/codec.h> +import "C" + +import ( + "bytes" + "fmt" + "reflect" + "time" + "unsafe" +) + +type Type C.pn_type_t + ++// Older proton versions don't define C.PN_INVALID, so define it here. ++// In C it is pn_type_t(-1), in Go use the bitwise NOT operator to get the same value. ++const pnInvalid = ^C.pn_type_t(0) ++ +func (t Type) String() string { + switch C.pn_type_t(t) { + case C.PN_NULL: + return "null" + case C.PN_BOOL: + return "bool" + case C.PN_UBYTE: + return "ubyte" + case C.PN_BYTE: + return "byte" + case C.PN_USHORT: + return "ushort" + case C.PN_SHORT: + return "short" + case C.PN_CHAR: + return "char" + case C.PN_UINT: + return "uint" + case C.PN_INT: + return "int" + case C.PN_ULONG: + return "ulong" + case C.PN_LONG: + return "long" + case C.PN_TIMESTAMP: + return "timestamp" + case C.PN_FLOAT: + return "float" + case C.PN_DOUBLE: + return "double" + case C.PN_DECIMAL32: + return "decimal32" + case C.PN_DECIMAL64: + return "decimal64" + case C.PN_DECIMAL128: + return "decimal128" + case C.PN_UUID: + return "uuid" + case C.PN_BINARY: + return "binary" + case C.PN_STRING: + return "string" + case C.PN_SYMBOL: + return "symbol" + case C.PN_DESCRIBED: + return "described" + case C.PN_ARRAY: + return "array" + case C.PN_LIST: + return "list" + case C.PN_MAP: + return "map" + default: - if uint32(t) == uint32(C.PN_INVALID) { ++ if uint32(t) == uint32(pnInvalid) { + return "no-data" + } + return fmt.Sprintf("unknown-type(%d)", t) + } +} + +// Go types +var ( + bytesType = reflect.TypeOf([]byte{}) + valueType = 
reflect.TypeOf(reflect.Value{}) +) + +// TODO aconway 2015-04-08: can't handle AMQP maps with key types that are not valid Go map keys. + +// Map is a generic map that can have mixed key and value types and so can represent any AMQP map +type Map map[interface{}]interface{} + +// List is a generic list that can hold mixed values and can represent any AMQP list. +// +type List []interface{} + +// Symbol is a string that is encoded as an AMQP symbol +type Symbol string + +func (s Symbol) GoString() string { return fmt.Sprintf("s\"%s\"", s) } + +// Binary is a string that is encoded as an AMQP binary. +// It is a string rather than a byte[] because byte[] is not hashable and can't be used as +// a map key, AMQP frequently uses binary types as map keys. It can convert to and from []byte +type Binary string + +func (b Binary) GoString() string { return fmt.Sprintf("b\"%s\"", b) } + +// GoString for Map prints values with their types, useful for debugging. +func (m Map) GoString() string { + out := &bytes.Buffer{} + fmt.Fprintf(out, "%T{", m) + i := len(m) + for k, v := range m { + fmt.Fprintf(out, "%T(%#v): %T(%#v)", k, k, v, v) + i-- + if i > 0 { + fmt.Fprint(out, ", ") + } + } + fmt.Fprint(out, "}") + return out.String() +} + +// GoString for List prints values with their types, useful for debugging. +func (l List) GoString() string { + out := &bytes.Buffer{} + fmt.Fprintf(out, "%T{", l) + for i := 0; i < len(l); i++ { + fmt.Fprintf(out, "%T(%#v)", l[i], l[i]) + if i == len(l)-1 { + fmt.Fprint(out, ", ") + } + } + fmt.Fprint(out, "}") + return out.String() +} + +// pnTime converts Go time.Time to Proton millisecond Unix time. 
+func pnTime(t time.Time) C.pn_timestamp_t { + secs := t.Unix() + // Note: sub-second accuracy is not guaraunteed if the Unix time in + // nanoseconds cannot be represented by an int64 (sometime around year 2260) + msecs := (t.UnixNano() % int64(time.Second)) / int64(time.Millisecond) + return C.pn_timestamp_t(secs*1000 + msecs) +} + +// goTime converts a pn_timestamp_t to a Go time.Time. +func goTime(t C.pn_timestamp_t) time.Time { + secs := int64(t) / 1000 + nsecs := (int64(t) % 1000) * int64(time.Millisecond) + return time.Unix(secs, nsecs) +} + +func goBytes(cBytes C.pn_bytes_t) (bytes []byte) { + if cBytes.start != nil { + bytes = C.GoBytes(unsafe.Pointer(cBytes.start), C.int(cBytes.size)) + } + return +} + +func goString(cBytes C.pn_bytes_t) (str string) { + if cBytes.start != nil { + str = C.GoStringN(cBytes.start, C.int(cBytes.size)) + } + return +} + +func pnBytes(b []byte) C.pn_bytes_t { + if len(b) == 0 { + return C.pn_bytes_t{0, nil} + } else { + return C.pn_bytes_t{C.size_t(len(b)), (*C.char)(unsafe.Pointer(&b[0]))} + } +} + +func cPtr(b []byte) *C.char { + if len(b) == 0 { + return nil + } + return (*C.char)(unsafe.Pointer(&b[0])) +} + +func cLen(b []byte) C.size_t { + return C.size_t(len(b)) +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9e788b2d/amqp/unmarshal.go ---------------------------------------------------------------------- diff --cc amqp/unmarshal.go index 751921d,0000000..25bb519 mode 100644,000000..100644 --- a/amqp/unmarshal.go +++ b/amqp/unmarshal.go @@@ -1,558 -1,0 +1,561 @@@ +/* +Licensed to the Apache Software Foundation (ASF) under one +oor more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. 
You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ + +package amqp + +// #include <proton/codec.h> +import "C" + +import ( + "bytes" + "fmt" + "io" - "qpid.apache.org/internal" + "reflect" + "unsafe" +) + +const minDecode = 1024 + +// Error returned if AMQP data cannot be unmarshaled as the desired Go type. +type UnmarshalError struct { + // The name of the AMQP type. + AMQPType string + // The Go type. + GoType reflect.Type +} + +func newUnmarshalError(pnType C.pn_type_t, v interface{}) *UnmarshalError { + return &UnmarshalError{Type(pnType).String(), reflect.TypeOf(v)} +} + +func (e UnmarshalError) Error() string { + if e.GoType.Kind() != reflect.Ptr { - return fmt.Sprintf("proton: cannot unmarshal to type %s, not a pointer", e.GoType) ++ return fmt.Sprintf("cannot unmarshal to type %s, not a pointer", e.GoType) + } else { - return fmt.Sprintf("proton: cannot unmarshal AMQP %s to %s", e.AMQPType, e.GoType) ++ return fmt.Sprintf("cannot unmarshal AMQP %s to %s", e.AMQPType, e.GoType) + } +} + +func doRecover(err *error) { + r := recover() + switch r := r.(type) { + case nil: - case *UnmarshalError, internal.Error: - *err = r.(error) ++ case *UnmarshalError: ++ *err = r + default: + panic(r) + } +} + +// +// Decoding from a pn_data_t +// +// NOTE: we use panic() to signal a decoding error, simplifies decoding logic. +// We recover() at the highest possible level - i.e. in the exported Unmarshal or Decode. +// + +// Decoder decodes AMQP values from an io.Reader. +// +type Decoder struct { + reader io.Reader + buffer bytes.Buffer +} + +// NewDecoder returns a new decoder that reads from r. 
+// +// The decoder has its own buffer and may read more data than required for the +// AMQP values requested. Use Buffered to see if there is data left in the +// buffer. +// +func NewDecoder(r io.Reader) *Decoder { + return &Decoder{r, bytes.Buffer{}} +} + +// Buffered returns a reader of the data remaining in the Decoder's buffer. The +// reader is valid until the next call to Decode. +// +func (d *Decoder) Buffered() io.Reader { + return bytes.NewReader(d.buffer.Bytes()) +} + +// Decode reads the next AMQP value from the Reader and stores it in the value pointed to by v. +// +// See the documentation for Unmarshal for details about the conversion of AMQP into a Go value. +// +func (d *Decoder) Decode(v interface{}) (err error) { + defer doRecover(&err) + data := C.pn_data(0) + defer C.pn_data_free(data) + var n int - for n == 0 && err == nil { - n = decode(data, d.buffer.Bytes()) ++ for n == 0 { ++ n, err = decode(data, d.buffer.Bytes()) ++ if err != nil { ++ return err ++ } + if n == 0 { // n == 0 means not enough data, read more + err = d.more() + } else { + unmarshal(v, data) + } + } + d.buffer.Next(n) + return +} + +/* +Unmarshal decodes AMQP-encoded bytes and stores the result in the value pointed to by v. +Types are converted as follows: + + +---------------------------+----------------------------------------------------------------------+ + |To Go types |From AMQP types | + +===========================+======================================================================+ + |bool |bool | + +---------------------------+----------------------------------------------------------------------+ + |int, int8, int16, |Equivalent or smaller signed integer type: byte, short, int, long.
| + |int32, int64 | | + +---------------------------+----------------------------------------------------------------------+ + |uint, uint8, uint16, |Equivalent or smaller unsigned integer type: ubyte, ushort, uint, | + |uint32, uint64 types |ulong | + +---------------------------+----------------------------------------------------------------------+ + |float32, float64 |Equivalent or smaller float or double. | + +---------------------------+----------------------------------------------------------------------+ + |string, []byte |string, symbol or binary. | + +---------------------------+----------------------------------------------------------------------+ + |Symbol |symbol | + +---------------------------+----------------------------------------------------------------------+ + |map[K]T |map, provided all keys and values can unmarshal to types K, T | + +---------------------------+----------------------------------------------------------------------+ + |Map |map, any AMQP map | + +---------------------------+----------------------------------------------------------------------+ + |interface{} |Any AMQP value can be unmarshaled to an interface{} as follows: | + | +------------------------+---------------------------------------------+ + | |AMQP Type |Go Type in interface{} | + | +========================+=============================================+ + | |bool |bool | + | +------------------------+---------------------------------------------+ + | |byte,short,int,long |int8,int16,int32,int64 | + | +------------------------+---------------------------------------------+ + | |ubyte,ushort,uint,ulong |uint8,uint16,uint32,uint64 | + | +------------------------+---------------------------------------------+ + | |float, double |float32, float64 | + | +------------------------+---------------------------------------------+ + | |string |string | + | +------------------------+---------------------------------------------+ + | |symbol |Symbol | + | 
+------------------------+---------------------------------------------+ + | |binary |Binary | + | +------------------------+---------------------------------------------+ + | |nulll |nil | + | +------------------------+---------------------------------------------+ + | |map |Map | + | +------------------------+---------------------------------------------+ + | |list |List | + +---------------------------+------------------------+---------------------------------------------+ + +The following Go types cannot be unmarshaled: uintptr, function, interface, channel. + +TODO + +Go types: array, struct. + +AMQP types: decimal32/64/128, char (round trip), timestamp, uuid, array, multi-section message bodies. + +AMQP maps with mixed/unhashable key types need an alternate representation. + +Described types. +*/ +func Unmarshal(bytes []byte, v interface{}) (n int, err error) { + defer doRecover(&err) + + data := C.pn_data(0) + defer C.pn_data_free(data) - n = decode(data, bytes) ++ n, err = decode(data, bytes) ++ if err != nil { ++ return 0, err ++ } + if n == 0 { - err = internal.Errorf("not enough data") ++ return 0, fmt.Errorf("not enough data") + } else { + unmarshal(v, data) + } - return ++ return n, nil +} + +// more reads more data when we can't parse a complete AMQP type +func (d *Decoder) more() error { + var readSize int64 = minDecode + if int64(d.buffer.Len()) > readSize { // Grow by doubling + readSize = int64(d.buffer.Len()) + } + var n int64 + n, err := d.buffer.ReadFrom(io.LimitReader(d.reader, readSize)) + if n == 0 && err == nil { // ReadFrom won't report io.EOF, just returns 0 + err = io.EOF + } + return err +} + +// Unmarshal from data into value pointed at by v. 
+func unmarshal(v interface{}, data *C.pn_data_t) { + pnType := C.pn_data_type(data) + switch v := v.(type) { + case *bool: + switch pnType { + case C.PN_BOOL: + *v = bool(C.pn_data_get_bool(data)) + default: + panic(newUnmarshalError(pnType, v)) + } + case *int8: + switch pnType { + case C.PN_CHAR: + *v = int8(C.pn_data_get_char(data)) + case C.PN_BYTE: + *v = int8(C.pn_data_get_byte(data)) + default: + panic(newUnmarshalError(pnType, v)) + } + case *uint8: + switch pnType { + case C.PN_CHAR: + *v = uint8(C.pn_data_get_char(data)) + case C.PN_UBYTE: + *v = uint8(C.pn_data_get_ubyte(data)) + default: + panic(newUnmarshalError(pnType, v)) + } + case *int16: + switch pnType { + case C.PN_CHAR: + *v = int16(C.pn_data_get_char(data)) + case C.PN_BYTE: + *v = int16(C.pn_data_get_byte(data)) + case C.PN_SHORT: + *v = int16(C.pn_data_get_short(data)) + default: + panic(newUnmarshalError(pnType, v)) + } + case *uint16: + switch pnType { + case C.PN_CHAR: + *v = uint16(C.pn_data_get_char(data)) + case C.PN_UBYTE: + *v = uint16(C.pn_data_get_ubyte(data)) + case C.PN_USHORT: + *v = uint16(C.pn_data_get_ushort(data)) + default: + panic(newUnmarshalError(pnType, v)) + } + case *int32: + switch pnType { + case C.PN_CHAR: + *v = int32(C.pn_data_get_char(data)) + case C.PN_BYTE: + *v = int32(C.pn_data_get_byte(data)) + case C.PN_SHORT: + *v = int32(C.pn_data_get_short(data)) + case C.PN_INT: + *v = int32(C.pn_data_get_int(data)) + default: + panic(newUnmarshalError(pnType, v)) + } + case *uint32: + switch pnType { + case C.PN_CHAR: + *v = uint32(C.pn_data_get_char(data)) + case C.PN_UBYTE: + *v = uint32(C.pn_data_get_ubyte(data)) + case C.PN_USHORT: + *v = uint32(C.pn_data_get_ushort(data)) + case C.PN_UINT: + *v = uint32(C.pn_data_get_uint(data)) + default: + panic(newUnmarshalError(pnType, v)) + } + + case *int64: + switch pnType { + case C.PN_CHAR: + *v = int64(C.pn_data_get_char(data)) + case C.PN_BYTE: + *v = int64(C.pn_data_get_byte(data)) + case C.PN_SHORT: + *v = 
int64(C.pn_data_get_short(data)) + case C.PN_INT: + *v = int64(C.pn_data_get_int(data)) + case C.PN_LONG: + *v = int64(C.pn_data_get_long(data)) + default: + panic(newUnmarshalError(pnType, v)) + } + + case *uint64: + switch pnType { + case C.PN_CHAR: + *v = uint64(C.pn_data_get_char(data)) + case C.PN_UBYTE: + *v = uint64(C.pn_data_get_ubyte(data)) + case C.PN_USHORT: + *v = uint64(C.pn_data_get_ushort(data)) + case C.PN_ULONG: + *v = uint64(C.pn_data_get_ulong(data)) + default: + panic(newUnmarshalError(pnType, v)) + } + + case *int: + switch pnType { + case C.PN_CHAR: + *v = int(C.pn_data_get_char(data)) + case C.PN_BYTE: + *v = int(C.pn_data_get_byte(data)) + case C.PN_SHORT: + *v = int(C.pn_data_get_short(data)) + case C.PN_INT: + *v = int(C.pn_data_get_int(data)) + case C.PN_LONG: + if unsafe.Sizeof(0) == 8 { + *v = int(C.pn_data_get_long(data)) + } else { + panic(newUnmarshalError(pnType, v)) + } + default: + panic(newUnmarshalError(pnType, v)) + } + + case *uint: + switch pnType { + case C.PN_CHAR: + *v = uint(C.pn_data_get_char(data)) + case C.PN_UBYTE: + *v = uint(C.pn_data_get_ubyte(data)) + case C.PN_USHORT: + *v = uint(C.pn_data_get_ushort(data)) + case C.PN_UINT: + *v = uint(C.pn_data_get_uint(data)) + case C.PN_ULONG: + if unsafe.Sizeof(0) == 8 { + *v = uint(C.pn_data_get_ulong(data)) + } else { + panic(newUnmarshalError(pnType, v)) + } + default: + panic(newUnmarshalError(pnType, v)) + } + + case *float32: + switch pnType { + case C.PN_FLOAT: + *v = float32(C.pn_data_get_float(data)) + default: + panic(newUnmarshalError(pnType, v)) + } + + case *float64: + switch pnType { + case C.PN_FLOAT: + *v = float64(C.pn_data_get_float(data)) + case C.PN_DOUBLE: + *v = float64(C.pn_data_get_double(data)) + default: + panic(newUnmarshalError(pnType, v)) + } + + case *string: + switch pnType { + case C.PN_STRING: + *v = goString(C.pn_data_get_string(data)) + case C.PN_SYMBOL: + *v = goString(C.pn_data_get_symbol(data)) + case C.PN_BINARY: + *v = 
goString(C.pn_data_get_binary(data)) + default: + panic(newUnmarshalError(pnType, v)) + } + + case *[]byte: + switch pnType { + case C.PN_STRING: + *v = goBytes(C.pn_data_get_string(data)) + case C.PN_SYMBOL: + *v = goBytes(C.pn_data_get_symbol(data)) + case C.PN_BINARY: + *v = goBytes(C.pn_data_get_binary(data)) + default: + panic(newUnmarshalError(pnType, v)) + } + + case *Binary: + switch pnType { + case C.PN_BINARY: + *v = Binary(goBytes(C.pn_data_get_binary(data))) + default: + panic(newUnmarshalError(pnType, v)) + } + + case *Symbol: + switch pnType { + case C.PN_SYMBOL: + *v = Symbol(goBytes(C.pn_data_get_symbol(data))) + default: + panic(newUnmarshalError(pnType, v)) + } + + case *interface{}: + getInterface(data, v) + + default: + if reflect.TypeOf(v).Kind() != reflect.Ptr { + panic(newUnmarshalError(pnType, v)) + } + switch reflect.TypeOf(v).Elem().Kind() { + case reflect.Map: + getMap(data, v) + case reflect.Slice: + getList(data, v) + default: + panic(newUnmarshalError(pnType, v)) + } + } + err := dataError("unmarshaling", data) + if err != nil { + panic(err) + } + return +} + +func rewindUnmarshal(v interface{}, data *C.pn_data_t) { + C.pn_data_rewind(data) + C.pn_data_next(data) + unmarshal(v, data) +} + +// Getting into an interface is driven completely by the AMQP type, since the interface{} +// target is type-neutral. +func getInterface(data *C.pn_data_t, v *interface{}) { + pnType := C.pn_data_type(data) + switch pnType { - // Note PN_INVALID is defined outside the enum, older Go versions don't consider it a C.pn_type_t - case C.PN_NULL, C.pn_type_t(C.PN_INVALID): // No data. ++ case C.PN_NULL, C.pn_type_t(pnInvalid): // No data. 
+ *v = nil + case C.PN_BOOL: + *v = bool(C.pn_data_get_bool(data)) + case C.PN_UBYTE: + *v = uint8(C.pn_data_get_ubyte(data)) + case C.PN_BYTE: + *v = int8(C.pn_data_get_byte(data)) + case C.PN_USHORT: + *v = uint16(C.pn_data_get_ushort(data)) + case C.PN_SHORT: + *v = int16(C.pn_data_get_short(data)) + case C.PN_UINT: + *v = uint32(C.pn_data_get_uint(data)) + case C.PN_INT: + *v = int32(C.pn_data_get_int(data)) + case C.PN_CHAR: + *v = uint8(C.pn_data_get_char(data)) + case C.PN_ULONG: + *v = uint64(C.pn_data_get_ulong(data)) + case C.PN_LONG: + *v = int64(C.pn_data_get_long(data)) + case C.PN_FLOAT: + *v = float32(C.pn_data_get_float(data)) + case C.PN_DOUBLE: + *v = float64(C.pn_data_get_double(data)) + case C.PN_BINARY: + *v = Binary(goBytes(C.pn_data_get_binary(data))) + case C.PN_STRING: + *v = goString(C.pn_data_get_string(data)) + case C.PN_SYMBOL: + *v = Symbol(goString(C.pn_data_get_symbol(data))) + case C.PN_MAP: + m := make(Map) + unmarshal(&m, data) + *v = m + case C.PN_LIST: + l := make(List, 0) + unmarshal(&l, data) + *v = l + default: + panic(newUnmarshalError(pnType, v)) + } +} + +// get into map pointed at by v +func getMap(data *C.pn_data_t, v interface{}) { + mapValue := reflect.ValueOf(v).Elem() + mapValue.Set(reflect.MakeMap(mapValue.Type())) // Clear the map + switch pnType := C.pn_data_type(data); pnType { + case C.PN_MAP: + count := int(C.pn_data_get_map(data)) + if bool(C.pn_data_enter(data)) { + defer C.pn_data_exit(data) + for i := 0; i < count/2; i++ { + if bool(C.pn_data_next(data)) { + key := reflect.New(mapValue.Type().Key()) + unmarshal(key.Interface(), data) + if bool(C.pn_data_next(data)) { + val := reflect.New(mapValue.Type().Elem()) + unmarshal(val.Interface(), data) + mapValue.SetMapIndex(key.Elem(), val.Elem()) + } + } + } + } - // Note PN_INVALID is defined outside the enum, older Go versions don't consider it a C.pn_type_t - case C.pn_type_t(C.PN_INVALID): // Leave the map empty ++ case C.pn_type_t(pnInvalid): // Leave the 
map empty + default: + panic(newUnmarshalError(pnType, v)) + } +} + +func getList(data *C.pn_data_t, v interface{}) { + pnType := C.pn_data_type(data) + if pnType != C.PN_LIST { + panic(newUnmarshalError(pnType, v)) + } + count := int(C.pn_data_get_list(data)) + listValue := reflect.MakeSlice(reflect.TypeOf(v).Elem(), count, count) + if bool(C.pn_data_enter(data)) { + for i := 0; i < count; i++ { + if bool(C.pn_data_next(data)) { + val := reflect.New(listValue.Type().Elem()) + unmarshal(val.Interface(), data) + listValue.Index(i).Set(val.Elem()) + } + } + C.pn_data_exit(data) + } + reflect.ValueOf(v).Elem().Set(listValue) +} + +// decode from bytes. +// Return bytes decoded or 0 if we could not decode a complete object. +// - func decode(data *C.pn_data_t, bytes []byte) int { ++func decode(data *C.pn_data_t, bytes []byte) (int, error) { + if len(bytes) == 0 { - return 0 ++ return 0, nil + } + n := int(C.pn_data_decode(data, cPtr(bytes), cLen(bytes))) + if n == int(C.PN_UNDERFLOW) { + C.pn_error_clear(C.pn_data_error(data)) - return 0 ++ return 0, nil + } else if n <= 0 { - panic(internal.Errorf("unmarshal %s", internal.PnErrorCode(n))) ++ return 0, fmt.Errorf("unmarshal %s", PnErrorCode(n)) + } - return n ++ return n, nil +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9e788b2d/amqp/url.go ---------------------------------------------------------------------- diff --cc amqp/url.go index 0d0c662,0000000..70545d2 mode 100644,000000..100644 --- a/amqp/url.go +++ b/amqp/url.go @@@ -1,96 -1,0 +1,96 @@@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. 
You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ + +package amqp + +/* +#include <stdlib.h> +#include <string.h> +#include <proton/url.h> + +// Helper function for setting URL fields. +typedef void (*setter_fn)(pn_url_t* url, const char* value); +inline void set(pn_url_t *url, setter_fn s, const char* value) { + s(url, value); +} +*/ +import "C" + +import ( ++ "fmt" + "net" + "net/url" - "qpid.apache.org/internal" + "unsafe" +) + +const ( + amqp string = "amqp" + amqps = "amqps" +) + +// ParseURL parses an AMQP URL string and returns a net/url.URL. +// +// It is more forgiving than net/url.Parse and allows most of the parts of the +// URL to be missing, assuming AMQP defaults.
+// +func ParseURL(s string) (u *url.URL, err error) { + cstr := C.CString(s) + defer C.free(unsafe.Pointer(cstr)) + pnUrl := C.pn_url_parse(cstr) + if pnUrl == nil { - return nil, internal.Errorf("bad URL %#v", s) ++ return nil, fmt.Errorf("bad URL %#v", s) + } + defer C.pn_url_free(pnUrl) + + scheme := C.GoString(C.pn_url_get_scheme(pnUrl)) + username := C.GoString(C.pn_url_get_username(pnUrl)) + password := C.GoString(C.pn_url_get_password(pnUrl)) + host := C.GoString(C.pn_url_get_host(pnUrl)) + port := C.GoString(C.pn_url_get_port(pnUrl)) + path := C.GoString(C.pn_url_get_path(pnUrl)) + + if err != nil { - return nil, internal.Errorf("bad URL %#v: %s", s, err) ++ return nil, fmt.Errorf("bad URL %#v: %s", s, err) + } + if scheme == "" { + scheme = amqp + } + if port == "" { + if scheme == amqps { + port = amqps + } else { + port = amqp + } + } + var user *url.Userinfo + if password != "" { + user = url.UserPassword(username, password) + } else if username != "" { + user = url.User(username) + } + + u = &url.URL{ + Scheme: scheme, + User: user, + Host: net.JoinHostPort(host, port), + Path: path, + } + + return u, nil +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9e788b2d/amqp/url_test.go ---------------------------------------------------------------------- diff --cc amqp/url_test.go index f80f1c4,0000000..99b656d mode 100644,000000..100644 --- a/amqp/url_test.go +++ b/amqp/url_test.go @@@ -1,51 -1,0 +1,51 @@@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. 
You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ + +package amqp + +import ( + "fmt" +) + +func ExampleParseURL() { + for _, s := range []string{ + "amqp://username:password@host:1234/path", + "host:1234", + "host", + ":1234", + "host/path", + "amqps://host", + "", + } { + u, err := ParseURL(s) + if err != nil { + fmt.Println(err) + } else { + fmt.Println(u) + } + } + // Output: + // amqp://username:password@host:1234/path + // amqp://host:1234 + // amqp://host:amqp + // amqp://:1234 + // amqp://host:amqp/path + // amqps://host:amqps - // proton: bad URL "" ++ // bad URL "" +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9e788b2d/electron/connection.go ---------------------------------------------------------------------- diff --cc electron/connection.go index d6761d6,0000000..8a9e6cd mode 100644,000000..100644 --- a/electron/connection.go +++ b/electron/connection.go @@@ -1,218 -1,0 +1,238 @@@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. 
See the License for the +specific language governing permissions and limitations +under the License. +*/ + +package electron + +// #include <proton/disposition.h> +import "C" + +import ( + "net" - "qpid.apache.org/amqp" - "qpid.apache.org/internal" + "qpid.apache.org/proton" + "sync" + "time" +) + +// Connection is an AMQP connection, created by a Container. +type Connection interface { + Endpoint + + // Sender opens a new sender on the DefaultSession. - // - // v can be a string, which is used as the Target address, or a SenderSettings - // struct containing more details settings. - Sender(...LinkSetting) (Sender, error) ++ Sender(...LinkOption) (Sender, error) + + // Receiver opens a new Receiver on the DefaultSession(). - // - // v can be a string, which is used as the - // Source address, or a ReceiverSettings struct containing more details - // settings. - Receiver(...LinkSetting) (Receiver, error) ++ Receiver(...LinkOption) (Receiver, error) + + // DefaultSession() returns a default session for the connection. It is opened + // on the first call to DefaultSession and returned on subsequent calls. + DefaultSession() (Session, error) + + // Session opens a new session. - Session(...SessionSetting) (Session, error) ++ Session(...SessionOption) (Session, error) + + // Container for the connection. + Container() Container + + // Disconnect the connection abruptly with an error. + Disconnect(error) + + // Wait waits for the connection to be disconnected. + Wait() error + + // WaitTimeout is like Wait but returns Timeout if the timeout expires. + WaitTimeout(time.Duration) error ++ ++ // Incoming returns a channel for incoming endpoints opened by the remote end. ++ // ++ // To enable, pass AllowIncoming() when creating the Connection. Otherwise all ++ // incoming endpoint requests are automatically rejected and Incoming() ++ // returns nil. ++ // ++ // An Incoming value can be an *IncomingSession, *IncomingSender or ++ // *IncomingReceiver. 
You must call Accept() to open the endpoint or Reject() ++ // to close it with an error. The specific Incoming types have additional ++ // methods to configure the endpoint. ++ // ++ // Not receiving from Incoming() or not calling Accept/Reject will block the ++ // electron event loop. Normally you would have a dedicated goroutine receive ++ // from Incoming() and start new goroutines to serve each incoming endpoint. ++ // The channel is closed when the Connection closes. ++ // ++ Incoming() <-chan Incoming +} + - // ConnectionSetting can be passed when creating a connection. - // See functions that return ConnectionSetting for details - type ConnectionSetting func(*connection) ++// ConnectionOption can be passed when creating a connection to configure various options ++type ConnectionOption func(*connection) + - // Server setting puts the connection in server mode. ++// Server returns a ConnectionOption to put the connection in server mode. +// +// A server connection will do protocol negotiation to accept a incoming AMQP +// connection. Normally you would call this for a connection created by +// net.Listener.Accept() +// - func Server() ConnectionSetting { return func(c *connection) { c.engine.Server() } } ++func Server() ConnectionOption { return func(c *connection) { c.engine.Server() } } + - // Accepter provides a function to be called when a connection receives an incoming - // request to open an endpoint, one of IncomingSession, IncomingSender or IncomingReceiver. - // - // The accept() function must not block or use the accepted endpoint. - // It can pass the endpoint to another goroutine for processing. - // - // By default all incoming endpoints are rejected. - func Accepter(accept func(Incoming)) ConnectionSetting { - return func(c *connection) { c.accept = accept } ++// AllowIncoming returns a ConnectionOption to enable incoming endpoint open requests. 
++// See Connection.Incoming() ++func AllowIncoming() ConnectionOption { ++ return func(c *connection) { c.incoming = make(chan Incoming) } +} + +type connection struct { + endpoint - listenOnce, defaultSessionOnce, closeOnce sync.Once ++ defaultSessionOnce, closeOnce sync.Once + + container *container + conn net.Conn - accept func(Incoming) ++ incoming chan Incoming + handler *handler + engine *proton.Engine - err internal.ErrorHolder + eConnection proton.Connection + + defaultSession Session - done chan struct{} +} + - func newConnection(conn net.Conn, cont *container, setting ...ConnectionSetting) (*connection, error) { - c := &connection{container: cont, conn: conn, accept: func(Incoming) {}, done: make(chan struct{})} ++func newConnection(conn net.Conn, cont *container, setting ...ConnectionOption) (*connection, error) { ++ c := &connection{container: cont, conn: conn} + c.handler = newHandler(c) + var err error + c.engine, err = proton.NewEngine(c.conn, c.handler.delegator) + if err != nil { + return nil, err + } + for _, set := range setting { + set(c) + } - c.str = c.engine.String() ++ c.endpoint = makeEndpoint(c.engine.String()) + c.eConnection = c.engine.Connection() - go func() { c.engine.Run(); close(c.done) }() ++ go c.run() + return c, nil +} + ++func (c *connection) run() { ++ c.engine.Run() ++ if c.incoming != nil { ++ close(c.incoming) ++ } ++ c.closed(Closed) ++} ++ +func (c *connection) Close(err error) { c.err.Set(err); c.engine.Close(err) } + +func (c *connection) Disconnect(err error) { c.err.Set(err); c.engine.Disconnect(err) } + - func (c *connection) Session(setting ...SessionSetting) (Session, error) { ++func (c *connection) Session(setting ...SessionOption) (Session, error) { + var s Session + err := c.engine.InjectWait(func() error { ++ if c.Error() != nil { ++ return c.Error() ++ } + eSession, err := c.engine.Connection().Session() + if err == nil { + eSession.Open() + if err == nil { + s = newSession(c, eSession, setting...) 
+ } + } + return err + }) + return s, err +} + +func (c *connection) Container() Container { return c.container } + +func (c *connection) DefaultSession() (s Session, err error) { + c.defaultSessionOnce.Do(func() { + c.defaultSession, err = c.Session() + }) + if err == nil { + err = c.Error() + } + return c.defaultSession, err +} + - func (c *connection) Sender(setting ...LinkSetting) (Sender, error) { ++func (c *connection) Sender(setting ...LinkOption) (Sender, error) { + if s, err := c.DefaultSession(); err == nil { + return s.Sender(setting...) + } else { + return nil, err + } +} + - func (c *connection) Receiver(setting ...LinkSetting) (Receiver, error) { ++func (c *connection) Receiver(setting ...LinkOption) (Receiver, error) { + if s, err := c.DefaultSession(); err == nil { + return s.Receiver(setting...) + } else { + return nil, err + } +} + +func (c *connection) Connection() Connection { return c } + +func (c *connection) Wait() error { return c.WaitTimeout(Forever) } +func (c *connection) WaitTimeout(timeout time.Duration) error { + _, err := timedReceive(c.done, timeout) + if err == Timeout { + return Timeout + } + return c.Error() +} + ++func (c *connection) Incoming() <-chan Incoming { return c.incoming } ++ +// Incoming is the interface for incoming requests to open an endpoint. +// Implementing types are IncomingSession, IncomingSender and IncomingReceiver. +type Incoming interface { - // Accept the endpoint with default settings. - // - // You must not use the returned endpoint in the accept() function that - // receives the Incoming value, but you can pass it to other goroutines. - // - // Implementing types provide type-specific Accept functions that take additional settings. ++ // Accept and open the endpoint. + Accept() Endpoint + + // Reject the endpoint with an error + Reject(error) + - error() error ++ // wait for and call the accept function, call in proton goroutine. 
++ wait() error ++ pEndpoint() proton.Endpoint +} + +type incoming struct { - err error - accepted bool ++ endpoint proton.Endpoint ++ acceptCh chan func() error ++} ++ ++func makeIncoming(e proton.Endpoint) incoming { ++ return incoming{endpoint: e, acceptCh: make(chan func() error)} +} + - func (i *incoming) Reject(err error) { i.err = err } ++func (in *incoming) Reject(err error) { in.acceptCh <- func() error { return err } } ++ ++// Call in proton goroutine, wait for and call the accept function. ++func (in *incoming) wait() error { return (<-in.acceptCh)() } ++ ++func (in *incoming) pEndpoint() proton.Endpoint { return in.endpoint } + - func (i *incoming) error() error { - switch { - case i.err != nil: - return i.err - case !i.accepted: - return amqp.Errorf(amqp.NotAllowed, "remote open rejected") - default: ++// Called in app goroutine to send an accept function to proton and return the resulting endpoint. ++func (in *incoming) accept(f func() Endpoint) Endpoint { ++ done := make(chan Endpoint) ++ in.acceptCh <- func() error { ++ ep := f() ++ done <- ep + return nil + } ++ return <-done +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9e788b2d/electron/container.go ---------------------------------------------------------------------- diff --cc electron/container.go index 7bbc4b0,0000000..b5ce6c0 mode 100644,000000..100644 --- a/electron/container.go +++ b/electron/container.go @@@ -1,71 -1,0 +1,77 @@@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License.
You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ + +package electron + +import ( + "net" - "qpid.apache.org/internal" ++ "qpid.apache.org/proton" ++ "strconv" ++ "sync/atomic" +) + +// Container is an AMQP container, it represents a single AMQP "application". It +// provides functions to create new Connections to remote containers. +// +// Create with NewContainer() +// +type Container interface { + // Id is a unique identifier for the container in your distributed application. + Id() string + + // Create a new AMQP Connection over the supplied net.Conn connection. + // + // You must call Connection.Open() on the returned Connection, after + // setting any Connection properties you need to set. Note the net.Conn + // can be an outgoing connection (e.g. made with net.Dial) or an incoming + // connection (e.g. made with net.Listener.Accept()) - Connection(net.Conn, ...ConnectionSetting) (Connection, error) ++ Connection(net.Conn, ...ConnectionOption) (Connection, error) +} + +type container struct { - id string - linkNames internal.IdCounter ++ id string ++ tagCounter uint64 ++} ++ ++func (cont *container) nextTag() string { ++ return strconv.FormatUint(atomic.AddUint64(&cont.tagCounter, 1), 32) +} + +// NewContainer creates a new container. The id must be unique in your +// distributed application, all connections created by the container +// will have this container-id. +// +// If id == "" a random UUID will be generated for the id.
+func NewContainer(id string) Container { + if id == "" { - id = internal.UUID4().String() ++ id = proton.UUID4().String() + } + cont := &container{id: id} + return cont +} + +func (cont *container) Id() string { return cont.id } + +func (cont *container) nextLinkName() string { - return cont.id + "@" + cont.linkNames.Next() ++ return cont.id + "@" + cont.nextTag() +} + - func (cont *container) Connection(conn net.Conn, setting ...ConnectionSetting) (Connection, error) { ++func (cont *container) Connection(conn net.Conn, setting ...ConnectionOption) (Connection, error) { + return newConnection(conn, cont, setting...) +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9e788b2d/electron/doc.go ---------------------------------------------------------------------- diff --cc electron/doc.go index eaa6e7a,0000000..46bde37 mode 100644,000000..100644 --- a/electron/doc.go +++ b/electron/doc.go @@@ -1,57 -1,0 +1,63 @@@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ + +/* +Package electron is a procedural, concurrent-safe Go library for AMQP messaging. +You can write clients and servers using this library. + +Start by creating a Container with NewContainer. 
A Container represents a client +or server application that can contain many incoming or outgoing connections. + +Create connections with the standard Go 'net' package using net.Dial or +net.Listen. Create an AMQP connection over a net.Conn with +Container.Connection() and open it with Connection.Open(). + +AMQP sends messages over "links". Each link has a Sender end and a Receiver +end. Connection.Sender() and Connection.Receiver() allow you to create links to +Send() and Receive() messages. + +You can create an AMQP server connection by calling Connection.Server() and +Connection.Listen() before calling Connection.Open(). A server connection can +negotiate protocol security details and can accept incoming links opened from - the remote end of the connection ++the remote end of the connection. ++ +*/ +package electron + +//#cgo LDFLAGS: -lqpid-proton +import "C" + +// Just for package comment + +/* DEVELOPER NOTES + +There is a single proton.Engine per connection, each driving its own event-loop goroutine, +and each with a 'handler'. Most state for a connection is maintained on the handler, and - only accessed in the event-loop goroutine, so no locks are required. ++only accessed in the event-loop goroutine, so no locks are required there. + +The handler sets up channels as needed to get or send data from user goroutines - using electron types like Sender or Receiver. We also use Engine.Inject to inject - actions into the event loop from user goroutines. ++using electron types like Sender or Receiver. ++ ++We also use Engine.Inject to inject actions into the event loop from user ++goroutines. It is important to check at the start of an injected function that ++required objects are still valid, for example a link may be remotely closed ++between the time a Sender function calls Inject and the time the injected ++function is executed by the handler goroutine. See comments in endpoint.go for more.
+ +*/ http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9e788b2d/electron/endpoint.go ---------------------------------------------------------------------- diff --cc electron/endpoint.go index 745fd04,0000000..8cbeadb mode 100644,000000..100644 --- a/electron/endpoint.go +++ b/electron/endpoint.go @@@ -1,68 -1,0 +1,94 @@@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ + +package electron + +import ( + "io" - "qpid.apache.org/internal" + "qpid.apache.org/proton" +) + +// Closed is an alias for io.EOF. It is returned as an error when an endpoint +// was closed cleanly. +var Closed = io.EOF + +// Endpoint is the common interface for Connection, Session, Link, Sender and Receiver. +// +// Endpoints can be created locally or by the remote peer. You must Open() an +// endpoint before you can use it. Some endpoints have additional Set*() methods +// that must be called before Open() to take effect, see Connection, Session, +// Link, Sender and Receiver for details. +// +type Endpoint interface { + // Close an endpoint and signal an error to the remote end if error != nil. + Close(error) + + // String is a human readable identifier, useful for debugging and logging. 
+ String() string + + // Error returns nil if the endpoint is open, otherwise returns an error. + // Error() == Closed means the endpoint was closed without error. + Error() error + + // Connection containing the endpoint + Connection() Connection ++ ++ // Done returns a channel that will close when the endpoint closes. ++ // Error() will contain the reason. ++ Done() <-chan struct{} +} + ++// DEVELOPER NOTES ++// ++// An electron.Endpoint corresponds to a proton.Endpoint, which can be invalidated. ++// +type endpoint struct { - err internal.ErrorHolder - str string // Must be set by the value that embeds endpoint. ++ err proton.ErrorHolder ++ str string // Must be set by the value that embeds endpoint. ++ done chan struct{} ++} ++ ++func makeEndpoint(s string) endpoint { return endpoint{str: s, done: make(chan struct{})} } ++ ++// Called in handler on a Closed event. Marks the endpoint as closed and the corresponding ++// proton.Endpoint pointer as invalid. Injected functions should check Error() to ensure ++// the pointer has not been invalidated. ++// ++// Returns the error stored on the endpoint, which may differ from err if there was ++// already an error. ++func (e *endpoint) closed(err error) error { ++ e.err.Set(err) ++ e.err.Set(Closed) ++ close(e.done) ++ return e.err.Get() +} + +func (e *endpoint) String() string { return e.str } - func (e *endpoint) Error() error { return e.err.Get() } + - // Call in proton goroutine to close an endpoint locally ++func (e *endpoint) Error() error { return e.err.Get() } ++ ++func (e *endpoint) Done() <-chan struct{} { return e.done } ++ ++// Call in proton goroutine to initiate closing an endpoint locally +// handler will complete the close when remote end closes.
+func localClose(ep proton.Endpoint, err error) { + if ep.State().LocalActive() { + proton.CloseError(ep, err) + } +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9e788b2d/electron/error.go ---------------------------------------------------------------------- diff --cc electron/error.go index 0000000,0000000..4dcfd94 new file mode 100644 --- /dev/null +++ b/electron/error.go @@@ -1,0 -1,0 +1,35 @@@ ++/* ++Licensed to the Apache Software Foundation (ASF) under one ++or more contributor license agreements. See the NOTICE file ++distributed with this work for additional information ++regarding copyright ownership. The ASF licenses this file ++to you under the Apache License, Version 2.0 (the ++"License"); you may not use this file except in compliance ++with the License. You may obtain a copy of the License at ++ ++ http://www.apache.org/licenses/LICENSE-2.0 ++ ++Unless required by applicable law or agreed to in writing, ++software distributed under the License is distributed on an ++"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY ++KIND, either express or implied. See the License for the ++specific language governing permissions and limitations ++under the License. ++*/ ++ ++package electron ++ ++import ( ++ "fmt" ++) ++ ++// assert panics if condition is false with optional formatted message ++func assert(condition bool, format ...interface{}) { ++ if !condition { ++ if len(format) > 0 { ++ panic(fmt.Errorf(format[0].(string), format[1:]...)) ++ } else { ++ panic(fmt.Errorf("assertion failed")) ++ } ++ } ++} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9e788b2d/electron/handler.go ---------------------------------------------------------------------- diff --cc electron/handler.go index b518e42,0000000..0237156 mode 100644,000000..100644 --- a/electron/handler.go +++ b/electron/handler.go @@@ -1,158 -1,0 +1,187 @@@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. 
See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ + +package electron + +import ( + "qpid.apache.org/amqp" + "qpid.apache.org/proton" +) + +// NOTE: methods in this file are called only in the proton goroutine unless otherwise indicated. + +type handler struct { + delegator *proton.MessagingAdapter + connection *connection + links map[proton.Link]Link - sentMessages map[proton.Delivery]*sentMessage ++ sentMessages map[proton.Delivery]sentMessage + sessions map[proton.Session]*session +} + +func newHandler(c *connection) *handler { + h := &handler{ + connection: c, + links: make(map[proton.Link]Link), - sentMessages: make(map[proton.Delivery]*sentMessage), ++ sentMessages: make(map[proton.Delivery]sentMessage), + sessions: make(map[proton.Session]*session), + } + h.delegator = proton.NewMessagingAdapter(h) + // Disable auto features of MessagingAdapter, we do these ourselves. 
+ h.delegator.Prefetch = 0 + h.delegator.AutoAccept = false + h.delegator.AutoSettle = false + h.delegator.AutoOpen = false + return h +} + - func (h *handler) internalError(fmt string, arg ...interface{}) { - proton.CloseError(h.connection.eConnection, amqp.Errorf(amqp.InternalError, fmt, arg...)) ++func (h *handler) linkError(l proton.Link, msg string) { ++ proton.CloseError(l, amqp.Errorf(amqp.InternalError, "%s for %s %s", msg, l.Type(), l)) +} + +func (h *handler) HandleMessagingEvent(t proton.MessagingEvent, e proton.Event) { + switch t { + + case proton.MMessage: + if r, ok := h.links[e.Link()].(*receiver); ok { + r.message(e.Delivery()) + } else { - h.internalError("no receiver for link %s", e.Link()) ++ h.linkError(e.Link(), "no receiver") + } + + case proton.MSettled: - if sm := h.sentMessages[e.Delivery()]; sm != nil { - sm.settled(nil) ++ if sm, ok := h.sentMessages[e.Delivery()]; ok { ++ d := e.Delivery().Remote() ++ sm.ack <- Outcome{sentStatus(d.Type()), d.Condition().Error(), sm.value} ++ delete(h.sentMessages, e.Delivery()) + } + + case proton.MSendable: + if s, ok := h.links[e.Link()].(*sender); ok { + s.sendable() + } else { - h.internalError("no receiver for link %s", e.Link()) ++ h.linkError(e.Link(), "no sender") + } + + case proton.MSessionOpening: + if e.Session().State().LocalUninit() { // Remotely opened - incoming := &IncomingSession{h: h, pSession: e.Session()} - h.connection.accept(incoming) - if err := incoming.error(); err != nil { - proton.CloseError(e.Session(), err) - } ++ h.incoming(newIncomingSession(h, e.Session())) + } + + case proton.MSessionClosed: - err := proton.EndpointError(e.Session()) - for l, _ := range h.links { - if l.Session() == e.Session() { - h.linkClosed(l, err) - } - } - delete(h.sessions, e.Session()) ++ h.sessionClosed(e.Session(), proton.EndpointError(e.Session())) + + case proton.MLinkOpening: + l := e.Link() + if l.State().LocalActive() { // Already opened locally. 
+ break + } + ss := h.sessions[l.Session()] + if ss == nil { - h.internalError("no session for link %s", e.Link()) ++ h.linkError(e.Link(), "no session") + break + } - var incoming Incoming + if l.IsReceiver() { - incoming = &IncomingReceiver{makeIncomingLink(ss, l)} ++ h.incoming(&IncomingReceiver{makeIncomingLink(ss, l)}) + } else { - incoming = &IncomingSender{makeIncomingLink(ss, l)} - } - h.connection.accept(incoming) - if err := incoming.error(); err != nil { - proton.CloseError(l, err) - break ++ h.incoming(&IncomingSender{makeIncomingLink(ss, l)}) + } + + case proton.MLinkClosing: + e.Link().Close() + + case proton.MLinkClosed: + h.linkClosed(e.Link(), proton.EndpointError(e.Link())) + + case proton.MConnectionClosing: + h.connection.err.Set(e.Connection().RemoteCondition().Error()) + + case proton.MConnectionClosed: - h.connection.err.Set(Closed) // If no error already set, this is an orderly close. ++ h.connectionClosed(proton.EndpointError(e.Connection())) + + case proton.MDisconnected: + h.connection.err.Set(e.Transport().Condition().Error()) + // If err not set at this point (e.g. to Closed) then this is unexpected. 
+ h.connection.err.Set(amqp.Errorf(amqp.IllegalState, "unexpected disconnect on %s", h.connection)) + + err := h.connection.Error() ++ + for l, _ := range h.links { + h.linkClosed(l, err) + } ++ h.links = nil + for _, s := range h.sessions { + s.closed(err) + } ++ h.sessions = nil + for _, sm := range h.sentMessages { - sm.settled(err) ++ sm.ack <- Outcome{Unacknowledged, err, sm.value} + } ++ h.sentMessages = nil ++ } ++} ++ ++func (h *handler) incoming(in Incoming) { ++ var err error ++ if h.connection.incoming != nil { ++ h.connection.incoming <- in ++ err = in.wait() ++ } else { ++ err = amqp.Errorf(amqp.NotAllowed, "rejected incoming %s %s", ++ in.pEndpoint().Type(), in.pEndpoint().String()) ++ } ++ if err == nil { ++ in.pEndpoint().Open() ++ } else { ++ proton.CloseError(in.pEndpoint(), err) + } +} + ++func (h *handler) addLink(pl proton.Link, el Link) { ++ h.links[pl] = el ++} ++ +func (h *handler) linkClosed(l proton.Link, err error) { - if link := h.links[l]; link != nil { ++ if link, ok := h.links[l]; ok { + link.closed(err) + delete(h.links, l) + } +} + - func (h *handler) addLink(rl proton.Link, ll Link) { - h.links[rl] = ll ++func (h *handler) sessionClosed(ps proton.Session, err error) { ++ if s, ok := h.sessions[ps]; ok { ++ delete(h.sessions, ps) ++ err = s.closed(err) ++ for l, _ := range h.links { ++ if l.Session() == ps { ++ h.linkClosed(l, err) ++ } ++ } ++ } ++} ++ ++func (h *handler) connectionClosed(err error) { ++ err = h.connection.closed(err) ++ // Close links first to avoid repeated scans of the link list by sessions. ++ for l, _ := range h.links { ++ h.linkClosed(l, err) ++ } ++ for s, _ := range h.sessions { ++ h.sessionClosed(s, err) ++ } +} --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
