http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/bd3fb337/proton-c/bindings/go/src/qpid.apache.org/proton/go/amqp/types.go ---------------------------------------------------------------------- diff --git a/proton-c/bindings/go/src/qpid.apache.org/proton/go/amqp/types.go b/proton-c/bindings/go/src/qpid.apache.org/proton/go/amqp/types.go deleted file mode 100644 index 8713520..0000000 --- a/proton-c/bindings/go/src/qpid.apache.org/proton/go/amqp/types.go +++ /dev/null @@ -1,193 +0,0 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. 
-*/ - -package amqp - -// #include <proton/codec.h> -// const pn_type_t PN_DATA_TYPE_ERROR = (pn_type_t) -1; -import "C" - -import ( - "bytes" - "fmt" - "reflect" - "time" - "unsafe" -) - -func pnTypeString(pt C.pn_type_t) string { - switch pt { - case C.PN_NULL: - return "null" - case C.PN_BOOL: - return "bool" - case C.PN_UBYTE: - return "ubyte" - case C.PN_BYTE: - return "byte" - case C.PN_USHORT: - return "ushort" - case C.PN_SHORT: - return "short" - case C.PN_CHAR: - return "char" - case C.PN_UINT: - return "uint" - case C.PN_INT: - return "int" - case C.PN_ULONG: - return "ulong" - case C.PN_LONG: - return "long" - case C.PN_TIMESTAMP: - return "timestamp" - case C.PN_FLOAT: - return "float" - case C.PN_DOUBLE: - return "double" - case C.PN_DECIMAL32: - return "decimal32" - case C.PN_DECIMAL64: - return "decimal64" - case C.PN_DECIMAL128: - return "decimal128" - case C.PN_UUID: - return "uuid" - case C.PN_BINARY: - return "binary" - case C.PN_STRING: - return "string" - case C.PN_SYMBOL: - return "symbol" - case C.PN_DESCRIBED: - return "described" - case C.PN_ARRAY: - return "array" - case C.PN_LIST: - return "list" - case C.PN_MAP: - return "map" - case C.PN_DATA_TYPE_ERROR: - return "no-data" - default: - return fmt.Sprintf("unknown-type(%d)", pt) - } -} - -// Go types -var ( - bytesType = reflect.TypeOf([]byte{}) - valueType = reflect.TypeOf(reflect.Value{}) -) - -// FIXME aconway 2015-04-08: can't handle AMQP maps with key types that are not valid Go map keys. - -// Map is a generic map that can have mixed key and value types and so can represent any AMQP map -type Map map[interface{}]interface{} - -// List is a generic list that can hold mixed values and can represent any AMQP list. -// -type List []interface{} - -// Symbol is a string that is encoded as an AMQP symbol -type Symbol string - -// Binary is a string that is encoded as an AMQP binary. 
-// It is a string rather than a byte[] because byte[] is not hashable and can't be used as -// a map key, AMQP frequently uses binary types as map keys. It can convert to and from []byte -type Binary string - -// GoString for Map prints values with their types, useful for debugging. -func (m Map) GoString() string { - out := &bytes.Buffer{} - fmt.Fprintf(out, "%T{", m) - i := len(m) - for k, v := range m { - fmt.Fprintf(out, "%T(%#v): %T(%#v)", k, k, v, v) - i-- - if i > 0 { - fmt.Fprint(out, ", ") - } - } - fmt.Fprint(out, "}") - return out.String() -} - -// GoString for List prints values with their types, useful for debugging. -func (l List) GoString() string { - out := &bytes.Buffer{} - fmt.Fprintf(out, "%T{", l) - for i := 0; i < len(l); i++ { - fmt.Fprintf(out, "%T(%#v)", l[i], l[i]) - if i == len(l)-1 { - fmt.Fprint(out, ", ") - } - } - fmt.Fprint(out, "}") - return out.String() -} - -// pnTime converts Go time.Time to Proton millisecond Unix time. -func pnTime(t time.Time) C.pn_timestamp_t { - secs := t.Unix() - // Note: sub-second accuracy is not guaraunteed if the Unix time in - // nanoseconds cannot be represented by an int64 (sometime around year 2260) - msecs := (t.UnixNano() % int64(time.Second)) / int64(time.Millisecond) - return C.pn_timestamp_t(secs*1000 + msecs) -} - -// goTime converts a pn_timestamp_t to a Go time.Time. 
-func goTime(t C.pn_timestamp_t) time.Time { - secs := int64(t) / 1000 - nsecs := (int64(t) % 1000) * int64(time.Millisecond) - return time.Unix(secs, nsecs) -} - -func goBytes(cBytes C.pn_bytes_t) (bytes []byte) { - if cBytes.start != nil { - bytes = C.GoBytes(unsafe.Pointer(cBytes.start), C.int(cBytes.size)) - } - return -} - -func goString(cBytes C.pn_bytes_t) (str string) { - if cBytes.start != nil { - str = C.GoStringN(cBytes.start, C.int(cBytes.size)) - } - return -} - -func pnBytes(b []byte) C.pn_bytes_t { - if len(b) == 0 { - return C.pn_bytes_t{0, nil} - } else { - return C.pn_bytes_t{C.size_t(len(b)), (*C.char)(unsafe.Pointer(&b[0]))} - } -} - -func cPtr(b []byte) *C.char { - if len(b) == 0 { - return nil - } - return (*C.char)(unsafe.Pointer(&b[0])) -} - -func cLen(b []byte) C.size_t { - return C.size_t(len(b)) -}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/bd3fb337/proton-c/bindings/go/src/qpid.apache.org/proton/go/amqp/uid.go ---------------------------------------------------------------------- diff --git a/proton-c/bindings/go/src/qpid.apache.org/proton/go/amqp/uid.go b/proton-c/bindings/go/src/qpid.apache.org/proton/go/amqp/uid.go deleted file mode 100644 index 944bf6f..0000000 --- a/proton-c/bindings/go/src/qpid.apache.org/proton/go/amqp/uid.go +++ /dev/null @@ -1,40 +0,0 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. -*/ - -// Generating unique IDs for various things. - -package amqp - -import ( - "strconv" - "sync/atomic" -) - -// A simple atomic counter to generate unique 64 bit IDs. -type UidCounter struct{ count uint64 } - -// NextInt gets the next uint64 value from the atomic counter. -func (uc *UidCounter) NextInt() uint64 { - return atomic.AddUint64(&uc.count, 1) -} - -// Next gets the next integer value encoded as a base32 string, safe for NUL terminated C strings. 
-func (uc *UidCounter) Next() string { - return strconv.FormatUint(uc.NextInt(), 32) -} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/bd3fb337/proton-c/bindings/go/src/qpid.apache.org/proton/go/amqp/unmarshal.go ---------------------------------------------------------------------- diff --git a/proton-c/bindings/go/src/qpid.apache.org/proton/go/amqp/unmarshal.go b/proton-c/bindings/go/src/qpid.apache.org/proton/go/amqp/unmarshal.go deleted file mode 100644 index 89ab64a..0000000 --- a/proton-c/bindings/go/src/qpid.apache.org/proton/go/amqp/unmarshal.go +++ /dev/null @@ -1,552 +0,0 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one -oor more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. -*/ - -package amqp - -// #include <proton/codec.h> -import "C" - -import ( - "bytes" - "fmt" - "io" - "qpid.apache.org/proton/go/internal" - "reflect" - "unsafe" -) - -const minDecode = 1024 - -// Error returned if AMQP data cannot be unmarshaled as the desired Go type. -type BadUnmarshal struct { - // The name of the AMQP type. - AMQPType string - // The Go type. 
- GoType reflect.Type -} - -func newBadUnmarshal(pnType C.pn_type_t, v interface{}) *BadUnmarshal { - return &BadUnmarshal{pnTypeString(pnType), reflect.TypeOf(v)} -} - -func (e BadUnmarshal) Error() string { - if e.GoType.Kind() != reflect.Ptr { - return fmt.Sprintf("proton: cannot unmarshal to type %s, not a pointer", e.GoType) - } else { - return fmt.Sprintf("proton: cannot unmarshal AMQP %s to %s", e.AMQPType, e.GoType) - } -} - -// -// Decoding from a pn_data_t -// -// NOTE: we use panic() to signal a decoding error, simplifies decoding logic. -// We recover() at the highest possible level - i.e. in the exported Unmarshal or Decode. -// - -// Decoder decodes AMQP values from an io.Reader. -// -type Decoder struct { - reader io.Reader - buffer bytes.Buffer -} - -// NewDecoder returns a new decoder that reads from r. -// -// The decoder has it's own buffer and may read more data than required for the -// AMQP values requested. Use Buffered to see if there is data left in the -// buffer. -// -func NewDecoder(r io.Reader) *Decoder { - return &Decoder{r, bytes.Buffer{}} -} - -// Buffered returns a reader of the data remaining in the Decoder's buffer. The -// reader is valid until the next call to Decode. -// -func (d *Decoder) Buffered() io.Reader { - return bytes.NewReader(d.buffer.Bytes()) -} - -// Decode reads the next AMQP value from the Reader and stores it in the value pointed to by v. -// -// See the documentation for Unmarshal for details about the conversion of AMQP into a Go value. -// -func (d *Decoder) Decode(v interface{}) (err error) { - defer internal.DoRecover(&err) - data := C.pn_data(0) - defer C.pn_data_free(data) - var n int - for n == 0 && err == nil { - n = unmarshal(data, d.buffer.Bytes(), v) - if n == 0 { // n == 0 means not enough data, read more - err = d.more() - if err != nil { - return - } - } - } - d.buffer.Next(n) - return -} - -/* -Unmarshal decodes AMQP-encoded bytes and stores the result in the value pointed to by v. 
-Types are converted as follows: - - +---------------------------+----------------------------------------------------------------------+ - |To Go types |From AMQP types | - +===========================+======================================================================+ - |bool |bool | - +---------------------------+----------------------------------------------------------------------+ - |int, int8, int16, |Equivalent or smaller signed integer type: char, byte, short, int, | - |int32, int64 |long. | - +---------------------------+----------------------------------------------------------------------+ - |uint, uint8, uint16, |Equivalent or smaller unsigned integer type: char, ubyte, ushort, | - |uint32, uint64 types |uint, ulong | - +---------------------------+----------------------------------------------------------------------+ - |float32, float64 |Equivalent or smaller float or double. | - +---------------------------+----------------------------------------------------------------------+ - |string, []byte |string, symbol or binary. 
| - +---------------------------+----------------------------------------------------------------------+ - |Symbol |symbol | - +---------------------------+----------------------------------------------------------------------+ - |map[K]T |map, provided all keys and values can unmarshal to types K, T | - +---------------------------+----------------------------------------------------------------------+ - |Map |map, any AMQP map | - +---------------------------+----------------------------------------------------------------------+ - |interface{} |Any AMQP value can be unmarshaled to an interface{} as follows: | - | +------------------------+---------------------------------------------+ - | |AMQP Type |Go Type in interface{} | - | +========================+=============================================+ - | |bool |bool | - | +------------------------+---------------------------------------------+ - | |char |uint8 | - | +------------------------+---------------------------------------------+ - | |byte,short,int,long |int8,int16,int32,int64 | - | +------------------------+---------------------------------------------+ - | |ubyte,ushort,uint,ulong |uint8,uint16,uint32,uint64 | - | +------------------------+---------------------------------------------+ - | |float, double |float32, float64 | - | +------------------------+---------------------------------------------+ - | |string |string | - | +------------------------+---------------------------------------------+ - | |symbol |Symbol | - | +------------------------+---------------------------------------------+ - | |binary |Binary | - | +------------------------+---------------------------------------------+ - | |null |nil | - | +------------------------+---------------------------------------------+ - | |map |Map | - | +------------------------+---------------------------------------------+ - | |list |List | - +---------------------------+------------------------+---------------------------------------------+ - -The
following Go types cannot be unmarshaled: complex64/128, uintptr, function, interface, channel. - -TODO types - -AMQP: timestamp, decimal32/64/128, uuid, described, array. - -Go: array, struct. - -Maps: currently we cannot unmarshal AMQP maps with unhashable key types, need an alternate -representation for those. -*/ -func Unmarshal(bytes []byte, v interface{}) (n int, err error) { - defer internal.DoRecover(&err) - data := C.pn_data(0) - defer C.pn_data_free(data) - n = unmarshal(data, bytes, v) - if n == 0 { - err = internal.Errorf("not enough data") - } - return -} - -// more reads more data when we can't parse a complete AMQP type -func (d *Decoder) more() error { - var readSize int64 = minDecode - if int64(d.buffer.Len()) > readSize { // Grow by doubling - readSize = int64(d.buffer.Len()) - } - var n int64 - n, err := d.buffer.ReadFrom(io.LimitReader(d.reader, readSize)) - if n == 0 && err == nil { // ReadFrom won't report io.EOF, just returns 0 - err = io.EOF - } - return err -} - -// unmarshal decodes from bytes and converts into the value pointed to by v. -// Used by Unmarshal and Decode -// -// Returns the number of bytes decoded or 0 if not enough data. -// -func unmarshal(data *C.pn_data_t, bytes []byte, v interface{}) (n int) { - n = decode(data, bytes) - if n == 0 { - return 0 - } - get(data, v) - return -} - -// get value from data into value pointed at by v. 
-func get(data *C.pn_data_t, v interface{}) { - pnType := C.pn_data_type(data) - - switch v := v.(type) { - case *bool: - switch pnType { - case C.PN_BOOL: - *v = bool(C.pn_data_get_bool(data)) - default: - panic(newBadUnmarshal(pnType, v)) - } - case *int8: - switch pnType { - case C.PN_CHAR: - *v = int8(C.pn_data_get_char(data)) - case C.PN_BYTE: - *v = int8(C.pn_data_get_byte(data)) - default: - panic(newBadUnmarshal(pnType, v)) - } - case *uint8: - switch pnType { - case C.PN_CHAR: - *v = uint8(C.pn_data_get_char(data)) - case C.PN_UBYTE: - *v = uint8(C.pn_data_get_ubyte(data)) - default: - panic(newBadUnmarshal(pnType, v)) - } - case *int16: - switch pnType { - case C.PN_CHAR: - *v = int16(C.pn_data_get_char(data)) - case C.PN_BYTE: - *v = int16(C.pn_data_get_byte(data)) - case C.PN_SHORT: - *v = int16(C.pn_data_get_short(data)) - default: - panic(newBadUnmarshal(pnType, v)) - } - case *uint16: - switch pnType { - case C.PN_CHAR: - *v = uint16(C.pn_data_get_char(data)) - case C.PN_UBYTE: - *v = uint16(C.pn_data_get_ubyte(data)) - case C.PN_USHORT: - *v = uint16(C.pn_data_get_ushort(data)) - default: - panic(newBadUnmarshal(pnType, v)) - } - case *int32: - switch pnType { - case C.PN_CHAR: - *v = int32(C.pn_data_get_char(data)) - case C.PN_BYTE: - *v = int32(C.pn_data_get_byte(data)) - case C.PN_SHORT: - *v = int32(C.pn_data_get_short(data)) - case C.PN_INT: - *v = int32(C.pn_data_get_int(data)) - default: - panic(newBadUnmarshal(pnType, v)) - } - case *uint32: - switch pnType { - case C.PN_CHAR: - *v = uint32(C.pn_data_get_char(data)) - case C.PN_UBYTE: - *v = uint32(C.pn_data_get_ubyte(data)) - case C.PN_USHORT: - *v = uint32(C.pn_data_get_ushort(data)) - case C.PN_UINT: - *v = uint32(C.pn_data_get_uint(data)) - default: - panic(newBadUnmarshal(pnType, v)) - } - - case *int64: - switch pnType { - case C.PN_CHAR: - *v = int64(C.pn_data_get_char(data)) - case C.PN_BYTE: - *v = int64(C.pn_data_get_byte(data)) - case C.PN_SHORT: - *v = 
int64(C.pn_data_get_short(data)) - case C.PN_INT: - *v = int64(C.pn_data_get_int(data)) - case C.PN_LONG: - *v = int64(C.pn_data_get_long(data)) - default: - panic(newBadUnmarshal(pnType, v)) - } - - case *uint64: - switch pnType { - case C.PN_CHAR: - *v = uint64(C.pn_data_get_char(data)) - case C.PN_UBYTE: - *v = uint64(C.pn_data_get_ubyte(data)) - case C.PN_USHORT: - *v = uint64(C.pn_data_get_ushort(data)) - case C.PN_ULONG: - *v = uint64(C.pn_data_get_ulong(data)) - default: - panic(newBadUnmarshal(pnType, v)) - } - - case *int: - switch pnType { - case C.PN_CHAR: - *v = int(C.pn_data_get_char(data)) - case C.PN_BYTE: - *v = int(C.pn_data_get_byte(data)) - case C.PN_SHORT: - *v = int(C.pn_data_get_short(data)) - case C.PN_INT: - *v = int(C.pn_data_get_int(data)) - case C.PN_LONG: - if unsafe.Sizeof(0) == 8 { - *v = int(C.pn_data_get_long(data)) - } else { - panic(newBadUnmarshal(pnType, v)) - } - default: - panic(newBadUnmarshal(pnType, v)) - } - - case *uint: - switch pnType { - case C.PN_CHAR: - *v = uint(C.pn_data_get_char(data)) - case C.PN_UBYTE: - *v = uint(C.pn_data_get_ubyte(data)) - case C.PN_USHORT: - *v = uint(C.pn_data_get_ushort(data)) - case C.PN_UINT: - *v = uint(C.pn_data_get_uint(data)) - case C.PN_ULONG: - if unsafe.Sizeof(0) == 8 { - *v = uint(C.pn_data_get_ulong(data)) - } else { - panic(newBadUnmarshal(pnType, v)) - } - default: - panic(newBadUnmarshal(pnType, v)) - } - - case *float32: - switch pnType { - case C.PN_FLOAT: - *v = float32(C.pn_data_get_float(data)) - default: - panic(newBadUnmarshal(pnType, v)) - } - - case *float64: - switch pnType { - case C.PN_FLOAT: - *v = float64(C.pn_data_get_float(data)) - case C.PN_DOUBLE: - *v = float64(C.pn_data_get_double(data)) - default: - panic(newBadUnmarshal(pnType, v)) - } - - case *string: - switch pnType { - case C.PN_STRING: - *v = goString(C.pn_data_get_string(data)) - case C.PN_SYMBOL: - *v = goString(C.pn_data_get_symbol(data)) - case C.PN_BINARY: - *v = 
goString(C.pn_data_get_binary(data)) - default: - panic(newBadUnmarshal(pnType, v)) - } - - case *[]byte: - switch pnType { - case C.PN_STRING: - *v = goBytes(C.pn_data_get_string(data)) - case C.PN_SYMBOL: - *v = goBytes(C.pn_data_get_symbol(data)) - case C.PN_BINARY: - *v = goBytes(C.pn_data_get_binary(data)) - default: - panic(newBadUnmarshal(pnType, v)) - } - - case *Binary: - switch pnType { - case C.PN_BINARY: - *v = Binary(goBytes(C.pn_data_get_binary(data))) - default: - panic(newBadUnmarshal(pnType, v)) - } - - case *Symbol: - switch pnType { - case C.PN_SYMBOL: - *v = Symbol(goBytes(C.pn_data_get_symbol(data))) - default: - panic(newBadUnmarshal(pnType, v)) - } - - case *interface{}: - getInterface(data, v) - - default: - if reflect.TypeOf(v).Kind() != reflect.Ptr { - panic(newBadUnmarshal(pnType, v)) - } - switch reflect.TypeOf(v).Elem().Kind() { - case reflect.Map: - getMap(data, v) - case reflect.Slice: - getList(data, v) - default: - panic(newBadUnmarshal(pnType, v)) - } - } - err := dataError("unmarshaling", data) - if err != nil { - panic(err) - } - return -} - -// Getting into an interface is driven completely by the AMQP type, since the interface{} -// target is type-neutral. 
-func getInterface(data *C.pn_data_t, v *interface{}) { - pnType := C.pn_data_type(data) - switch pnType { - case C.PN_NULL: - *v = nil - case C.PN_BOOL: - *v = bool(C.pn_data_get_bool(data)) - case C.PN_UBYTE: - *v = uint8(C.pn_data_get_ubyte(data)) - case C.PN_BYTE: - *v = int8(C.pn_data_get_byte(data)) - case C.PN_USHORT: - *v = uint16(C.pn_data_get_ushort(data)) - case C.PN_SHORT: - *v = int16(C.pn_data_get_short(data)) - case C.PN_UINT: - *v = uint32(C.pn_data_get_uint(data)) - case C.PN_INT: - *v = int32(C.pn_data_get_int(data)) - case C.PN_CHAR: - *v = uint8(C.pn_data_get_char(data)) - case C.PN_ULONG: - *v = uint64(C.pn_data_get_ulong(data)) - case C.PN_LONG: - *v = int64(C.pn_data_get_long(data)) - case C.PN_FLOAT: - *v = float32(C.pn_data_get_float(data)) - case C.PN_DOUBLE: - *v = float64(C.pn_data_get_double(data)) - case C.PN_BINARY: - *v = Binary(goBytes(C.pn_data_get_binary(data))) - case C.PN_STRING: - *v = goString(C.pn_data_get_string(data)) - case C.PN_SYMBOL: - *v = Symbol(goString(C.pn_data_get_symbol(data))) - case C.PN_MAP: - m := make(Map) - get(data, &m) - *v = m // FIXME aconway 2015-03-13: avoid the copy? - case C.PN_LIST: - l := make(List, 0) - get(data, &l) - *v = l // FIXME aconway 2015-03-13: avoid the copy? 
- default: - panic(newBadUnmarshal(pnType, v)) - } -} - -// get into map pointed at by v -func getMap(data *C.pn_data_t, v interface{}) { - pnType := C.pn_data_type(data) - if pnType != C.PN_MAP { - panic(newBadUnmarshal(pnType, v)) - } - mapValue := reflect.ValueOf(v).Elem() - mapValue.Set(reflect.MakeMap(mapValue.Type())) // Clear the map - count := int(C.pn_data_get_map(data)) - if bool(C.pn_data_enter(data)) { - for i := 0; i < count/2; i++ { - if bool(C.pn_data_next(data)) { - key := reflect.New(mapValue.Type().Key()) - get(data, key.Interface()) - if bool(C.pn_data_next(data)) { - val := reflect.New(mapValue.Type().Elem()) - get(data, val.Interface()) - mapValue.SetMapIndex(key.Elem(), val.Elem()) - } - } - } - C.pn_data_exit(data) - } -} - -func getList(data *C.pn_data_t, v interface{}) { - pnType := C.pn_data_type(data) - if pnType != C.PN_LIST { - panic(newBadUnmarshal(pnType, v)) - } - count := int(C.pn_data_get_list(data)) - listValue := reflect.MakeSlice(reflect.TypeOf(v).Elem(), count, count) - if bool(C.pn_data_enter(data)) { - for i := 0; i < count; i++ { - if bool(C.pn_data_next(data)) { - val := reflect.New(listValue.Type().Elem()) - get(data, val.Interface()) - listValue.Index(i).Set(val.Elem()) - } - } - C.pn_data_exit(data) - } - // FIXME aconway 2015-04-09: avoid the copy? - reflect.ValueOf(v).Elem().Set(listValue) -} - -// decode from bytes. -// Return bytes decoded or 0 if we could not decode a complete object. 
-// -func decode(data *C.pn_data_t, bytes []byte) int { - if len(bytes) == 0 { - return 0 - } - n := int(C.pn_data_decode(data, cPtr(bytes), cLen(bytes))) - if n == int(C.PN_UNDERFLOW) { - C.pn_error_clear(C.pn_data_error(data)) - return 0 - } else if n <= 0 { - panic(internal.Errorf("unmarshal %s", internal.PnErrorCode(n))) - } - return n -} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/bd3fb337/proton-c/bindings/go/src/qpid.apache.org/proton/go/amqp/url.go ---------------------------------------------------------------------- diff --git a/proton-c/bindings/go/src/qpid.apache.org/proton/go/amqp/url.go b/proton-c/bindings/go/src/qpid.apache.org/proton/go/amqp/url.go deleted file mode 100644 index 58711c7..0000000 --- a/proton-c/bindings/go/src/qpid.apache.org/proton/go/amqp/url.go +++ /dev/null @@ -1,96 +0,0 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. -*/ - -package amqp - -/* -#include <stdlib.h> -#include <string.h> -#include <proton/url.h> - -// Helper function for setting URL fields. 
-typedef void (*setter_fn)(pn_url_t* url, const char* value); -inline void set(pn_url_t *url, setter_fn s, const char* value) { - s(url, value); -} -*/ -import "C" - -import ( - "net" - "net/url" - "qpid.apache.org/proton/go/internal" - "unsafe" -) - -const ( - amqp string = "amqp" - amqps = "amqps" -) - -// ParseUrl parses an AMQP URL string and returns a net/url.Url. -// -// It is more forgiving than net/url.Parse and allows most of the parts of the -// URL to be missing, assuming AMQP defaults. -// -func ParseURL(s string) (u *url.URL, err error) { - cstr := C.CString(s) - defer C.free(unsafe.Pointer(cstr)) - pnUrl := C.pn_url_parse(cstr) - if pnUrl == nil { - return nil, internal.Errorf("bad URL %#v", s) - } - defer C.pn_url_free(pnUrl) - - scheme := C.GoString(C.pn_url_get_scheme(pnUrl)) - username := C.GoString(C.pn_url_get_username(pnUrl)) - password := C.GoString(C.pn_url_get_password(pnUrl)) - host := C.GoString(C.pn_url_get_host(pnUrl)) - port := C.GoString(C.pn_url_get_port(pnUrl)) - path := C.GoString(C.pn_url_get_path(pnUrl)) - - if err != nil { - return nil, internal.Errorf("bad URL %#v: %s", s, err) - } - if scheme == "" { - scheme = amqp - } - if port == "" { - if scheme == amqps { - port = amqps - } else { - port = amqp - } - } - var user *url.Userinfo - if password != "" { - user = url.UserPassword(username, password) - } else if username != "" { - user = url.User(username) - } - - u = &url.URL{ - Scheme: scheme, - User: user, - Host: net.JoinHostPort(host, port), - Path: path, - } - - return u, nil -} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/bd3fb337/proton-c/bindings/go/src/qpid.apache.org/proton/go/amqp/url_test.go ---------------------------------------------------------------------- diff --git a/proton-c/bindings/go/src/qpid.apache.org/proton/go/amqp/url_test.go b/proton-c/bindings/go/src/qpid.apache.org/proton/go/amqp/url_test.go deleted file mode 100644 index f80f1c4..0000000 --- 
a/proton-c/bindings/go/src/qpid.apache.org/proton/go/amqp/url_test.go +++ /dev/null @@ -1,51 +0,0 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. -*/ - -package amqp - -import ( - "fmt" -) - -func ExampleParseURL() { - for _, s := range []string{ - "amqp://username:password@host:1234/path", - "host:1234", - "host", - ":1234", - "host/path", - "amqps://host", - "", - } { - u, err := ParseURL(s) - if err != nil { - fmt.Println(err) - } else { - fmt.Println(u) - } - } - // Output: - // amqp://username:password@host:1234/path - // amqp://host:1234 - // amqp://host:amqp - // amqp://:1234 - // amqp://host:amqp/path - // amqps://host:amqps - // proton: bad URL "" -} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/bd3fb337/proton-c/bindings/go/src/qpid.apache.org/proton/go/event/doc.go ---------------------------------------------------------------------- diff --git a/proton-c/bindings/go/src/qpid.apache.org/proton/go/event/doc.go b/proton-c/bindings/go/src/qpid.apache.org/proton/go/event/doc.go deleted file mode 100644 index 7a9ec12..0000000 --- a/proton-c/bindings/go/src/qpid.apache.org/proton/go/event/doc.go +++ /dev/null @@ -1,38 +0,0 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. 
See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. -*/ - -/* -Package event provides a low-level API to the proton AMQP engine. - -For most tasks, consider instead package qpid.apache.org/proton/go/messaging. -It provides a higher-level, concurrent API that is easier to use. - -The API is event based. There are two alternative styles of handler. EventHandler -provides the core proton events. MessagingHandler provides a slighly simplified -view of the event stream and automates some common tasks. - -See type Pump documentation for more details of the interaction between proton -events and goroutines. -*/ -package event - -// #cgo LDFLAGS: -lqpid-proton -import "C" - -// This file is just for the package comment. http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/bd3fb337/proton-c/bindings/go/src/qpid.apache.org/proton/go/event/handlers.go ---------------------------------------------------------------------- diff --git a/proton-c/bindings/go/src/qpid.apache.org/proton/go/event/handlers.go b/proton-c/bindings/go/src/qpid.apache.org/proton/go/event/handlers.go deleted file mode 100644 index d76fac9..0000000 --- a/proton-c/bindings/go/src/qpid.apache.org/proton/go/event/handlers.go +++ /dev/null @@ -1,411 +0,0 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. 
See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. -*/ - -package event - -// #include <proton/reactor.h> -// #include <proton/handlers.h> -import "C" - -import ( - "qpid.apache.org/proton/go/internal" -) - -// EventHandler handles core proton events. -type EventHandler interface { - // HandleEvent is called with an event. - // Typically HandleEvent() is implemented as a switch on e.Type() - HandleEvent(e Event) error -} - -// cHandler wraps a C pn_handler_t -type cHandler struct { - pn *C.pn_handler_t -} - -func (h cHandler) HandleEvent(e Event) error { - C.pn_handler_dispatch(h.pn, e.pn, C.pn_event_type(e.pn)) - return nil // FIXME aconway 2015-03-31: error handling -} - -// MessagingHandler provides an alternative interface to EventHandler. -// it is easier to use for most applications that send and receive messages. -// -// Implement this interface and then wrap your value with a MessagingHandlerDelegator. -// MessagingHandlerDelegator implements EventHandler and can be registered with a Pump. -// -type MessagingHandler interface { - HandleMessagingEvent(MessagingEventType, Event) error -} - -// MessagingEventType provides a set of events that are easier to work with than the -// core events defined by EventType -// -// There are 3 types of "endpoint": Connection, Session and Link. 
-// For each endpoint there are 5 event types: Opening, Opened, Closing, Closed and Error. -// The meaning of these events is as follows: -// -// Opening: The remote end opened, the local end will open automatically. -// -// Opened: Both ends are open, regardless of which end opened first. -// -// Closing: The remote end closed without error, the local end will close automatically. -// -// Error: The remote end closed with an error, the local end will close automatically. -// -// Closed: Both ends are closed, regardless of which end closed first or if there was an error. -// -type MessagingEventType int - -const ( - // The event loop starts. - MStart MessagingEventType = iota - // The peer closes the connection with an error condition. - MConnectionError - // The peer closes the session with an error condition. - MSessionError - // The peer closes the link with an error condition. - MLinkError - // The peer Initiates the opening of the connection. - MConnectionOpening - // The peer initiates the opening of the session. - MSessionOpening - // The peer initiates the opening of the link. - MLinkOpening - // The connection is opened. - MConnectionOpened - // The session is opened. - MSessionOpened - // The link is opened. - MLinkOpened - // The peer initiates the closing of the connection. - MConnectionClosing - // The peer initiates the closing of the session. - MSessionClosing - // The peer initiates the closing of the link. - MLinkClosing - // Both ends of the connection are closed. - MConnectionClosed - // Both ends of the session are closed. - MSessionClosed - // Both ends of the link are closed. - MLinkClosed - // The connection is disconnected. - MConnectionDisconnected - // The session's connection was disconnected - MSessionDisconnected - // The session's connection was disconnected - MLinkDisconnected - // The sender link has credit and messages can - // therefore be transferred. - MSendable - // The remote peer accepts an outgoing message. 
- MAccepted - // The remote peer rejects an outgoing message. - MRejected - // The peer releases an outgoing message. Note that this may be in response to - // either the RELEASE or MODIFIED state as defined by the AMQP specification. - MReleased - // The peer has settled the outgoing message. This is the point at which it - // should never be retransmitted. - MSettled - // A message is received. Call DecodeMessage() to decode as an amqp.Message. - // To manage the outcome of this message (e.g. to accept or reject the message) - // use Event.Delivery(). - MMessage - // The event loop terminates, there are no more events to process. - MFinal -) - -func (t MessagingEventType) String() string { - switch t { - case MStart: - return "Start" - case MConnectionError: - return "ConnectionError" - case MSessionError: - return "SessionError" - case MLinkError: - return "LinkError" - case MConnectionOpening: - return "ConnectionOpening" - case MSessionOpening: - return "SessionOpening" - case MLinkOpening: - return "LinkOpening" - case MConnectionOpened: - return "ConnectionOpened" - case MSessionOpened: - return "SessionOpened" - case MLinkOpened: - return "LinkOpened" - case MConnectionClosing: - return "ConnectionClosing" - case MSessionClosing: - return "SessionClosing" - case MLinkClosing: - return "LinkClosing" - case MConnectionClosed: - return "ConnectionClosed" - case MSessionClosed: - return "SessionClosed" - case MLinkClosed: - return "LinkClosed" - case MConnectionDisconnected: - return "ConnectionDisconnected" - case MSessionDisconnected: - return "MSessionDisconnected" - case MLinkDisconnected: - return "MLinkDisconnected" - case MSendable: - return "Sendable" - case MAccepted: - return "Accepted" - case MRejected: - return "Rejected" - case MReleased: - return "Released" - case MSettled: - return "Settled" - case MMessage: - return "Message" - default: - return "Unknown" - } -} - -// ResourceHandler provides a simple way to track the creation and deletion of 
-// various proton objects. -// endpointDelegator captures common patterns for endpoints opening/closing -type endpointDelegator struct { - remoteOpen, remoteClose, localOpen, localClose EventType - opening, opened, closing, closed, error MessagingEventType - endpoint func(Event) Endpoint - delegate MessagingHandler -} - -// HandleEvent handles an open/close event for an endpoint in a generic way. -func (d endpointDelegator) HandleEvent(e Event) (err error) { - endpoint := d.endpoint(e) - state := endpoint.State() - - switch e.Type() { - - case d.localOpen: - if state.Is(SRemoteActive) { - err = d.delegate.HandleMessagingEvent(d.opened, e) - } - - case d.remoteOpen: - switch { - case state.Is(SLocalActive): - err = d.delegate.HandleMessagingEvent(d.opened, e) - case state.Is(SLocalUninit): - err = d.delegate.HandleMessagingEvent(d.opening, e) - if err == nil { - endpoint.Open() - } - } - - case d.remoteClose: - var err1 error - if endpoint.RemoteCondition().IsSet() { // Closed with error - err1 = d.delegate.HandleMessagingEvent(d.error, e) - if err1 == nil { // Don't overwrite an application error. - err1 = endpoint.RemoteCondition().Error() - } - } else { - err1 = d.delegate.HandleMessagingEvent(d.closing, e) - } - if state.Is(SLocalClosed) { - err = d.delegate.HandleMessagingEvent(d.closed, e) - } else if state.Is(SLocalActive) { - endpoint.Close() - } - if err1 != nil { // Keep the first error. - err = err1 - } - - case d.localClose: - if state.Is(SRemoteClosed) { - err = d.delegate.HandleMessagingEvent(d.closed, e) - } - - default: - // We shouldn't be called with any other event type. - panic(internal.Errorf("internal error, not an open/close event: %s", e)) - } - - return err -} - -// MessagingDelegator implements an EventHandler and delegates to a MessagingHandler. -// You can modify the exported fields before you pass the MessagingDelegator to -// a Pump. 
-type MessagingDelegator struct { - delegate MessagingHandler - connection, session, link endpointDelegator - handshaker, flowcontroller EventHandler - - // AutoSettle (default true) automatically pre-settle outgoing messages. - AutoSettle bool - // AutoAccept (default true) automatically accept and settle incoming messages - // if they are not settled by the delegate. - AutoAccept bool - // Prefetch (default 10) initial credit to issue for incoming links. - Prefetch int - // PeerCloseIsError (default false) if true a close by the peer will be treated as an error. - PeerCloseError bool -} - -func NewMessagingDelegator(h MessagingHandler) EventHandler { - return &MessagingDelegator{ - delegate: h, - connection: endpointDelegator{ - EConnectionRemoteOpen, EConnectionRemoteClose, EConnectionLocalOpen, EConnectionLocalClose, - MConnectionOpening, MConnectionOpened, MConnectionClosing, MConnectionClosed, - MConnectionError, - func(e Event) Endpoint { return e.Connection() }, - h, - }, - session: endpointDelegator{ - ESessionRemoteOpen, ESessionRemoteClose, ESessionLocalOpen, ESessionLocalClose, - MSessionOpening, MSessionOpened, MSessionClosing, MSessionClosed, - MSessionError, - func(e Event) Endpoint { return e.Session() }, - h, - }, - link: endpointDelegator{ - ELinkRemoteOpen, ELinkRemoteClose, ELinkLocalOpen, ELinkLocalClose, - MLinkOpening, MLinkOpened, MLinkClosing, MLinkClosed, - MLinkError, - func(e Event) Endpoint { return e.Link() }, - h, - }, - flowcontroller: nil, - AutoSettle: true, - AutoAccept: true, - Prefetch: 10, - PeerCloseError: false, - } -} - -func handleIf(h EventHandler, e Event) error { - if h != nil { - return h.HandleEvent(e) - } - return nil -} - -// Handle a proton event by passing the corresponding MessagingEvent(s) to -// the MessagingHandler. -func (d *MessagingDelegator) HandleEvent(e Event) error { - handleIf(d.flowcontroller, e) // FIXME aconway 2015-03-31: error handling. 
- - switch e.Type() { - - case EConnectionInit: - d.flowcontroller = cHandler{C.pn_flowcontroller(C.int(d.Prefetch))} - d.delegate.HandleMessagingEvent(MStart, e) - - case EConnectionRemoteOpen, EConnectionRemoteClose, EConnectionLocalOpen, EConnectionLocalClose: - return d.connection.HandleEvent(e) - - case ESessionRemoteOpen, ESessionRemoteClose, ESessionLocalOpen, ESessionLocalClose: - return d.session.HandleEvent(e) - - case ELinkRemoteOpen, ELinkRemoteClose, ELinkLocalOpen, ELinkLocalClose: - return d.link.HandleEvent(e) - - case ELinkFlow: - if e.Link().IsSender() && e.Link().Credit() > 0 { - return d.delegate.HandleMessagingEvent(MSendable, e) - } - - case EDelivery: - if e.Delivery().Link().IsReceiver() { - d.incoming(e) - } else { - d.outgoing(e) - } - - case ETransportTailClosed: - c := e.Connection() - for l := c.LinkHead(SRemoteActive); !l.IsNil(); l = l.Next(SRemoteActive) { - e2 := e - e2.link = l - e2.session = l.Session() - d.delegate.HandleMessagingEvent(MLinkDisconnected, e2) - } - for s := c.SessionHead(SRemoteActive); !s.IsNil(); s = s.Next(SRemoteActive) { - e2 := e - e2.session = s - d.delegate.HandleMessagingEvent(MSessionDisconnected, e2) - } - d.delegate.HandleMessagingEvent(MConnectionDisconnected, e) - d.delegate.HandleMessagingEvent(MFinal, e) - } - return nil -} - -func (d *MessagingDelegator) incoming(e Event) (err error) { - delivery := e.Delivery() - if delivery.Readable() && !delivery.Partial() { - if e.Link().State().Is(SLocalClosed) { - e.Link().Advance() - if d.AutoAccept { - delivery.Release(false) - } - } else { - err = d.delegate.HandleMessagingEvent(MMessage, e) - e.Link().Advance() - if d.AutoAccept && !delivery.Settled() { - if err == nil { - delivery.Accept() - } else { - delivery.Reject() - } - } - } - } else if delivery.Updated() && delivery.Settled() { - err = d.delegate.HandleMessagingEvent(MSettled, e) - } - return -} - -func (d *MessagingDelegator) outgoing(e Event) (err error) { - delivery := e.Delivery() - if 
delivery.Updated() { - switch delivery.Remote().Type() { - case Accepted: - err = d.delegate.HandleMessagingEvent(MAccepted, e) - case Rejected: - err = d.delegate.HandleMessagingEvent(MRejected, e) - case Released, Modified: - err = d.delegate.HandleMessagingEvent(MReleased, e) - } - if err == nil && delivery.Settled() { - err = d.delegate.HandleMessagingEvent(MSettled, e) - } - if err == nil && d.AutoSettle { - delivery.Settle() - } - } - return -} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/bd3fb337/proton-c/bindings/go/src/qpid.apache.org/proton/go/event/message.go ---------------------------------------------------------------------- diff --git a/proton-c/bindings/go/src/qpid.apache.org/proton/go/event/message.go b/proton-c/bindings/go/src/qpid.apache.org/proton/go/event/message.go deleted file mode 100644 index d900744..0000000 --- a/proton-c/bindings/go/src/qpid.apache.org/proton/go/event/message.go +++ /dev/null @@ -1,75 +0,0 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. 
-*/ - -package event - -// #include <proton/types.h> -// #include <proton/message.h> -// #include <proton/codec.h> -import "C" - -import ( - "qpid.apache.org/proton/go/amqp" - "qpid.apache.org/proton/go/internal" -) - -// DecodeMessage decodes the message contained in a delivery event. -func DecodeMessage(e Event) (m amqp.Message, err error) { - defer internal.DoRecover(&err) - delivery := e.Delivery() - if !delivery.Readable() || delivery.Partial() { - return nil, internal.Errorf("attempting to get incomplete message") - } - data := make([]byte, delivery.Pending()) - result := delivery.Link().Recv(data) - if result != len(data) { - return nil, internal.Errorf("cannot receive message: %s", internal.PnErrorCode(result)) - } - return amqp.DecodeMessage(data) -} - -// FIXME aconway 2015-04-08: proper handling of delivery tags. Tag counter per link. -var tags amqp.UidCounter - -// Send sends an amqp.Message over a Link. -// Returns a Delivery that can be used to determine the outcome of the message. 
-func (link Link) Send(m amqp.Message) (Delivery, error) { - if !link.IsSender() { - return Delivery{}, internal.Errorf("attempt to send message on receiving link") - } - // FIXME aconway 2015-04-08: buffering, error handling - delivery := link.Delivery(tags.Next()) - bytes, err := m.Encode(nil) - if err != nil { - return Delivery{}, internal.Errorf("cannot send mesage %s", err) - } - result := link.SendBytes(bytes) - link.Advance() - if result != len(bytes) { - if result < 0 { - return delivery, internal.Errorf("send failed %v", internal.PnErrorCode(result)) - } else { - return delivery, internal.Errorf("send incomplete %v of %v", result, len(bytes)) - } - } - if link.RemoteSndSettleMode() == PnSndSettled { // FIXME aconway 2015-04-08: enum names - delivery.Settle() - } - return delivery, nil -} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/bd3fb337/proton-c/bindings/go/src/qpid.apache.org/proton/go/event/pump.go ---------------------------------------------------------------------- diff --git a/proton-c/bindings/go/src/qpid.apache.org/proton/go/event/pump.go b/proton-c/bindings/go/src/qpid.apache.org/proton/go/event/pump.go deleted file mode 100644 index 73db513..0000000 --- a/proton-c/bindings/go/src/qpid.apache.org/proton/go/event/pump.go +++ /dev/null @@ -1,360 +0,0 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. 
See the License for the -specific language governing permissions and limitations -under the License. -*/ - -package event - -// #include <proton/connection.h> -// #include <proton/transport.h> -// #include <proton/event.h> -// #include <proton/reactor.h> -// #include <proton/handlers.h> -// #include <proton/transport.h> -// #include <proton/session.h> -// #include <memory.h> -// #include <stdlib.h> -// -// PN_HANDLE(REMOTE_ADDR) -import "C" - -import ( - "fmt" - "io" - "net" - "qpid.apache.org/proton/go/internal" - "sync" - "unsafe" -) - -// bufferChan manages a pair of ping-pong buffers to pass bytes through a channel. -type bufferChan struct { - buffers chan []byte - buf1, buf2 []byte -} - -func newBufferChan(size int) *bufferChan { - return &bufferChan{make(chan []byte), make([]byte, size), make([]byte, size)} -} - -func (b *bufferChan) buffer() []byte { - b.buf1, b.buf2 = b.buf2, b.buf1 // Alternate buffers. - return b.buf1[:cap(b.buf1)] -} - -// FIXME aconway 2015-05-04: direct sending to Inject may block user goroutines if -// the pump stops. Make this a function that selects on running. - -// FIXME aconway 2015-05-05: for consistency should Pump be called Driver? - -/* -Pump reads from a net.Conn, decodes AMQP events and calls the appropriate -Handler functions. Actions taken by Handler functions (such as sending messages) -are encoded and written to the net.Conn. - -The proton protocol engine is single threaded (per connection). The Pump runs -proton in the goroutine that calls Pump.Run() and creates goroutines to feed -data to/from a net.Conn. You can create multiple Pumps to handle multiple -connections concurrently. - -Methods in this package can only be called in the goroutine that executes the -corresponding Pump.Run(). You implement the EventHandler or MessagingHandler -interfaces and provide those values to NewPump(). Their HandleEvent method will be -called in the Pump goroutine, in typical event-driven style. 
- -Handlers can pass values from an event (Connections, Links, Deliveries etc.) to -other goroutines, store them, or use them as map indexes. Effectively they are -just C pointers. Other goroutines cannot call their methods directly but they -can create function closures that call their methods and send those closures -to the Pump.Inject channel. They will execute safely in the pump -goroutine. Injected functions, or your handlers, can set up channels to get -results back to other goroutines. - -You are responsible for ensuring you don't use an event value after the C object -has been deleted. The handler methods will tell you when a value is no longer -valid. For example after a MessagingHandler handles a LinkClosed event, that link -is no longer valid. If you do Link.Close() yourself (in a handler or injected -function) the link remains valid until the corresponding LinkClosed event is -received by the handler. - -Pump.Close() will take care of cleaning up any remaining values and types when -you are done with the Pump. All values associated with a pump become invalid -when you call Pump.Close() - -The qpid.apache.org/proton/go/messaging package will do all this for you, so unless -you are doing something fairly low-level it is probably a better choice. - -*/ -type Pump struct { - // Error is set on exit from Run() if there was an error. - Error error // FIXME aconway 2015-05-26: make it a function - // Channel to inject functions to be executed in the Pump's proton event loop. - Inject chan func() - - conn net.Conn - transport *C.pn_transport_t - connection *C.pn_connection_t - collector *C.pn_collector_t - read *bufferChan // Read buffers channel. - write *bufferChan // Write buffers channel. - handlers []EventHandler // Handlers for proton events. - running chan struct{} // This channel will be closed when the goroutines are done. 
-} - -const bufferSize = 4096 - -var pumps map[*C.pn_connection_t]*Pump - -func init() { - pumps = make(map[*C.pn_connection_t]*Pump) -} - -// NewPump initializes a pump with a connection and handlers. To start it running: -// p, err := NewPump(...) -// go p.Run() -// The goroutine will exit when the pump is closed or disconnected. -// You can check for errors on Pump.Error. -// -func NewPump(conn net.Conn, handlers ...EventHandler) (*Pump, error) { - // Save the connection ID for Connection.String() - p := &Pump{ - Inject: make(chan func(), 100), // FIXME aconway 2015-05-04: blocking hack - conn: conn, - transport: C.pn_transport(), - connection: C.pn_connection(), - collector: C.pn_collector(), - handlers: handlers, - read: newBufferChan(bufferSize), - write: newBufferChan(bufferSize), - running: make(chan struct{}), - } - if p.transport == nil || p.connection == nil || p.collector == nil { - return nil, internal.Errorf("failed to allocate pump") - } - pnErr := int(C.pn_transport_bind(p.transport, p.connection)) - if pnErr != 0 { - return nil, internal.Errorf("cannot setup pump: %s", internal.PnErrorCode(pnErr)) - } - C.pn_connection_collect(p.connection, p.collector) - C.pn_connection_open(p.connection) - pumps[p.connection] = p - return p, nil -} - -func (p *Pump) String() string { - return fmt.Sprintf("(%s-%s)", p.conn.LocalAddr(), p.conn.RemoteAddr()) -} - -func (p *Pump) Id() string { - return fmt.Sprintf("%p", &p) -} - -// setError sets error only if not already set -func (p *Pump) setError(e error) { - if p.Error == nil { - p.Error = e - } -} - -// Server puts the Pump in server mode, meaning it will auto-detect security settings on -// the incoming connection such as use of SASL and SSL. 
-// Must be called before Run() -// -func (p *Pump) Server() { - C.pn_transport_set_server(p.transport) -} - -func (p *Pump) free() { - if p.connection != nil { - C.pn_connection_free(p.connection) - } - if p.transport != nil { - C.pn_transport_free(p.transport) - } - if p.collector != nil { - C.pn_collector_free(p.collector) - } - for _, h := range p.handlers { - switch h := h.(type) { - case cHandler: - C.pn_handler_free(h.pn) - } - } -} - -// Close closes the AMQP connection, the net.Conn, and stops associated goroutines. -// It will cause Run() to return. Run() may return earlier if the network disconnects -// but you must still call Close() to clean everything up. -// -// Methods on values associated with the pump (Connections, Sessions, Links) will panic -// if called after Close() -// -func (p *Pump) Close() error { - // If the pump is still running, inject a close. Either way wait for it to finish. - select { - case p.Inject <- func() { C.pn_connection_close(p.connection) }: - <-p.running // Wait to finish - case <-p.running: // Wait for goroutines to finish - } - delete(pumps, p.connection) - p.free() - if p.Error == io.EOF { - return nil - } - return p.Error -} - -// Run the pump. Normally called in a goroutine as: go pump.Run() -// An error during Run is stored on p.Error. -// -func (p *Pump) Run() { - // Signal errors from the read/write goroutines. Don't block if we don't - // read all the errors, we only care about the first. - error := make(chan error, 2) - // FIXME aconway 2015-05-04: stop := make(chan struct{}) // Closed to signal that read/write should stop. 
- - wait := sync.WaitGroup{} - wait.Add(2) - - go func() { // Read goroutine - defer wait.Done() - for { - rbuf := p.read.buffer() - n, err := p.conn.Read(rbuf) - if n > 0 { - p.read.buffers <- rbuf[:n] - } else if err != nil { - close(p.read.buffers) - error <- err - return - } - } - }() - - go func() { // Write goroutine - defer wait.Done() - for { - wbuf, ok := <-p.write.buffers - if !ok { - return - } - _, err := p.conn.Write(wbuf) - if err != nil { - error <- err - return - } - } - }() - - wbuf := p.write.buffer()[:0] -loop: - for { - if len(wbuf) == 0 { - p.pop(&wbuf) - } - // Don't set wchan unless there is something to write. - var wchan chan []byte - if len(wbuf) > 0 { - wchan = p.write.buffers - } - - select { - case buf := <-p.read.buffers: // Read a buffer - p.push(buf) - case wchan <- wbuf: // Write a buffer - wbuf = p.write.buffer()[:0] - case f := <-p.Inject: // Function injected from another goroutine - f() - case err := <-error: // Read or write error - p.setError(err) - C.pn_transport_close_tail(p.transport) - C.pn_transport_close_head(p.transport) - } - if err := p.process(); err != nil { - p.setError(err) - break loop - } - } - close(p.write.buffers) - p.conn.Close() - wait.Wait() - close(p.running) // Signal goroutines have exited and Error is set. 
-} - -func minInt(a, b int) int { - if a < b { - return a - } else { - return b - } -} - -func (p *Pump) pop(buf *[]byte) { - pending := int(C.pn_transport_pending(p.transport)) - switch { - case pending == int(C.PN_EOS): - *buf = (*buf)[:] - return - case pending < 0: - panic(internal.Errorf("%s", internal.PnErrorCode(pending))) - } - size := minInt(pending, cap(*buf)) - *buf = (*buf)[:size] - if size == 0 { - return - } - C.memcpy(unsafe.Pointer(&(*buf)[0]), unsafe.Pointer(C.pn_transport_head(p.transport)), C.size_t(size)) - C.pn_transport_pop(p.transport, C.size_t(size)) -} - -func (p *Pump) push(buf []byte) { - buf2 := buf - for len(buf2) > 0 { - n := int(C.pn_transport_push(p.transport, (*C.char)(unsafe.Pointer((&buf2[0]))), C.size_t(len(buf2)))) - if n <= 0 { - panic(internal.Errorf("error in transport: %s", internal.PnErrorCode(n))) - } - buf2 = buf2[n:] - } -} - -func (p *Pump) handle(e Event) error { - for _, h := range p.handlers { - if err := h.HandleEvent(e); err != nil { - return err - } - } - if e.Type() == ETransportClosed { - return io.EOF - } - return nil -} - -func (p *Pump) process() error { - // FIXME aconway 2015-05-04: if a Handler returns error we should stop the pump - for ce := C.pn_collector_peek(p.collector); ce != nil; ce = C.pn_collector_peek(p.collector) { - e := makeEvent(ce) - if err := p.handle(e); err != nil { - return err - } - C.pn_collector_pop(p.collector) - } - return nil -} - -// Connection gets the Pump's connection value. 
-func (p *Pump) Connection() Connection { return Connection{p.connection} } http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/bd3fb337/proton-c/bindings/go/src/qpid.apache.org/proton/go/event/wrappers.go ---------------------------------------------------------------------- diff --git a/proton-c/bindings/go/src/qpid.apache.org/proton/go/event/wrappers.go b/proton-c/bindings/go/src/qpid.apache.org/proton/go/event/wrappers.go deleted file mode 100644 index 7043b9c..0000000 --- a/proton-c/bindings/go/src/qpid.apache.org/proton/go/event/wrappers.go +++ /dev/null @@ -1,253 +0,0 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. -*/ - -package event - -//#include <proton/codec.h> -//#include <proton/connection.h> -//#include <proton/session.h> -//#include <proton/session.h> -//#include <proton/delivery.h> -//#include <proton/link.h> -//#include <proton/event.h> -//#include <proton/transport.h> -//#include <proton/link.h> -//#include <stdlib.h> -import "C" - -import ( - "fmt" - "qpid.apache.org/proton/go/internal" - "unsafe" -) - -// FIXME aconway 2015-05-05: Documentation for generated types. - -// Event is an AMQP protocol event. 
-type Event struct { - pn *C.pn_event_t - eventType EventType - connection Connection - session Session - link Link - delivery Delivery -} - -func makeEvent(pn *C.pn_event_t) Event { - return Event{ - pn: pn, - eventType: EventType(C.pn_event_type(pn)), - connection: Connection{C.pn_event_connection(pn)}, - session: Session{C.pn_event_session(pn)}, - link: Link{C.pn_event_link(pn)}, - delivery: Delivery{C.pn_event_delivery(pn)}, - } -} -func (e Event) IsNil() bool { return e.eventType == EventType(0) } -func (e Event) Type() EventType { return e.eventType } -func (e Event) Connection() Connection { return e.connection } -func (e Event) Session() Session { return e.session } -func (e Event) Link() Link { return e.link } -func (e Event) Delivery() Delivery { return e.delivery } -func (e Event) String() string { return e.Type().String() } - -// Data holds a pointer to decoded AMQP data. -// Use amqp.marshal/unmarshal to access it as Go data types. -// -type Data struct{ pn *C.pn_data_t } - -func NewData(p unsafe.Pointer) Data { return Data{(*C.pn_data_t)(p)} } - -func (d Data) Free() { C.pn_data_free(d.pn) } -func (d Data) Pointer() unsafe.Pointer { return unsafe.Pointer(d.pn) } -func (d Data) Clear() { C.pn_data_clear(d.pn) } -func (d Data) Rewind() { C.pn_data_rewind(d.pn) } -func (d Data) Error() error { - return internal.PnError(unsafe.Pointer(C.pn_data_error(d.pn))) -} - -// State holds the state flags for an AMQP endpoint. -type State byte - -const ( - SLocalUninit State = C.PN_LOCAL_UNINIT - SLocalActive = C.PN_LOCAL_ACTIVE - SLocalClosed = C.PN_LOCAL_CLOSED - SRemoteUninit = C.PN_REMOTE_UNINIT - SRemoteActive = C.PN_REMOTE_ACTIVE - SRemoteClosed = C.PN_REMOTE_CLOSED -) - -// Is is True if bits & state is non 0. 
-func (s State) Is(bits State) bool { return s&bits != 0 } - -// Return a State containing just the local flags -func (s State) Local() State { return State(s & C.PN_LOCAL_MASK) } - -// Return a State containing just the remote flags -func (s State) Remote() State { return State(s & C.PN_REMOTE_MASK) } - -// Endpoint is the common interface for Connection, Link and Session. -type Endpoint interface { - // State is the open/closed state. - State() State - // Open an endpoint. - Open() - // Close an endpoint. - Close() - // Condition holds a local error condition. - Condition() Condition - // RemoteCondition holds a remote error condition. - RemoteCondition() Condition -} - -const ( - Received uint64 = C.PN_RECEIVED - Accepted = C.PN_ACCEPTED - Rejected = C.PN_REJECTED - Released = C.PN_RELEASED - Modified = C.PN_MODIFIED -) - -// SettleAs is equivalent to d.Update(disposition); d.Settle() -// It is a no-op if e does not have a delivery. -func (d Delivery) SettleAs(disposition uint64) { - d.Update(disposition) - d.Settle() -} - -// Accept accepts and settles a delivery. -func (d Delivery) Accept() { d.SettleAs(Accepted) } - -// Reject rejects and settles a delivery -func (d Delivery) Reject() { d.SettleAs(Rejected) } - -// Release releases and settles a delivery -// If delivered is true the delivery count for the message will be increased. -func (d Delivery) Release(delivered bool) { - if delivered { - d.SettleAs(Modified) - } else { - d.SettleAs(Released) - } -} - -// FIXME aconway 2015-05-05: don't expose DeliveryTag as a C pointer, just as a String? 
- -type DeliveryTag struct{ pn C.pn_delivery_tag_t } - -func (t DeliveryTag) String() string { return C.GoStringN(t.pn.start, C.int(t.pn.size)) } - -func (l Link) Recv(buf []byte) int { - if len(buf) == 0 { - return 0 - } - return int(C.pn_link_recv(l.pn, (*C.char)(unsafe.Pointer(&buf[0])), C.size_t(len(buf)))) -} - -func (l Link) SendBytes(bytes []byte) int { - return int(C.pn_link_send(l.pn, cPtr(bytes), cLen(bytes))) -} - -func pnTag(tag string) C.pn_delivery_tag_t { - bytes := []byte(tag) - return C.pn_dtag(cPtr(bytes), cLen(bytes)) -} - -func (l Link) Delivery(tag string) Delivery { - return Delivery{C.pn_delivery(l.pn, pnTag(tag))} -} - -func cPtr(b []byte) *C.char { - if len(b) == 0 { - return nil - } - return (*C.char)(unsafe.Pointer(&b[0])) -} - -func cLen(b []byte) C.size_t { - return C.size_t(len(b)) -} - -func (s Session) Sender(name string) Link { - cname := C.CString(name) - defer C.free(unsafe.Pointer(cname)) - return Link{C.pn_sender(s.pn, cname)} -} - -func (s Session) Receiver(name string) Link { - cname := C.CString(name) - defer C.free(unsafe.Pointer(cname)) - return Link{C.pn_receiver(s.pn, cname)} -} - -func joinId(a, b interface{}) string { - return fmt.Sprintf("%s/%s", a, b) -} - -// Pump associated with this connection. -func (c Connection) Pump() *Pump { return pumps[c.pn] } - -// Unique (per process) string identifier for a connection, useful for debugging. -func (c Connection) String() string { return pumps[c.pn].String() } - -// Head functions don't follow the normal naming conventions so missed by the generator. - -func (c Connection) LinkHead(s State) Link { - return Link{C.pn_link_head(c.pn, C.pn_state_t(s))} -} - -func (c Connection) SessionHead(s State) Session { - return Session{C.pn_session_head(c.pn, C.pn_state_t(s))} -} - -// Unique (per process) string identifier for a session, including connection identifier. 
-func (s Session) String() string { - return joinId(s.Connection(), fmt.Sprintf("%p", s.pn)) -} - -// Unique (per process) string identifier for a link, including session identifier. -func (l Link) String() string { - return joinId(l.Session(), l.Name()) -} - -// Error returns an error interface corresponding to Condition. -func (c Condition) Error() error { - if c.IsNil() { - return nil - } else { - return fmt.Errorf("%s: %s", c.Name(), c.Description()) - } -} - -// SetIfUnset sets name and description on a condition if it is not already set. -func (c Condition) SetIfUnset(name, description string) { - if !c.IsSet() { - c.SetName(name) - c.SetDescription(description) - } -} - -func (c Connection) Session() (Session, error) { - s := Session{C.pn_session(c.pn)} - if s.IsNil() { - return s, Connection(c).Error() - } - return s, nil -} --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
