PROTON-1356: go: restore compatibility with proton-c from 0.10 Remove use of PN_INVALID (not available in proton 0.10) Link with libqpid-proton not libqpid-proton-core (not available until 0.16) Catch up with master branch.
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/0889192a Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/0889192a Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/0889192a Branch: refs/heads/go1 Commit: 0889192a326c6a22973f8eb4fd07353d94bbe97f Parents: d5c2470 a34c93b Author: Alan Conway <[email protected]> Authored: Fri Nov 25 15:55:52 2016 -0500 Committer: Alan Conway <[email protected]> Committed: Fri Nov 25 15:56:45 2016 -0500 ---------------------------------------------------------------------- amqp/types.go | 4 +--- amqp/unmarshal.go | 10 +++------- 2 files changed, 4 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/0889192a/amqp/types.go ---------------------------------------------------------------------- diff --cc amqp/types.go index bc0859a,0000000..2852c23 mode 100644,000000..100644 --- a/amqp/types.go +++ b/amqp/types.go @@@ -1,196 -1,0 +1,194 @@@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ + +package amqp + +// #include <proton/codec.h> +import "C" + +import ( + "bytes" + "fmt" + "reflect" + "time" + "unsafe" +) + +func (t C.pn_type_t) String() string { + switch C.pn_type_t(t) { + case C.PN_NULL: + return "null" + case C.PN_BOOL: + return "bool" + case C.PN_UBYTE: + return "ubyte" + case C.PN_BYTE: + return "byte" + case C.PN_USHORT: + return "ushort" + case C.PN_SHORT: + return "short" + case C.PN_CHAR: + return "char" + case C.PN_UINT: + return "uint" + case C.PN_INT: + return "int" + case C.PN_ULONG: + return "ulong" + case C.PN_LONG: + return "long" + case C.PN_TIMESTAMP: + return "timestamp" + case C.PN_FLOAT: + return "float" + case C.PN_DOUBLE: + return "double" + case C.PN_DECIMAL32: + return "decimal32" + case C.PN_DECIMAL64: + return "decimal64" + case C.PN_DECIMAL128: + return "decimal128" + case C.PN_UUID: + return "uuid" + case C.PN_BINARY: + return "binary" + case C.PN_STRING: + return "string" + case C.PN_SYMBOL: + return "symbol" + case C.PN_DESCRIBED: + return "described" + case C.PN_ARRAY: + return "array" + case C.PN_LIST: + return "list" + case C.PN_MAP: + return "map" - case C.PN_INVALID: - return "no-data" + default: - return fmt.Sprintf("unknown-type(%d)", t) ++ return "no-data" + } +} + +// Go types +var ( + bytesType = reflect.TypeOf([]byte{}) + valueType = reflect.TypeOf(reflect.Value{}) +) + +// TODO aconway 2015-04-08: can't handle AMQP maps with key types that are not valid Go map keys. + +// Map is a generic map that can have mixed key and value types and so can represent any AMQP map +type Map map[interface{}]interface{} + +// List is a generic list that can hold mixed values and can represent any AMQP list. +// +type List []interface{} + +// Symbol is a string that is encoded as an AMQP symbol +type Symbol string + +func (s Symbol) GoString() string { return fmt.Sprintf("s\"%s\"", s) } + +// Binary is a string that is encoded as an AMQP binary. +// It is a string rather than a byte[] because byte[] is not hashable and can't be used as +// a map key, AMQP frequently uses binary types as map keys. It can convert to and from []byte +type Binary string + +func (b Binary) GoString() string { return fmt.Sprintf("b\"%s\"", b) } + +// GoString for Map prints values with their types, useful for debugging. +func (m Map) GoString() string { + out := &bytes.Buffer{} + fmt.Fprintf(out, "%T{", m) + i := len(m) + for k, v := range m { + fmt.Fprintf(out, "%T(%#v): %T(%#v)", k, k, v, v) + i-- + if i > 0 { + fmt.Fprint(out, ", ") + } + } + fmt.Fprint(out, "}") + return out.String() +} + +// GoString for List prints values with their types, useful for debugging. +func (l List) GoString() string { + out := &bytes.Buffer{} + fmt.Fprintf(out, "%T{", l) + for i := 0; i < len(l); i++ { + fmt.Fprintf(out, "%T(%#v)", l[i], l[i]) + if i == len(l)-1 { + fmt.Fprint(out, ", ") + } + } + fmt.Fprint(out, "}") + return out.String() +} + +// pnTime converts Go time.Time to Proton millisecond Unix time. +func pnTime(t time.Time) C.pn_timestamp_t { + secs := t.Unix() + // Note: sub-second accuracy is not guaraunteed if the Unix time in + // nanoseconds cannot be represented by an int64 (sometime around year 2260) + msecs := (t.UnixNano() % int64(time.Second)) / int64(time.Millisecond) + return C.pn_timestamp_t(secs*1000 + msecs) +} + +// goTime converts a pn_timestamp_t to a Go time.Time. +func goTime(t C.pn_timestamp_t) time.Time { + secs := int64(t) / 1000 + nsecs := (int64(t) % 1000) * int64(time.Millisecond) + return time.Unix(secs, nsecs) +} + +func goBytes(cBytes C.pn_bytes_t) (bytes []byte) { + if cBytes.start != nil { + bytes = C.GoBytes(unsafe.Pointer(cBytes.start), C.int(cBytes.size)) + } + return +} + +func goString(cBytes C.pn_bytes_t) (str string) { + if cBytes.start != nil { + str = C.GoStringN(cBytes.start, C.int(cBytes.size)) + } + return +} + +func pnBytes(b []byte) C.pn_bytes_t { + if len(b) == 0 { + return C.pn_bytes_t{0, nil} + } else { + return C.pn_bytes_t{C.size_t(len(b)), (*C.char)(unsafe.Pointer(&b[0]))} + } +} + +func cPtr(b []byte) *C.char { + if len(b) == 0 { + return nil + } + return (*C.char)(unsafe.Pointer(&b[0])) +} + +func cLen(b []byte) C.size_t { + return C.size_t(len(b)) +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/0889192a/amqp/unmarshal.go ---------------------------------------------------------------------- diff --cc amqp/unmarshal.go index 95d4343,0000000..8f380a7 mode 100644,000000..100644 --- a/amqp/unmarshal.go +++ b/amqp/unmarshal.go @@@ -1,561 -1,0 +1,557 @@@ +/* +Licensed to the Apache Software Foundation (ASF) under one +oor more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ + +package amqp + +// #include <proton/codec.h> +import "C" + +import ( + "bytes" + "fmt" + "io" + "reflect" + "unsafe" +) + +const minDecode = 1024 + +// Error returned if AMQP data cannot be unmarshaled as the desired Go type. +type UnmarshalError struct { + // The name of the AMQP type. + AMQPType string + // The Go type. + GoType reflect.Type +} + +func newUnmarshalError(pnType C.pn_type_t, v interface{}) *UnmarshalError { + return &UnmarshalError{C.pn_type_t(pnType).String(), reflect.TypeOf(v)} +} + +func (e UnmarshalError) Error() string { + if e.GoType.Kind() != reflect.Ptr { + return fmt.Sprintf("cannot unmarshal to type %s, not a pointer", e.GoType) + } else { + return fmt.Sprintf("cannot unmarshal AMQP %s to %s", e.AMQPType, e.GoType) + } +} + +func doRecover(err *error) { + r := recover() + switch r := r.(type) { + case nil: + case *UnmarshalError: + *err = r + default: + panic(r) + } +} + +// +// Decoding from a pn_data_t +// +// NOTE: we use panic() to signal a decoding error, simplifies decoding logic. +// We recover() at the highest possible level - i.e. in the exported Unmarshal or Decode. +// + +// Decoder decodes AMQP values from an io.Reader. +// +type Decoder struct { + reader io.Reader + buffer bytes.Buffer +} + +// NewDecoder returns a new decoder that reads from r. +// +// The decoder has it's own buffer and may read more data than required for the +// AMQP values requested. Use Buffered to see if there is data left in the +// buffer. +// +func NewDecoder(r io.Reader) *Decoder { + return &Decoder{r, bytes.Buffer{}} +} + +// Buffered returns a reader of the data remaining in the Decoder's buffer. The +// reader is valid until the next call to Decode. +// +func (d *Decoder) Buffered() io.Reader { + return bytes.NewReader(d.buffer.Bytes()) +} + +// Decode reads the next AMQP value from the Reader and stores it in the value pointed to by v. +// +// See the documentation for Unmarshal for details about the conversion of AMQP into a Go value. +// +func (d *Decoder) Decode(v interface{}) (err error) { + defer doRecover(&err) + data := C.pn_data(0) + defer C.pn_data_free(data) + var n int + for n == 0 { + n, err = decode(data, d.buffer.Bytes()) + if err != nil { + return err + } + if n == 0 { // n == 0 means not enough data, read more + err = d.more() + } else { + unmarshal(v, data) + } + } + d.buffer.Next(n) + return +} + +/* +Unmarshal decodes AMQP-encoded bytes and stores the result in the value pointed to by v. +Types are converted as follows: + + +---------------------------+----------------------------------------------------------------------+ + |To Go types |From AMQP types | + +===========================+======================================================================+ + |bool |bool | + +---------------------------+----------------------------------------------------------------------+ + |int, int8, int16, |Equivalent or smaller signed integer type: byte, short, int, long. | + |int32, int64 | | + +---------------------------+----------------------------------------------------------------------+ + |uint, uint8, uint16, |Equivalent or smaller unsigned integer type: ubyte, ushort, uint, | + |uint32, uint64 types |ulong | + +---------------------------+----------------------------------------------------------------------+ + |float32, float64 |Equivalent or smaller float or double. | + +---------------------------+----------------------------------------------------------------------+ + |string, []byte |string, symbol or binary. | + +---------------------------+----------------------------------------------------------------------+ + |Symbol |symbol | + +---------------------------+----------------------------------------------------------------------+ + |map[K]T |map, provided all keys and values can unmarshal to types K, T | + +---------------------------+----------------------------------------------------------------------+ + |Map |map, any AMQP map | + +---------------------------+----------------------------------------------------------------------+ + |interface{} |Any AMQP value can be unmarshaled to an interface{} as follows: | + | +------------------------+---------------------------------------------+ + | |AMQP Type |Go Type in interface{} | + | +========================+=============================================+ + | |bool |bool | + | +------------------------+---------------------------------------------+ + | |byte,short,int,long |int8,int16,int32,int64 | + | +------------------------+---------------------------------------------+ + | |ubyte,ushort,uint,ulong |uint8,uint16,uint32,uint64 | + | +------------------------+---------------------------------------------+ + | |float, double |float32, float64 | + | +------------------------+---------------------------------------------+ + | |string |string | + | +------------------------+---------------------------------------------+ + | |symbol |Symbol | + | +------------------------+---------------------------------------------+ + | |binary |Binary | + | +------------------------+---------------------------------------------+ + | |nulll |nil | + | +------------------------+---------------------------------------------+ + | |map |Map | + | +------------------------+---------------------------------------------+ + | |list |List | + +---------------------------+------------------------+---------------------------------------------+ + +The following Go types cannot be unmarshaled: uintptr, function, interface, channel. + +TODO + +Go types: array, struct. + +AMQP types: decimal32/64/128, char (round trip), timestamp, uuid, array, multi-section message bodies. + +AMQP maps with mixed/unhashable key types need an alternate representation. + +Described types. +*/ +func Unmarshal(bytes []byte, v interface{}) (n int, err error) { + defer doRecover(&err) + + data := C.pn_data(0) + defer C.pn_data_free(data) + n, err = decode(data, bytes) + if err != nil { + return 0, err + } + if n == 0 { + return 0, fmt.Errorf("not enough data") + } else { + unmarshal(v, data) + } + return n, nil +} + +// more reads more data when we can't parse a complete AMQP type +func (d *Decoder) more() error { + var readSize int64 = minDecode + if int64(d.buffer.Len()) > readSize { // Grow by doubling + readSize = int64(d.buffer.Len()) + } + var n int64 + n, err := d.buffer.ReadFrom(io.LimitReader(d.reader, readSize)) + if n == 0 && err == nil { // ReadFrom won't report io.EOF, just returns 0 + err = io.EOF + } + return err +} + +// Unmarshal from data into value pointed at by v. +func unmarshal(v interface{}, data *C.pn_data_t) { + pnType := C.pn_data_type(data) + switch v := v.(type) { + case *bool: + switch pnType { + case C.PN_BOOL: + *v = bool(C.pn_data_get_bool(data)) + default: + panic(newUnmarshalError(pnType, v)) + } + case *int8: + switch pnType { + case C.PN_CHAR: + *v = int8(C.pn_data_get_char(data)) + case C.PN_BYTE: + *v = int8(C.pn_data_get_byte(data)) + default: + panic(newUnmarshalError(pnType, v)) + } + case *uint8: + switch pnType { + case C.PN_CHAR: + *v = uint8(C.pn_data_get_char(data)) + case C.PN_UBYTE: + *v = uint8(C.pn_data_get_ubyte(data)) + default: + panic(newUnmarshalError(pnType, v)) + } + case *int16: + switch pnType { + case C.PN_CHAR: + *v = int16(C.pn_data_get_char(data)) + case C.PN_BYTE: + *v = int16(C.pn_data_get_byte(data)) + case C.PN_SHORT: + *v = int16(C.pn_data_get_short(data)) + default: + panic(newUnmarshalError(pnType, v)) + } + case *uint16: + switch pnType { + case C.PN_CHAR: + *v = uint16(C.pn_data_get_char(data)) + case C.PN_UBYTE: + *v = uint16(C.pn_data_get_ubyte(data)) + case C.PN_USHORT: + *v = uint16(C.pn_data_get_ushort(data)) + default: + panic(newUnmarshalError(pnType, v)) + } + case *int32: + switch pnType { + case C.PN_CHAR: + *v = int32(C.pn_data_get_char(data)) + case C.PN_BYTE: + *v = int32(C.pn_data_get_byte(data)) + case C.PN_SHORT: + *v = int32(C.pn_data_get_short(data)) + case C.PN_INT: + *v = int32(C.pn_data_get_int(data)) + default: + panic(newUnmarshalError(pnType, v)) + } + case *uint32: + switch pnType { + case C.PN_CHAR: + *v = uint32(C.pn_data_get_char(data)) + case C.PN_UBYTE: + *v = uint32(C.pn_data_get_ubyte(data)) + case C.PN_USHORT: + *v = uint32(C.pn_data_get_ushort(data)) + case C.PN_UINT: + *v = uint32(C.pn_data_get_uint(data)) + default: + panic(newUnmarshalError(pnType, v)) + } + + case *int64: + switch pnType { + case C.PN_CHAR: + *v = int64(C.pn_data_get_char(data)) + case C.PN_BYTE: + *v = int64(C.pn_data_get_byte(data)) + case C.PN_SHORT: + *v = int64(C.pn_data_get_short(data)) + case C.PN_INT: + *v = int64(C.pn_data_get_int(data)) + case C.PN_LONG: + *v = int64(C.pn_data_get_long(data)) + default: + panic(newUnmarshalError(pnType, v)) + } + + case *uint64: + switch pnType { + case C.PN_CHAR: + *v = uint64(C.pn_data_get_char(data)) + case C.PN_UBYTE: + *v = uint64(C.pn_data_get_ubyte(data)) + case C.PN_USHORT: + *v = uint64(C.pn_data_get_ushort(data)) + case C.PN_ULONG: + *v = uint64(C.pn_data_get_ulong(data)) + default: + panic(newUnmarshalError(pnType, v)) + } + + case *int: + switch pnType { + case C.PN_CHAR: + *v = int(C.pn_data_get_char(data)) + case C.PN_BYTE: + *v = int(C.pn_data_get_byte(data)) + case C.PN_SHORT: + *v = int(C.pn_data_get_short(data)) + case C.PN_INT: + *v = int(C.pn_data_get_int(data)) + case C.PN_LONG: + if unsafe.Sizeof(0) == 8 { + *v = int(C.pn_data_get_long(data)) + } else { + panic(newUnmarshalError(pnType, v)) + } + default: + panic(newUnmarshalError(pnType, v)) + } + + case *uint: + switch pnType { + case C.PN_CHAR: + *v = uint(C.pn_data_get_char(data)) + case C.PN_UBYTE: + *v = uint(C.pn_data_get_ubyte(data)) + case C.PN_USHORT: + *v = uint(C.pn_data_get_ushort(data)) + case C.PN_UINT: + *v = uint(C.pn_data_get_uint(data)) + case C.PN_ULONG: + if unsafe.Sizeof(0) == 8 { + *v = uint(C.pn_data_get_ulong(data)) + } else { + panic(newUnmarshalError(pnType, v)) + } + default: + panic(newUnmarshalError(pnType, v)) + } + + case *float32: + switch pnType { + case C.PN_FLOAT: + *v = float32(C.pn_data_get_float(data)) + default: + panic(newUnmarshalError(pnType, v)) + } + + case *float64: + switch pnType { + case C.PN_FLOAT: + *v = float64(C.pn_data_get_float(data)) + case C.PN_DOUBLE: + *v = float64(C.pn_data_get_double(data)) + default: + panic(newUnmarshalError(pnType, v)) + } + + case *string: + switch pnType { + case C.PN_STRING: + *v = goString(C.pn_data_get_string(data)) + case C.PN_SYMBOL: + *v = goString(C.pn_data_get_symbol(data)) + case C.PN_BINARY: + *v = goString(C.pn_data_get_binary(data)) + default: + panic(newUnmarshalError(pnType, v)) + } + + case *[]byte: + switch pnType { + case C.PN_STRING: + *v = goBytes(C.pn_data_get_string(data)) + case C.PN_SYMBOL: + *v = goBytes(C.pn_data_get_symbol(data)) + case C.PN_BINARY: + *v = goBytes(C.pn_data_get_binary(data)) + default: + panic(newUnmarshalError(pnType, v)) + } + + case *Binary: + switch pnType { + case C.PN_BINARY: + *v = Binary(goBytes(C.pn_data_get_binary(data))) + default: + panic(newUnmarshalError(pnType, v)) + } + + case *Symbol: + switch pnType { + case C.PN_SYMBOL: + *v = Symbol(goBytes(C.pn_data_get_symbol(data))) + default: + panic(newUnmarshalError(pnType, v)) + } + + case *interface{}: + getInterface(data, v) + + default: + if reflect.TypeOf(v).Kind() != reflect.Ptr { + panic(newUnmarshalError(pnType, v)) + } + switch reflect.TypeOf(v).Elem().Kind() { + case reflect.Map: + getMap(data, v) + case reflect.Slice: + getList(data, v) + default: + panic(newUnmarshalError(pnType, v)) + } + } + err := dataError("unmarshaling", data) + if err != nil { + panic(err) + } + return +} + +func rewindUnmarshal(v interface{}, data *C.pn_data_t) { + C.pn_data_rewind(data) + C.pn_data_next(data) + unmarshal(v, data) +} + +// Getting into an interface is driven completely by the AMQP type, since the interface{} +// target is type-neutral. +func getInterface(data *C.pn_data_t, v *interface{}) { + pnType := C.pn_data_type(data) + switch pnType { - case C.PN_NULL, C.PN_INVALID: // No data. - *v = nil + case C.PN_BOOL: + *v = bool(C.pn_data_get_bool(data)) + case C.PN_UBYTE: + *v = uint8(C.pn_data_get_ubyte(data)) + case C.PN_BYTE: + *v = int8(C.pn_data_get_byte(data)) + case C.PN_USHORT: + *v = uint16(C.pn_data_get_ushort(data)) + case C.PN_SHORT: + *v = int16(C.pn_data_get_short(data)) + case C.PN_UINT: + *v = uint32(C.pn_data_get_uint(data)) + case C.PN_INT: + *v = int32(C.pn_data_get_int(data)) + case C.PN_CHAR: + *v = uint8(C.pn_data_get_char(data)) + case C.PN_ULONG: + *v = uint64(C.pn_data_get_ulong(data)) + case C.PN_LONG: + *v = int64(C.pn_data_get_long(data)) + case C.PN_FLOAT: + *v = float32(C.pn_data_get_float(data)) + case C.PN_DOUBLE: + *v = float64(C.pn_data_get_double(data)) + case C.PN_BINARY: + *v = Binary(goBytes(C.pn_data_get_binary(data))) + case C.PN_STRING: + *v = goString(C.pn_data_get_string(data)) + case C.PN_SYMBOL: + *v = Symbol(goString(C.pn_data_get_symbol(data))) + case C.PN_MAP: + m := make(Map) + unmarshal(&m, data) + *v = m + case C.PN_LIST: + l := make(List, 0) + unmarshal(&l, data) + *v = l - default: - panic(newUnmarshalError(pnType, v)) ++ default: // No data (-1 or NULL) ++ *v = nil + } +} + +// get into map pointed at by v +func getMap(data *C.pn_data_t, v interface{}) { + mapValue := reflect.ValueOf(v).Elem() + mapValue.Set(reflect.MakeMap(mapValue.Type())) // Clear the map + switch pnType := C.pn_data_type(data); pnType { + case C.PN_MAP: + count := int(C.pn_data_get_map(data)) + if bool(C.pn_data_enter(data)) { + defer C.pn_data_exit(data) + for i := 0; i < count/2; i++ { + if bool(C.pn_data_next(data)) { + key := reflect.New(mapValue.Type().Key()) + unmarshal(key.Interface(), data) + if bool(C.pn_data_next(data)) { + val := reflect.New(mapValue.Type().Elem()) + unmarshal(val.Interface(), data) + mapValue.SetMapIndex(key.Elem(), val.Elem()) + } + } + } + } - case C.PN_INVALID: // Leave the map empty - default: - panic(newUnmarshalError(pnType, v)) ++ default: // Empty/error/unknown, leave map empty + } +} + +func getList(data *C.pn_data_t, v interface{}) { + pnType := C.pn_data_type(data) + if pnType != C.PN_LIST { + panic(newUnmarshalError(pnType, v)) + } + count := int(C.pn_data_get_list(data)) + listValue := reflect.MakeSlice(reflect.TypeOf(v).Elem(), count, count) + if bool(C.pn_data_enter(data)) { + for i := 0; i < count; i++ { + if bool(C.pn_data_next(data)) { + val := reflect.New(listValue.Type().Elem()) + unmarshal(val.Interface(), data) + listValue.Index(i).Set(val.Elem()) + } + } + C.pn_data_exit(data) + } + reflect.ValueOf(v).Elem().Set(listValue) +} + +// decode from bytes. +// Return bytes decoded or 0 if we could not decode a complete object. +// +func decode(data *C.pn_data_t, bytes []byte) (int, error) { + if len(bytes) == 0 { + return 0, nil + } + n := int(C.pn_data_decode(data, cPtr(bytes), cLen(bytes))) + if n == int(C.PN_UNDERFLOW) { + C.pn_error_clear(C.pn_data_error(data)) + return 0, nil + } else if n <= 0 { + return 0, fmt.Errorf("unmarshal %s", PnErrorCode(n)) + } + return n, nil +} --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
