http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/bd3fb337/proton-c/bindings/go/src/qpid.apache.org/proton/amqp/doc.go ---------------------------------------------------------------------- diff --git a/proton-c/bindings/go/src/qpid.apache.org/proton/amqp/doc.go b/proton-c/bindings/go/src/qpid.apache.org/proton/amqp/doc.go new file mode 100644 index 0000000..cc2cd0e --- /dev/null +++ b/proton-c/bindings/go/src/qpid.apache.org/proton/amqp/doc.go @@ -0,0 +1,34 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ + +/* +Package amqp encodes and decodes AMQP messages and data as Go types. + +It follows the standard 'encoding' libraries pattern. The mapping between AMQP +and Go types is described in the documentation of the Marshal and Unmarshal +functions. + +AMQP is an open standard for inter-operable message exchange, see <http://www.amqp.org/> +*/ +package amqp + +// #cgo LDFLAGS: -lqpid-proton +import "C" + +// This file is just for the package comment.
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/bd3fb337/proton-c/bindings/go/src/qpid.apache.org/proton/amqp/error.go ---------------------------------------------------------------------- diff --git a/proton-c/bindings/go/src/qpid.apache.org/proton/amqp/error.go b/proton-c/bindings/go/src/qpid.apache.org/proton/amqp/error.go new file mode 100644 index 0000000..868dbf3 --- /dev/null +++ b/proton-c/bindings/go/src/qpid.apache.org/proton/amqp/error.go @@ -0,0 +1,66 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ + +package amqp + +import ( + "fmt" + "reflect" +) + +// Error is an AMQP error condition. It has a name and a description. +// It implements the Go error interface so can be returned as an error value. +// +// You can pass amqp.Error to methods that pass an error to a remote endpoint, +// this gives you full control over what the remote endpoint will see. +// +// You can also pass any Go error to such functions, the remote peer +// will see the equivalent of MakeError(error) +// +type Error struct{ Name, Description string } + +// Error implements the Go error interface for AMQP error errors. +func (c Error) Error() string { return fmt.Sprintf("proton %s: %s", c.Name, c.Description) } + +// Errorf makes a Error with name and formatted description as per fmt.Sprintf +func Errorf(name, format string, arg ...interface{}) Error { + return Error{name, fmt.Sprintf(format, arg...)} +} + +// MakeError makes an AMQP error from a go error using the Go error type as the name +// and the err.Error() string as the description. +func MakeError(err error) Error { + return Error{reflect.TypeOf(err).Name(), err.Error()} +} + +var ( + InternalError = "amqp:internal-error" + NotFound = "amqp:not-found" + UnauthorizedAccess = "amqp:unauthorized-access" + DecodeError = "amqp:decode-error" + ResourceLimit = "amqp:resource-limit" + NotAllowed = "amqp:not-allowed" + InvalidField = "amqp:invalid-field" + NotImplemented = "amqp:not-implemented" + ResourceLocked = "amqp:resource-locked" + PreerrorFailed = "amqp:preerror-failed" + ResourceDeleted = "amqp:resource-deleted" + IllegalState = "amqp:illegal-state" + FrameSizeTooSmall = "amqp:frame-size-too-small" +) http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/bd3fb337/proton-c/bindings/go/src/qpid.apache.org/proton/amqp/interop ---------------------------------------------------------------------- diff --git a/proton-c/bindings/go/src/qpid.apache.org/proton/amqp/interop b/proton-c/bindings/go/src/qpid.apache.org/proton/amqp/interop new file mode 120000 index 0000000..b2dd603 --- /dev/null +++ b/proton-c/bindings/go/src/qpid.apache.org/proton/amqp/interop @@ -0,0 +1 @@ +../../../../../../../tests/interop \ No newline at end of file http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/bd3fb337/proton-c/bindings/go/src/qpid.apache.org/proton/amqp/interop_test.go ---------------------------------------------------------------------- diff --git a/proton-c/bindings/go/src/qpid.apache.org/proton/amqp/interop_test.go b/proton-c/bindings/go/src/qpid.apache.org/proton/amqp/interop_test.go new file mode 100644 index 0000000..b36ef64 --- /dev/null +++ b/proton-c/bindings/go/src/qpid.apache.org/proton/amqp/interop_test.go @@ -0,0 +1,381 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ + +// Test that conversion of Go type to/from AMQP is compatible with other +// bindings. +// +package amqp + +import ( + "bytes" + "fmt" + "io" + "io/ioutil" + "os" + "reflect" + "strings" + "testing" +) + +func checkEqual(want interface{}, got interface{}) error { + if !reflect.DeepEqual(want, got) { + return fmt.Errorf("%#v != %#v", want, got) + } + return nil +} + +func getReader(name string) (r io.Reader) { + r, err := os.Open("interop/" + name + ".amqp") + if err != nil { + panic(fmt.Errorf("Can't open %#v: %v", name, err)) + } + return +} + +func remaining(d *Decoder) string { + remainder, _ := ioutil.ReadAll(io.MultiReader(d.Buffered(), d.reader)) + return string(remainder) +} + +// checkDecode: want is the expected value, gotPtr is a pointer to a +// instance of the same type for Decode. +func checkDecode(d *Decoder, want interface{}, gotPtr interface{}, t *testing.T) { + + if err := d.Decode(gotPtr); err != nil { + t.Error("Decode failed", err) + return + } + got := reflect.ValueOf(gotPtr).Elem().Interface() + if err := checkEqual(want, got); err != nil { + t.Error("Decode bad value:", err) + return + } + + // Try round trip encoding + bytes, err := Marshal(want, nil) + if err != nil { + t.Error("Marshal failed", err) + return + } + n, err := Unmarshal(bytes, gotPtr) + if err != nil { + t.Error("Unmarshal failed", err) + return + } + if err := checkEqual(n, len(bytes)); err != nil { + t.Error("Bad unmarshal length", err) + return + } + got = reflect.ValueOf(gotPtr).Elem().Interface() + if err = checkEqual(want, got); err != nil { + t.Error("Bad unmarshal value", err) + return + } +} + +func TestUnmarshal(t *testing.T) { + bytes, err := ioutil.ReadAll(getReader("strings")) + if err != nil { + t.Error(err) + } + for _, want := range []string{"abc\000defg", "abcdefg", "abcdefg", "", "", ""} { + var got string + n, err := Unmarshal(bytes, &got) + if err != nil { + t.Error(err) + } + if want != got { + t.Errorf("%#v != %#v", want, got) + } + bytes = bytes[n:] + } +} + +func TestPrimitivesExact(t *testing.T) { + d := NewDecoder(getReader("primitives")) + // Decoding into exact types + var b bool + checkDecode(d, true, &b, t) + checkDecode(d, false, &b, t) + var u8 uint8 + checkDecode(d, uint8(42), &u8, t) + var u16 uint16 + checkDecode(d, uint16(42), &u16, t) + var i16 int16 + checkDecode(d, int16(-42), &i16, t) + var u32 uint32 + checkDecode(d, uint32(12345), &u32, t) + var i32 int32 + checkDecode(d, int32(-12345), &i32, t) + var u64 uint64 + checkDecode(d, uint64(12345), &u64, t) + var i64 int64 + checkDecode(d, int64(-12345), &i64, t) + var f32 float32 + checkDecode(d, float32(0.125), &f32, t) + var f64 float64 + checkDecode(d, float64(0.125), &f64, t) +} + +func TestPrimitivesCompatible(t *testing.T) { + d := NewDecoder(getReader("primitives")) + // Decoding into compatible types + var b bool + var i int + var u uint + var f float64 + checkDecode(d, true, &b, t) + checkDecode(d, false, &b, t) + checkDecode(d, uint(42), &u, t) + checkDecode(d, uint(42), &u, t) + checkDecode(d, -42, &i, t) + checkDecode(d, uint(12345), &u, t) + checkDecode(d, -12345, &i, t) + checkDecode(d, uint(12345), &u, t) + checkDecode(d, -12345, &i, t) + checkDecode(d, 0.125, &f, t) + checkDecode(d, 0.125, &f, t) +} + +// checkDecodeValue: want is the expected value, decode into a reflect.Value +func checkDecodeInterface(d *Decoder, want interface{}, t *testing.T) { + + var got, got2 interface{} + if err := d.Decode(&got); err != nil { + t.Error("Decode failed", err) + return + } + if err := checkEqual(want, got); err != nil { + t.Error(err) + return + } + // Try round trip encoding + bytes, err := Marshal(got, nil) + if err != nil { + t.Error(err) + return + } + n, err := Unmarshal(bytes, &got2) + if err != nil { + t.Error(err) + return + } + if err := checkEqual(n, len(bytes)); err != nil { + t.Error(err) + return + } + if err := checkEqual(want, got2); err != nil { + t.Error(err) + return + } +} + +func TestPrimitivesInterface(t *testing.T) { + d := NewDecoder(getReader("primitives")) + checkDecodeInterface(d, true, t) + checkDecodeInterface(d, false, t) + checkDecodeInterface(d, uint8(42), t) + checkDecodeInterface(d, uint16(42), t) + checkDecodeInterface(d, int16(-42), t) + checkDecodeInterface(d, uint32(12345), t) + checkDecodeInterface(d, int32(-12345), t) + checkDecodeInterface(d, uint64(12345), t) + checkDecodeInterface(d, int64(-12345), t) + checkDecodeInterface(d, float32(0.125), t) + checkDecodeInterface(d, float64(0.125), t) +} + +func TestStrings(t *testing.T) { + d := NewDecoder(getReader("strings")) + // Test decoding as plain Go strings + for _, want := range []string{"abc\000defg", "abcdefg", "abcdefg", "", "", ""} { + var got string + checkDecode(d, want, &got, t) + } + remains := remaining(d) + if remains != "" { + t.Errorf("leftover: %s", remains) + } + + // Test decoding as specific string types + d = NewDecoder(getReader("strings")) + var bytes []byte + var str, sym string + checkDecode(d, []byte("abc\000defg"), &bytes, t) + checkDecode(d, "abcdefg", &str, t) + checkDecode(d, "abcdefg", &sym, t) + checkDecode(d, make([]byte, 0), &bytes, t) + checkDecode(d, "", &str, t) + checkDecode(d, "", &sym, t) + remains = remaining(d) + if remains != "" { + t.Fatalf("leftover: %s", remains) + } + + // Test some error handling + d = NewDecoder(getReader("strings")) + var s string + err := d.Decode(s) + if err == nil { + t.Fatal("Expected error") + } + if !strings.Contains(err.Error(), "not a pointer") { + t.Error(err) + } + var i int + err = d.Decode(&i) + if !strings.Contains(err.Error(), "cannot unmarshal") { + t.Error(err) + } + _, err = Unmarshal([]byte{}, nil) + if !strings.Contains(err.Error(), "not enough data") { + t.Error(err) + } + _, err = Unmarshal([]byte("foobar"), nil) + if !strings.Contains(err.Error(), "invalid-argument") { + t.Error(err) + } +} + +func TestEncodeDecode(t *testing.T) { + type data struct { + s string + i int + u8 uint8 + b bool + f float32 + v interface{} + } + + in := data{"foo", 42, 9, true, 1.234, "thing"} + + buf := bytes.Buffer{} + e := NewEncoder(&buf) + if err := e.Encode(in.s); err != nil { + t.Error(err) + } + if err := e.Encode(in.i); err != nil { + t.Error(err) + } + if err := e.Encode(in.u8); err != nil { + t.Error(err) + } + if err := e.Encode(in.b); err != nil { + t.Error(err) + } + if err := e.Encode(in.f); err != nil { + t.Error(err) + } + if err := e.Encode(in.v); err != nil { + t.Error(err) + } + + var out data + d := NewDecoder(&buf) + if err := d.Decode(&out.s); err != nil { + t.Error(err) + } + if err := d.Decode(&out.i); err != nil { + t.Error(err) + } + if err := d.Decode(&out.u8); err != nil { + t.Error(err) + } + if err := d.Decode(&out.b); err != nil { + t.Error(err) + } + if err := d.Decode(&out.f); err != nil { + t.Error(err) + } + if err := d.Decode(&out.v); err != nil { + t.Error(err) + } + + if err := checkEqual(in, out); err != nil { + t.Error(err) + } +} + +func TestMap(t *testing.T) { + d := NewDecoder(getReader("maps")) + + // Generic map + var m Map + checkDecode(d, Map{"one": int32(1), "two": int32(2), "three": int32(3)}, &m, t) + + // Interface as map + var i interface{} + checkDecode(d, Map{int32(1): "one", int32(2): "two", int32(3): "three"}, &i, t) + + d = NewDecoder(getReader("maps")) + // Specific typed map + var m2 map[string]int + checkDecode(d, map[string]int{"one": 1, "two": 2, "three": 3}, &m2, t) + + // Nested map + m = Map{int64(1): "one", "two": int32(2), true: Map{uint8(1): true, uint8(2): false}} + bytes, err := Marshal(m, nil) + if err != nil { + t.Fatal(err) + } + _, err = Unmarshal(bytes, &i) + if err != nil { + t.Fatal(err) + } + if err = checkEqual(m, i); err != nil { + t.Fatal(err) + } +} + +func TestList(t *testing.T) { + d := NewDecoder(getReader("lists")) + var l List + checkDecode(d, List{int32(32), "foo", true}, &l, t) + checkDecode(d, List{}, &l, t) +} + +// TODO aconway 2015-09-08: the message.amqp file seems to be incorrectly coded as +// as an AMQP string *inside* an AMQP binary?? Skip the test for now. +func TODO_TestMessage(t *testing.T) { + bytes, err := ioutil.ReadAll(getReader("message")) + if err != nil { + t.Fatal(err) + } + + m, err := DecodeMessage(bytes) + if err != nil { + t.Fatal(err) + } else { + if err := checkEqual(m.Body(), "hello"); err != nil { + t.Error(err) + } + } + + m2 := NewMessageWith("hello") + bytes2, err := m2.Encode(nil) + if err != nil { + t.Error(err) + } else { + if err = checkEqual(bytes, bytes2); err != nil { + t.Error(err) + } + } +} + +// TODO aconway 2015-03-13: finish the full interop test http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/bd3fb337/proton-c/bindings/go/src/qpid.apache.org/proton/amqp/marshal.go ---------------------------------------------------------------------- diff --git a/proton-c/bindings/go/src/qpid.apache.org/proton/amqp/marshal.go b/proton-c/bindings/go/src/qpid.apache.org/proton/amqp/marshal.go new file mode 100644 index 0000000..e393c97 --- /dev/null +++ b/proton-c/bindings/go/src/qpid.apache.org/proton/amqp/marshal.go @@ -0,0 +1,250 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ + +package amqp + +// #include <proton/codec.h> +import "C" + +import ( + "io" + "qpid.apache.org/proton/internal" + "reflect" + "unsafe" +) + +func dataError(prefix string, data *C.pn_data_t) error { + err := internal.PnError(unsafe.Pointer(C.pn_data_error(data))) + if err != nil { + err = internal.Errorf("%s: %s", prefix, err.(internal.Error)) + } + return err +} + +/* +Marshal encodes a Go value as AMQP data in buffer. +If buffer is nil, or is not large enough, a new buffer is created. + +Returns the buffer used for encoding with len() adjusted to the actual size of data. + +Go types are encoded as follows + + +-------------------------------------+--------------------------------------------+ + |Go type |AMQP type | + +-------------------------------------+--------------------------------------------+ + |bool |bool | + +-------------------------------------+--------------------------------------------+ + |int8, int16, int32, int64 (int) |byte, short, int, long (int or long) | + +-------------------------------------+--------------------------------------------+ + |uint8, uint16, uint32, uint64 (uint) |ubyte, ushort, uint, ulong (uint or ulong) | + +-------------------------------------+--------------------------------------------+ + |float32, float64 |float, double. | + +-------------------------------------+--------------------------------------------+ + |string |string | + +-------------------------------------+--------------------------------------------+ + |[]byte, Binary |binary | + +-------------------------------------+--------------------------------------------+ + |Symbol |symbol | + +-------------------------------------+--------------------------------------------+ + |interface{} |the contained type | + +-------------------------------------+--------------------------------------------+ + |nil |null | + +-------------------------------------+--------------------------------------------+ + |map[K]T |map with K and T converted as above | + +-------------------------------------+--------------------------------------------+ + |Map |map, may have mixed types for keys, values | + +-------------------------------------+--------------------------------------------+ + |[]T |list with T converted as above | + +-------------------------------------+--------------------------------------------+ + |List |list, may have mixed types values | + +-------------------------------------+--------------------------------------------+ + +The following Go types cannot be marshaled: uintptr, function, interface, channel + +TODO + +Go types: array, slice, struct, complex64/128. + +AMQP types: decimal32/64/128, char, timestamp, uuid, array, multi-section message bodies. + +Described types. + +*/ +func Marshal(v interface{}, buffer []byte) (outbuf []byte, err error) { + defer doRecover(&err) + data := C.pn_data(0) + defer C.pn_data_free(data) + marshal(v, data) + encode := func(buf []byte) ([]byte, error) { + n := int(C.pn_data_encode(data, cPtr(buf), cLen(buf))) + switch { + case n == int(C.PN_OVERFLOW): + return buf, overflow + case n < 0: + return buf, dataError("marshal error", data) + default: + return buf[:n], nil + } + } + return encodeGrow(buffer, encode) +} + +const minEncode = 256 + +// overflow is returned when an encoding function can't fit data in the buffer. +var overflow = internal.Errorf("buffer too small") + +// encodeFn encodes into buffer[0:len(buffer)]. +// Returns buffer with length adjusted for data encoded. +// If buffer too small, returns overflow as error. +type encodeFn func(buffer []byte) ([]byte, error) + +// encodeGrow calls encode() into buffer, if it returns overflow grows the buffer. +// Returns the final buffer. +func encodeGrow(buffer []byte, encode encodeFn) ([]byte, error) { + if buffer == nil || len(buffer) == 0 { + buffer = make([]byte, minEncode) + } + var err error + for buffer, err = encode(buffer); err == overflow; buffer, err = encode(buffer) { + buffer = make([]byte, 2*len(buffer)) + } + return buffer, err +} + +func marshal(v interface{}, data *C.pn_data_t) { + switch v := v.(type) { + case nil: + C.pn_data_put_null(data) + case bool: + C.pn_data_put_bool(data, C.bool(v)) + case int8: + C.pn_data_put_byte(data, C.int8_t(v)) + case int16: + C.pn_data_put_short(data, C.int16_t(v)) + case int32: + C.pn_data_put_int(data, C.int32_t(v)) + case int64: + C.pn_data_put_long(data, C.int64_t(v)) + case int: + if unsafe.Sizeof(0) == 8 { + C.pn_data_put_long(data, C.int64_t(v)) + } else { + C.pn_data_put_int(data, C.int32_t(v)) + } + case uint8: + C.pn_data_put_ubyte(data, C.uint8_t(v)) + case uint16: + C.pn_data_put_ushort(data, C.uint16_t(v)) + case uint32: + C.pn_data_put_uint(data, C.uint32_t(v)) + case uint64: + C.pn_data_put_ulong(data, C.uint64_t(v)) + case uint: + if unsafe.Sizeof(0) == 8 { + C.pn_data_put_ulong(data, C.uint64_t(v)) + } else { + C.pn_data_put_uint(data, C.uint32_t(v)) + } + case float32: + C.pn_data_put_float(data, C.float(v)) + case float64: + C.pn_data_put_double(data, C.double(v)) + case string: + C.pn_data_put_string(data, pnBytes([]byte(v))) + case []byte: + C.pn_data_put_binary(data, pnBytes(v)) + case Binary: + C.pn_data_put_binary(data, pnBytes([]byte(v))) + case Symbol: + C.pn_data_put_symbol(data, pnBytes([]byte(v))) + case Map: // Special map type + C.pn_data_put_map(data) + C.pn_data_enter(data) + for key, val := range v { + marshal(key, data) + marshal(val, data) + } + C.pn_data_exit(data) + default: + switch reflect.TypeOf(v).Kind() { + case reflect.Map: + putMap(data, v) + case reflect.Slice: + putList(data, v) + default: + panic(internal.Errorf("cannot marshal %s to AMQP", reflect.TypeOf(v))) + } + } + err := dataError("marshal", data) + if err != nil { + panic(err) + } + return +} + +func clearMarshal(v interface{}, data *C.pn_data_t) { + C.pn_data_clear(data) + marshal(v, data) +} + +func putMap(data *C.pn_data_t, v interface{}) { + mapValue := reflect.ValueOf(v) + C.pn_data_put_map(data) + C.pn_data_enter(data) + for _, key := range mapValue.MapKeys() { + marshal(key.Interface(), data) + marshal(mapValue.MapIndex(key).Interface(), data) + } + C.pn_data_exit(data) +} + +func putList(data *C.pn_data_t, v interface{}) { + listValue := reflect.ValueOf(v) + C.pn_data_put_list(data) + C.pn_data_enter(data) + for i := 0; i < listValue.Len(); i++ { + marshal(listValue.Index(i).Interface(), data) + } + C.pn_data_exit(data) +} + +// Encoder encodes AMQP values to an io.Writer +type Encoder struct { + writer io.Writer + buffer []byte +} + +// New encoder returns a new encoder that writes to w. +func NewEncoder(w io.Writer) *Encoder { + return &Encoder{w, make([]byte, minEncode)} +} + +func (e *Encoder) Encode(v interface{}) (err error) { + e.buffer, err = Marshal(v, e.buffer) + if err == nil { + e.writer.Write(e.buffer) + } + return err +} + +func replace(data *C.pn_data_t, v interface{}) { + C.pn_data_clear(data) + marshal(v, data) +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/bd3fb337/proton-c/bindings/go/src/qpid.apache.org/proton/amqp/message.go ---------------------------------------------------------------------- diff --git a/proton-c/bindings/go/src/qpid.apache.org/proton/amqp/message.go b/proton-c/bindings/go/src/qpid.apache.org/proton/amqp/message.go new file mode 100644 index 0000000..20cfa02 --- /dev/null +++ b/proton-c/bindings/go/src/qpid.apache.org/proton/amqp/message.go @@ -0,0 +1,347 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ + +package amqp + +// #include <proton/types.h> +// #include <proton/message.h> +// #include <proton/codec.h> +// #include <stdlib.h> +// +// /* Helper for setting message string fields */ +// typedef int (*set_fn)(pn_message_t*, const char*); +// int msg_set_str(pn_message_t* m, char* s, set_fn set) { +// int result = set(m, s); +// free(s); +// return result; +// } +// +import "C" + +import ( + "qpid.apache.org/proton/internal" + "runtime" + "time" + "unsafe" +) + +// Message is the interface to an AMQP message. +type Message interface { + // Durable indicates that any parties taking responsibility + // for the message must durably store the content. + Durable() bool + SetDurable(bool) + + // Priority impacts ordering guarantees. Within a + // given ordered context, higher priority messages may jump ahead of + // lower priority messages. + Priority() uint8 + SetPriority(uint8) + + // TTL or Time To Live, a message it may be dropped after this duration + TTL() time.Duration + SetTTL(time.Duration) + + // FirstAcquirer indicates + // that the recipient of the message is the first recipient to acquire + // the message, i.e. there have been no failed delivery attempts to + // other acquirers. Note that this does not mean the message has not + // been delivered to, but not acquired, by other recipients. + FirstAcquirer() bool + SetFirstAcquirer(bool) + + // DeliveryCount tracks how many attempts have been made to + // delivery a message. + DeliveryCount() uint32 + SetDeliveryCount(uint32) + + // MessageId provides a unique identifier for a message. + // it can be an a string, an unsigned long, a uuid or a + // binary value. + MessageId() interface{} + SetMessageId(interface{}) + + UserId() string + SetUserId(string) + + Address() string + SetAddress(string) + + Subject() string + SetSubject(string) + + ReplyTo() string + SetReplyTo(string) + + // CorrelationId is set on correlated request and response messages. It can be + // an a string, an unsigned long, a uuid or a binary value. + CorrelationId() interface{} + SetCorrelationId(interface{}) + + ContentType() string + SetContentType(string) + + ContentEncoding() string + SetContentEncoding(string) + + // ExpiryTime indicates an absoulte time when the message may be dropped. + // A Zero time (i.e. t.isZero() == true) indicates a message never expires. + ExpiryTime() time.Time + SetExpiryTime(time.Time) + + CreationTime() time.Time + SetCreationTime(time.Time) + + GroupId() string + SetGroupId(string) + + GroupSequence() int32 + SetGroupSequence(int32) + + ReplyToGroupId() string + SetReplyToGroupId(string) + + // Instructions - AMQP delivery instructions. + Instructions() map[string]interface{} + SetInstructions(v map[string]interface{}) + + // Annotations - AMQP annotations. + Annotations() map[string]interface{} + SetAnnotations(v map[string]interface{}) + + // Properties - Application properties. + Properties() map[string]interface{} + SetProperties(v map[string]interface{}) + + // Inferred indicates how the message content + // is encoded into AMQP sections. If inferred is true then binary and + // list values in the body of the message will be encoded as AMQP DATA + // and AMQP SEQUENCE sections, respectively. If inferred is false, + // then all values in the body of the message will be encoded as AMQP + // VALUE sections regardless of their type. + Inferred() bool + SetInferred(bool) + + // Marshal a Go value into the message body. See amqp.Marshal() for details. + Marshal(interface{}) + + // Unmarshal the message body into the value pointed to by v. See amqp.Unmarshal() for details. + Unmarshal(interface{}) + + // Body value resulting from the default unmarshalling of message body as interface{} + Body() interface{} + + // Encode encodes the message as AMQP data. If buffer is non-nil and is large enough + // the message is encoded into it, otherwise a new buffer is created. + // Returns the buffer containing the message. + Encode(buffer []byte) ([]byte, error) + + // Decode data into this message. Overwrites an existing message content. + Decode(buffer []byte) error + + // Clear the message contents. + Clear() + + // Copy the contents of another message to this one. + Copy(m Message) error +} + +type message struct{ pn *C.pn_message_t } + +func freeMessage(m *message) { + C.pn_message_free(m.pn) + m.pn = nil +} + +// NewMessage creates a new message instance. +func NewMessage() Message { + m := &message{C.pn_message()} + runtime.SetFinalizer(m, freeMessage) + return m +} + +// NewMessageWith creates a message with value as the body. Equivalent to +// m := NewMessage(); m.Marshal(body) +func NewMessageWith(value interface{}) Message { + m := NewMessage() + m.Marshal(value) + return m +} + +func (m *message) Clear() { C.pn_message_clear(m.pn) } + +func (m *message) Copy(x Message) error { + if data, err := x.Encode(nil); err == nil { + return m.Decode(data) + } else { + return err + } +} + +// ==== message get functions + +func rewindGet(data *C.pn_data_t) (v interface{}) { + C.pn_data_rewind(data) + C.pn_data_next(data) + unmarshal(&v, data) + return v +} + +func rewindMap(data *C.pn_data_t) (v map[string]interface{}) { + C.pn_data_rewind(data) + C.pn_data_next(data) + unmarshal(&v, data) + return v +} + +func (m *message) Inferred() bool { return bool(C.pn_message_is_inferred(m.pn)) } +func (m *message) Durable() bool { return bool(C.pn_message_is_durable(m.pn)) } +func (m *message) Priority() uint8 { return uint8(C.pn_message_get_priority(m.pn)) } +func (m *message) TTL() time.Duration { + return time.Duration(C.pn_message_get_ttl(m.pn)) * time.Millisecond +} +func (m *message) FirstAcquirer() bool { return bool(C.pn_message_is_first_acquirer(m.pn)) } +func (m *message) DeliveryCount() uint32 { return uint32(C.pn_message_get_delivery_count(m.pn)) } +func (m *message) MessageId() interface{} { return rewindGet(C.pn_message_id(m.pn)) } +func (m *message) UserId() string { return goString(C.pn_message_get_user_id(m.pn)) } +func (m *message) Address() string { return C.GoString(C.pn_message_get_address(m.pn)) } +func (m *message) Subject() string { return C.GoString(C.pn_message_get_subject(m.pn)) } +func (m *message) ReplyTo() string { return C.GoString(C.pn_message_get_reply_to(m.pn)) } +func (m *message) CorrelationId() interface{} { return rewindGet(C.pn_message_correlation_id(m.pn)) } +func (m *message) ContentType() string { return C.GoString(C.pn_message_get_content_type(m.pn)) } +func (m *message) ContentEncoding() string { return C.GoString(C.pn_message_get_content_encoding(m.pn)) } + +func (m *message) ExpiryTime() time.Time { + return time.Unix(0, int64(time.Millisecond*time.Duration(C.pn_message_get_expiry_time(m.pn)))) +} +func (m *message) CreationTime() time.Time { + return time.Unix(0, int64(time.Millisecond)*int64(C.pn_message_get_creation_time(m.pn))) +} +func (m *message) GroupId() string { return C.GoString(C.pn_message_get_group_id(m.pn)) } +func (m *message) GroupSequence() int32 { return int32(C.pn_message_get_group_sequence(m.pn)) } +func (m *message) ReplyToGroupId() string { return C.GoString(C.pn_message_get_reply_to_group_id(m.pn)) } + +func (m *message) Instructions() map[string]interface{} { + return rewindMap(C.pn_message_instructions(m.pn)) +} +func (m *message) Annotations() map[string]interface{} { + return rewindMap(C.pn_message_annotations(m.pn)) +} +func (m *message) Properties() map[string]interface{} { + return rewindMap(C.pn_message_properties(m.pn)) +} + +// ==== message set methods + +func setData(v interface{}, data *C.pn_data_t) { + C.pn_data_clear(data) + marshal(v, data) +} + +func dataString(data *C.pn_data_t) string { + str := C.pn_string(C.CString("")) + defer C.pn_free(unsafe.Pointer(str)) + C.pn_inspect(unsafe.Pointer(data), str) + return C.GoString(C.pn_string_get(str)) +} + +func (m *message) SetInferred(b bool) { C.pn_message_set_inferred(m.pn, C.bool(m.Inferred())) } +func (m *message) SetDurable(b bool) { C.pn_message_set_durable(m.pn, C.bool(b)) } +func (m *message) SetPriority(b uint8) { C.pn_message_set_priority(m.pn, C.uint8_t(b)) } +func (m *message) SetTTL(d time.Duration) { + C.pn_message_set_ttl(m.pn, C.pn_millis_t(d/time.Millisecond)) +} +func (m *message) SetFirstAcquirer(b bool) { C.pn_message_set_first_acquirer(m.pn, C.bool(b)) } +func (m *message) SetDeliveryCount(c uint32) { C.pn_message_set_delivery_count(m.pn, C.uint32_t(c)) } +func (m *message) SetMessageId(id interface{}) { setData(id, C.pn_message_id(m.pn)) } +func (m *message) SetUserId(s string) { C.pn_message_set_user_id(m.pn, pnBytes(([]byte)(s))) } +func (m *message) SetAddress(s string) { + C.msg_set_str(m.pn, C.CString(s), C.set_fn(C.pn_message_set_address)) +} +func (m *message) SetSubject(s string) { + C.msg_set_str(m.pn, C.CString(s), C.set_fn(C.pn_message_set_subject)) +} +func (m *message) SetReplyTo(s string) { + C.msg_set_str(m.pn, C.CString(s), C.set_fn(C.pn_message_set_reply_to)) +} +func (m *message) SetCorrelationId(c interface{}) { setData(c, C.pn_message_correlation_id(m.pn)) } +func (m *message) SetContentType(s string) { + C.msg_set_str(m.pn, C.CString(s), C.set_fn(C.pn_message_set_content_type)) +} +func (m *message) SetContentEncoding(s string) { + C.msg_set_str(m.pn, C.CString(s), C.set_fn(C.pn_message_set_content_encoding)) +} +func (m *message) SetExpiryTime(t time.Time) { C.pn_message_set_expiry_time(m.pn, pnTime(t)) } +func (m *message) SetCreationTime(t time.Time) { C.pn_message_set_creation_time(m.pn, pnTime(t)) } +func (m *message) SetGroupId(s string) { + C.msg_set_str(m.pn, C.CString(s), C.set_fn(C.pn_message_set_group_id)) +} +func (m *message) SetGroupSequence(s int32) { + C.pn_message_set_group_sequence(m.pn, C.pn_sequence_t(s)) +} +func (m *message) SetReplyToGroupId(s string) { + C.msg_set_str(m.pn, C.CString(s), C.set_fn(C.pn_message_set_reply_to_group_id)) +} + +func (m *message) SetInstructions(v map[string]interface{}) { + setData(v, C.pn_message_instructions(m.pn)) +} +func (m *message) SetAnnotations(v map[string]interface{}) { setData(v, C.pn_message_annotations(m.pn)) } +func (m *message) SetProperties(v map[string]interface{}) { setData(v, C.pn_message_properties(m.pn)) } + +// Marshal/Unmarshal body +func (m *message) Marshal(v interface{}) { clearMarshal(v, C.pn_message_body(m.pn)) } +func (m *message) Unmarshal(v interface{}) { rewindUnmarshal(v, C.pn_message_body(m.pn)) } +func (m *message) Body() (v interface{}) { m.Unmarshal(&v); return } + +func (m *message) Decode(data []byte) error { + m.Clear() + if len(data) == 0 { + return internal.Errorf("empty buffer for decode") + } + if C.pn_message_decode(m.pn, cPtr(data), cLen(data)) < 0 { + return internal.Errorf("decoding message: %s", + internal.PnError(unsafe.Pointer(C.pn_message_error(m.pn)))) + } + return nil +} + +func DecodeMessage(data []byte) (m Message, err error) { + m = NewMessage() + err = m.Decode(data) + return +} + +func (m *message) Encode(buffer []byte) ([]byte, error) { + encode := func(buf []byte) ([]byte, error) { + len := cLen(buf) + result := C.pn_message_encode(m.pn, cPtr(buf), &len) + switch { + case result == C.PN_OVERFLOW: + return buf, overflow + case result < 0: + return buf, internal.Errorf("cannot encode message: %s", internal.PnErrorCode(result)) + default: + return buf[:len], nil + } + } + return encodeGrow(buffer, encode) +} + +// TODO aconway 2015-09-14: Multi-section messages. http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/bd3fb337/proton-c/bindings/go/src/qpid.apache.org/proton/amqp/message_test.go ---------------------------------------------------------------------- diff --git a/proton-c/bindings/go/src/qpid.apache.org/proton/amqp/message_test.go b/proton-c/bindings/go/src/qpid.apache.org/proton/amqp/message_test.go new file mode 100644 index 0000000..7a6e5a8 --- /dev/null +++ b/proton-c/bindings/go/src/qpid.apache.org/proton/amqp/message_test.go @@ -0,0 +1,166 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ + +package amqp + +import ( + "testing" + "time" +) + +func roundTrip(m Message) error { + buffer, err := m.Encode(nil) + if err != nil { + return err + } + m2, err := DecodeMessage(buffer) + if err != nil { + return err + } + return checkEqual(m, m2) +} + +func TestDefaultMessage(t *testing.T) { + m := NewMessage() + // Check defaults + for _, data := range [][]interface{}{ + {m.Inferred(), false}, + {m.Durable(), false}, + {m.Priority(), uint8(4)}, + {m.TTL(), time.Duration(0)}, + {m.UserId(), ""}, + {m.Address(), ""}, + {m.Subject(), ""}, + {m.ReplyTo(), ""}, + {m.ContentType(), ""}, + {m.ContentEncoding(), ""}, + {m.GroupId(), ""}, + {m.GroupSequence(), int32(0)}, + {m.ReplyToGroupId(), ""}, + {m.MessageId(), nil}, + {m.CorrelationId(), nil}, + {m.Instructions(), map[string]interface{}{}}, + {m.Annotations(), map[string]interface{}{}}, + {m.Properties(), map[string]interface{}{}}, + {m.Body(), nil}, + } { + if err := checkEqual(data[0], data[1]); err != nil { + t.Error(err) + } + } + if err := roundTrip(m); err != nil { + t.Error(err) + } +} + +func TestMessageRoundTrip(t *testing.T) { + m := NewMessage() + m.SetInferred(false) + m.SetDurable(true) + m.SetPriority(42) + m.SetTTL(0) + m.SetUserId("user") + m.SetAddress("address") + m.SetSubject("subject") + m.SetReplyTo("replyto") + m.SetContentType("content") + m.SetContentEncoding("encoding") + m.SetGroupId("group") + m.SetGroupSequence(42) + m.SetReplyToGroupId("replytogroup") + m.SetMessageId("id") + m.SetCorrelationId("correlation") + m.SetInstructions(map[string]interface{}{"instructions": "foo"}) + m.SetAnnotations(map[string]interface{}{"annotations": "foo"}) + m.SetProperties(map[string]interface{}{"int": int32(32), "bool": true, "string": "foo"}) + m.Marshal("hello") + + for _, data := range [][]interface{}{ + {m.Inferred(), false}, + {m.Durable(), true}, + {m.Priority(), uint8(42)}, + {m.TTL(), time.Duration(0)}, + {m.UserId(), "user"}, + {m.Address(), "address"}, + {m.Subject(), "subject"}, + {m.ReplyTo(), "replyto"}, + {m.ContentType(), "content"}, + {m.ContentEncoding(), "encoding"}, + {m.GroupId(), "group"}, + {m.GroupSequence(), int32(42)}, + {m.ReplyToGroupId(), "replytogroup"}, + {m.MessageId(), "id"}, + {m.CorrelationId(), "correlation"}, + {m.Instructions(), map[string]interface{}{"instructions": "foo"}}, + {m.Annotations(), map[string]interface{}{"annotations": "foo"}}, + {m.Properties(), map[string]interface{}{"int": int32(32), "bool": true, "string": "foo"}}, + {m.Body(), "hello"}, + } { + if err := checkEqual(data[0], data[1]); err != nil { + t.Error(err) + } + } + if err := roundTrip(m); err != nil { + t.Error(err) + } +} + +func TestMessageBodyTypes(t *testing.T) { + var s string + var body interface{} + var i int64 + + m := NewMessageWith(int64(42)) + m.Unmarshal(&body) + m.Unmarshal(&i) + if err := checkEqual(body.(int64), int64(42)); err != nil { + t.Error(err) + } + if err := checkEqual(i, int64(42)); err != nil { + t.Error(err) + } + + m = NewMessageWith("hello") + m.Unmarshal(&s) + m.Unmarshal(&body) + if err := checkEqual(s, "hello"); err != nil { + t.Error(err) + } + if err := checkEqual(body.(string), "hello"); err != nil { + t.Error(err) + } + if err := roundTrip(m); err != nil { + t.Error(err) + } + + m = NewMessageWith(Binary("bin")) + m.Unmarshal(&s) + m.Unmarshal(&body) + if err := checkEqual(body.(Binary), Binary("bin")); err != nil { + t.Error(err) + } + if err := checkEqual(s, "bin"); err != nil { + t.Error(err) + } + if err := roundTrip(m); err != nil { + t.Error(err) + } + + // TODO aconway 2015-09-08: array etc. +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/bd3fb337/proton-c/bindings/go/src/qpid.apache.org/proton/amqp/types.go ---------------------------------------------------------------------- diff --git a/proton-c/bindings/go/src/qpid.apache.org/proton/amqp/types.go b/proton-c/bindings/go/src/qpid.apache.org/proton/amqp/types.go new file mode 100644 index 0000000..131c974 --- /dev/null +++ b/proton-c/bindings/go/src/qpid.apache.org/proton/amqp/types.go @@ -0,0 +1,198 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ + +package amqp + +// #include <proton/codec.h> +import "C" + +import ( + "bytes" + "fmt" + "reflect" + "time" + "unsafe" +) + +type Type C.pn_type_t + +func (t Type) String() string { + switch C.pn_type_t(t) { + case C.PN_NULL: + return "null" + case C.PN_BOOL: + return "bool" + case C.PN_UBYTE: + return "ubyte" + case C.PN_BYTE: + return "byte" + case C.PN_USHORT: + return "ushort" + case C.PN_SHORT: + return "short" + case C.PN_CHAR: + return "char" + case C.PN_UINT: + return "uint" + case C.PN_INT: + return "int" + case C.PN_ULONG: + return "ulong" + case C.PN_LONG: + return "long" + case C.PN_TIMESTAMP: + return "timestamp" + case C.PN_FLOAT: + return "float" + case C.PN_DOUBLE: + return "double" + case C.PN_DECIMAL32: + return "decimal32" + case C.PN_DECIMAL64: + return "decimal64" + case C.PN_DECIMAL128: + return "decimal128" + case C.PN_UUID: + return "uuid" + case C.PN_BINARY: + return "binary" + case C.PN_STRING: + return "string" + case C.PN_SYMBOL: + return "symbol" + case C.PN_DESCRIBED: + return "described" + case C.PN_ARRAY: + return "array" + case C.PN_LIST: + return "list" + case C.PN_MAP: + return "map" + case C.PN_INVALID: + return "no-data" + default: + return fmt.Sprintf("unknown-type(%d)", t) + } +} + +// Go types +var ( + bytesType = reflect.TypeOf([]byte{}) + valueType = reflect.TypeOf(reflect.Value{}) +) + +// TODO aconway 2015-04-08: can't handle AMQP maps with key types that are not valid Go map keys. + +// Map is a generic map that can have mixed key and value types and so can represent any AMQP map +type Map map[interface{}]interface{} + +// List is a generic list that can hold mixed values and can represent any AMQP list. +// +type List []interface{} + +// Symbol is a string that is encoded as an AMQP symbol +type Symbol string + +func (s Symbol) GoString() string { return fmt.Sprintf("s\"%s\"", s) } + +// Binary is a string that is encoded as an AMQP binary. +// It is a string rather than a byte[] because byte[] is not hashable and can't be used as +// a map key, AMQP frequently uses binary types as map keys. It can convert to and from []byte +type Binary string + +func (b Binary) GoString() string { return fmt.Sprintf("b\"%s\"", b) } + +// GoString for Map prints values with their types, useful for debugging. +func (m Map) GoString() string { + out := &bytes.Buffer{} + fmt.Fprintf(out, "%T{", m) + i := len(m) + for k, v := range m { + fmt.Fprintf(out, "%T(%#v): %T(%#v)", k, k, v, v) + i-- + if i > 0 { + fmt.Fprint(out, ", ") + } + } + fmt.Fprint(out, "}") + return out.String() +} + +// GoString for List prints values with their types, useful for debugging. +func (l List) GoString() string { + out := &bytes.Buffer{} + fmt.Fprintf(out, "%T{", l) + for i := 0; i < len(l); i++ { + fmt.Fprintf(out, "%T(%#v)", l[i], l[i]) + if i == len(l)-1 { + fmt.Fprint(out, ", ") + } + } + fmt.Fprint(out, "}") + return out.String() +} + +// pnTime converts Go time.Time to Proton millisecond Unix time. +func pnTime(t time.Time) C.pn_timestamp_t { + secs := t.Unix() + // Note: sub-second accuracy is not guaraunteed if the Unix time in + // nanoseconds cannot be represented by an int64 (sometime around year 2260) + msecs := (t.UnixNano() % int64(time.Second)) / int64(time.Millisecond) + return C.pn_timestamp_t(secs*1000 + msecs) +} + +// goTime converts a pn_timestamp_t to a Go time.Time. +func goTime(t C.pn_timestamp_t) time.Time { + secs := int64(t) / 1000 + nsecs := (int64(t) % 1000) * int64(time.Millisecond) + return time.Unix(secs, nsecs) +} + +func goBytes(cBytes C.pn_bytes_t) (bytes []byte) { + if cBytes.start != nil { + bytes = C.GoBytes(unsafe.Pointer(cBytes.start), C.int(cBytes.size)) + } + return +} + +func goString(cBytes C.pn_bytes_t) (str string) { + if cBytes.start != nil { + str = C.GoStringN(cBytes.start, C.int(cBytes.size)) + } + return +} + +func pnBytes(b []byte) C.pn_bytes_t { + if len(b) == 0 { + return C.pn_bytes_t{0, nil} + } else { + return C.pn_bytes_t{C.size_t(len(b)), (*C.char)(unsafe.Pointer(&b[0]))} + } +} + +func cPtr(b []byte) *C.char { + if len(b) == 0 { + return nil + } + return (*C.char)(unsafe.Pointer(&b[0])) +} + +func cLen(b []byte) C.size_t { + return C.size_t(len(b)) +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/bd3fb337/proton-c/bindings/go/src/qpid.apache.org/proton/amqp/unmarshal.go ---------------------------------------------------------------------- diff --git a/proton-c/bindings/go/src/qpid.apache.org/proton/amqp/unmarshal.go b/proton-c/bindings/go/src/qpid.apache.org/proton/amqp/unmarshal.go new file mode 100644 index 0000000..d645273 --- /dev/null +++ b/proton-c/bindings/go/src/qpid.apache.org/proton/amqp/unmarshal.go @@ -0,0 +1,556 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +oor more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ + +package amqp + +// #include <proton/codec.h> +import "C" + +import ( + "bytes" + "fmt" + "io" + "qpid.apache.org/proton/internal" + "reflect" + "unsafe" +) + +const minDecode = 1024 + +// Error returned if AMQP data cannot be unmarshaled as the desired Go type. +type UnmarshalError struct { + // The name of the AMQP type. + AMQPType string + // The Go type. + GoType reflect.Type +} + +func newUnmarshalError(pnType C.pn_type_t, v interface{}) *UnmarshalError { + return &UnmarshalError{Type(pnType).String(), reflect.TypeOf(v)} +} + +func (e UnmarshalError) Error() string { + if e.GoType.Kind() != reflect.Ptr { + return fmt.Sprintf("proton: cannot unmarshal to type %s, not a pointer", e.GoType) + } else { + return fmt.Sprintf("proton: cannot unmarshal AMQP %s to %s", e.AMQPType, e.GoType) + } +} + +func doRecover(err *error) { + r := recover() + switch r := r.(type) { + case nil: + case *UnmarshalError, internal.Error: + *err = r.(error) + default: + panic(r) + } +} + +// +// Decoding from a pn_data_t +// +// NOTE: we use panic() to signal a decoding error, simplifies decoding logic. +// We recover() at the highest possible level - i.e. in the exported Unmarshal or Decode. +// + +// Decoder decodes AMQP values from an io.Reader. +// +type Decoder struct { + reader io.Reader + buffer bytes.Buffer +} + +// NewDecoder returns a new decoder that reads from r. +// +// The decoder has it's own buffer and may read more data than required for the +// AMQP values requested. Use Buffered to see if there is data left in the +// buffer. +// +func NewDecoder(r io.Reader) *Decoder { + return &Decoder{r, bytes.Buffer{}} +} + +// Buffered returns a reader of the data remaining in the Decoder's buffer. The +// reader is valid until the next call to Decode. +// +func (d *Decoder) Buffered() io.Reader { + return bytes.NewReader(d.buffer.Bytes()) +} + +// Decode reads the next AMQP value from the Reader and stores it in the value pointed to by v. +// +// See the documentation for Unmarshal for details about the conversion of AMQP into a Go value. +// +func (d *Decoder) Decode(v interface{}) (err error) { + defer doRecover(&err) + data := C.pn_data(0) + defer C.pn_data_free(data) + var n int + for n == 0 && err == nil { + n = decode(data, d.buffer.Bytes()) + if n == 0 { // n == 0 means not enough data, read more + err = d.more() + } else { + unmarshal(v, data) + } + } + d.buffer.Next(n) + return +} + +/* +Unmarshal decodes AMQP-encoded bytes and stores the result in the value pointed to by v. +Types are converted as follows: + + +---------------------------+----------------------------------------------------------------------+ + |To Go types |From AMQP types | + +===========================+======================================================================+ + |bool |bool | + +---------------------------+----------------------------------------------------------------------+ + |int, int8, int16, |Equivalent or smaller signed integer type: byte, short, int, long. | + |int32, int64 | | + +---------------------------+----------------------------------------------------------------------+ + |uint, uint8, uint16, |Equivalent or smaller unsigned integer type: ubyte, ushort, uint, | + |uint32, uint64 types |ulong | + +---------------------------+----------------------------------------------------------------------+ + |float32, float64 |Equivalent or smaller float or double. | + +---------------------------+----------------------------------------------------------------------+ + |string, []byte |string, symbol or binary. | + +---------------------------+----------------------------------------------------------------------+ + |Symbol |symbol | + +---------------------------+----------------------------------------------------------------------+ + |map[K]T |map, provided all keys and values can unmarshal to types K, T | + +---------------------------+----------------------------------------------------------------------+ + |Map |map, any AMQP map | + +---------------------------+----------------------------------------------------------------------+ + |interface{} |Any AMQP value can be unmarshaled to an interface{} as follows: | + | +------------------------+---------------------------------------------+ + | |AMQP Type |Go Type in interface{} | + | +========================+=============================================+ + | |bool |bool | + | +------------------------+---------------------------------------------+ + | |byte,short,int,long |int8,int16,int32,int64 | + | +------------------------+---------------------------------------------+ + | |ubyte,ushort,uint,ulong |uint8,uint16,uint32,uint64 | + | +------------------------+---------------------------------------------+ + | |float, double |float32, float64 | + | +------------------------+---------------------------------------------+ + | |string |string | + | +------------------------+---------------------------------------------+ + | |symbol |Symbol | + | +------------------------+---------------------------------------------+ + | |binary |Binary | + | +------------------------+---------------------------------------------+ + | |nulll |nil | + | +------------------------+---------------------------------------------+ + | |map |Map | + | +------------------------+---------------------------------------------+ + | |list |List | + +---------------------------+------------------------+---------------------------------------------+ + +The following Go types cannot be unmarshaled: uintptr, function, interface, channel. + +TODO + +Go types: array, struct. + +AMQP types: decimal32/64/128, char (round trip), timestamp, uuid, array, multi-section message bodies. + +AMQP maps with mixed/unhashable key types need an alternate representation. + +Described types. +*/ +func Unmarshal(bytes []byte, v interface{}) (n int, err error) { + defer doRecover(&err) + + data := C.pn_data(0) + defer C.pn_data_free(data) + n = decode(data, bytes) + if n == 0 { + err = internal.Errorf("not enough data") + } else { + unmarshal(v, data) + } + return +} + +// more reads more data when we can't parse a complete AMQP type +func (d *Decoder) more() error { + var readSize int64 = minDecode + if int64(d.buffer.Len()) > readSize { // Grow by doubling + readSize = int64(d.buffer.Len()) + } + var n int64 + n, err := d.buffer.ReadFrom(io.LimitReader(d.reader, readSize)) + if n == 0 && err == nil { // ReadFrom won't report io.EOF, just returns 0 + err = io.EOF + } + return err +} + +// Unmarshal from data into value pointed at by v. +func unmarshal(v interface{}, data *C.pn_data_t) { + pnType := C.pn_data_type(data) + switch v := v.(type) { + case *bool: + switch pnType { + case C.PN_BOOL: + *v = bool(C.pn_data_get_bool(data)) + default: + panic(newUnmarshalError(pnType, v)) + } + case *int8: + switch pnType { + case C.PN_CHAR: + *v = int8(C.pn_data_get_char(data)) + case C.PN_BYTE: + *v = int8(C.pn_data_get_byte(data)) + default: + panic(newUnmarshalError(pnType, v)) + } + case *uint8: + switch pnType { + case C.PN_CHAR: + *v = uint8(C.pn_data_get_char(data)) + case C.PN_UBYTE: + *v = uint8(C.pn_data_get_ubyte(data)) + default: + panic(newUnmarshalError(pnType, v)) + } + case *int16: + switch pnType { + case C.PN_CHAR: + *v = int16(C.pn_data_get_char(data)) + case C.PN_BYTE: + *v = int16(C.pn_data_get_byte(data)) + case C.PN_SHORT: + *v = int16(C.pn_data_get_short(data)) + default: + panic(newUnmarshalError(pnType, v)) + } + case *uint16: + switch pnType { + case C.PN_CHAR: + *v = uint16(C.pn_data_get_char(data)) + case C.PN_UBYTE: + *v = uint16(C.pn_data_get_ubyte(data)) + case C.PN_USHORT: + *v = uint16(C.pn_data_get_ushort(data)) + default: + panic(newUnmarshalError(pnType, v)) + } + case *int32: + switch pnType { + case C.PN_CHAR: + *v = int32(C.pn_data_get_char(data)) + case C.PN_BYTE: + *v = int32(C.pn_data_get_byte(data)) + case C.PN_SHORT: + *v = int32(C.pn_data_get_short(data)) + case C.PN_INT: + *v = int32(C.pn_data_get_int(data)) + default: + panic(newUnmarshalError(pnType, v)) + } + case *uint32: + switch pnType { + case C.PN_CHAR: + *v = uint32(C.pn_data_get_char(data)) + case C.PN_UBYTE: + *v = uint32(C.pn_data_get_ubyte(data)) + case C.PN_USHORT: + *v = uint32(C.pn_data_get_ushort(data)) + case C.PN_UINT: + *v = uint32(C.pn_data_get_uint(data)) + default: + panic(newUnmarshalError(pnType, v)) + } + + case *int64: + switch pnType { + case C.PN_CHAR: + *v = int64(C.pn_data_get_char(data)) + case C.PN_BYTE: + *v = int64(C.pn_data_get_byte(data)) + case C.PN_SHORT: + *v = int64(C.pn_data_get_short(data)) + case C.PN_INT: + *v = int64(C.pn_data_get_int(data)) + case C.PN_LONG: + *v = int64(C.pn_data_get_long(data)) + default: + panic(newUnmarshalError(pnType, v)) + } + + case *uint64: + switch pnType { + case C.PN_CHAR: + *v = uint64(C.pn_data_get_char(data)) + case C.PN_UBYTE: + *v = uint64(C.pn_data_get_ubyte(data)) + case C.PN_USHORT: + *v = uint64(C.pn_data_get_ushort(data)) + case C.PN_ULONG: + *v = uint64(C.pn_data_get_ulong(data)) + default: + panic(newUnmarshalError(pnType, v)) + } + + case *int: + switch pnType { + case C.PN_CHAR: + *v = int(C.pn_data_get_char(data)) + case C.PN_BYTE: + *v = int(C.pn_data_get_byte(data)) + case C.PN_SHORT: + *v = int(C.pn_data_get_short(data)) + case C.PN_INT: + *v = int(C.pn_data_get_int(data)) + case C.PN_LONG: + if unsafe.Sizeof(0) == 8 { + *v = int(C.pn_data_get_long(data)) + } else { + panic(newUnmarshalError(pnType, v)) + } + default: + panic(newUnmarshalError(pnType, v)) + } + + case *uint: + switch pnType { + case C.PN_CHAR: + *v = uint(C.pn_data_get_char(data)) + case C.PN_UBYTE: + *v = uint(C.pn_data_get_ubyte(data)) + case C.PN_USHORT: + *v = uint(C.pn_data_get_ushort(data)) + case C.PN_UINT: + *v = uint(C.pn_data_get_uint(data)) + case C.PN_ULONG: + if unsafe.Sizeof(0) == 8 { + *v = uint(C.pn_data_get_ulong(data)) + } else { + panic(newUnmarshalError(pnType, v)) + } + default: + panic(newUnmarshalError(pnType, v)) + } + + case *float32: + switch pnType { + case C.PN_FLOAT: + *v = float32(C.pn_data_get_float(data)) + default: + panic(newUnmarshalError(pnType, v)) + } + + case *float64: + switch pnType { + case C.PN_FLOAT: + *v = float64(C.pn_data_get_float(data)) + case C.PN_DOUBLE: + *v = float64(C.pn_data_get_double(data)) + default: + panic(newUnmarshalError(pnType, v)) + } + + case *string: + switch pnType { + case C.PN_STRING: + *v = goString(C.pn_data_get_string(data)) + case C.PN_SYMBOL: + *v = goString(C.pn_data_get_symbol(data)) + case C.PN_BINARY: + *v = goString(C.pn_data_get_binary(data)) + default: + panic(newUnmarshalError(pnType, v)) + } + + case *[]byte: + switch pnType { + case C.PN_STRING: + *v = goBytes(C.pn_data_get_string(data)) + case C.PN_SYMBOL: + *v = goBytes(C.pn_data_get_symbol(data)) + case C.PN_BINARY: + *v = goBytes(C.pn_data_get_binary(data)) + default: + panic(newUnmarshalError(pnType, v)) + } + + case *Binary: + switch pnType { + case C.PN_BINARY: + *v = Binary(goBytes(C.pn_data_get_binary(data))) + default: + panic(newUnmarshalError(pnType, v)) + } + + case *Symbol: + switch pnType { + case C.PN_SYMBOL: + *v = Symbol(goBytes(C.pn_data_get_symbol(data))) + default: + panic(newUnmarshalError(pnType, v)) + } + + case *interface{}: + getInterface(data, v) + + default: + if reflect.TypeOf(v).Kind() != reflect.Ptr { + panic(newUnmarshalError(pnType, v)) + } + switch reflect.TypeOf(v).Elem().Kind() { + case reflect.Map: + getMap(data, v) + case reflect.Slice: + getList(data, v) + default: + panic(newUnmarshalError(pnType, v)) + } + } + err := dataError("unmarshaling", data) + if err != nil { + panic(err) + } + return +} + +func rewindUnmarshal(v interface{}, data *C.pn_data_t) { + C.pn_data_rewind(data) + C.pn_data_next(data) + unmarshal(v, data) +} + +// Getting into an interface is driven completely by the AMQP type, since the interface{} +// target is type-neutral. +func getInterface(data *C.pn_data_t, v *interface{}) { + pnType := C.pn_data_type(data) + switch pnType { + case C.PN_NULL, C.PN_INVALID: // No data. + *v = nil + case C.PN_BOOL: + *v = bool(C.pn_data_get_bool(data)) + case C.PN_UBYTE: + *v = uint8(C.pn_data_get_ubyte(data)) + case C.PN_BYTE: + *v = int8(C.pn_data_get_byte(data)) + case C.PN_USHORT: + *v = uint16(C.pn_data_get_ushort(data)) + case C.PN_SHORT: + *v = int16(C.pn_data_get_short(data)) + case C.PN_UINT: + *v = uint32(C.pn_data_get_uint(data)) + case C.PN_INT: + *v = int32(C.pn_data_get_int(data)) + case C.PN_CHAR: + *v = uint8(C.pn_data_get_char(data)) + case C.PN_ULONG: + *v = uint64(C.pn_data_get_ulong(data)) + case C.PN_LONG: + *v = int64(C.pn_data_get_long(data)) + case C.PN_FLOAT: + *v = float32(C.pn_data_get_float(data)) + case C.PN_DOUBLE: + *v = float64(C.pn_data_get_double(data)) + case C.PN_BINARY: + *v = Binary(goBytes(C.pn_data_get_binary(data))) + case C.PN_STRING: + *v = goString(C.pn_data_get_string(data)) + case C.PN_SYMBOL: + *v = Symbol(goString(C.pn_data_get_symbol(data))) + case C.PN_MAP: + m := make(Map) + unmarshal(&m, data) + *v = m + case C.PN_LIST: + l := make(List, 0) + unmarshal(&l, data) + *v = l + default: + panic(newUnmarshalError(pnType, v)) + } +} + +// get into map pointed at by v +func getMap(data *C.pn_data_t, v interface{}) { + mapValue := reflect.ValueOf(v).Elem() + mapValue.Set(reflect.MakeMap(mapValue.Type())) // Clear the map + switch pnType := C.pn_data_type(data); pnType { + case C.PN_MAP: + count := int(C.pn_data_get_map(data)) + if bool(C.pn_data_enter(data)) { + defer C.pn_data_exit(data) + for i := 0; i < count/2; i++ { + if bool(C.pn_data_next(data)) { + key := reflect.New(mapValue.Type().Key()) + unmarshal(key.Interface(), data) + if bool(C.pn_data_next(data)) { + val := reflect.New(mapValue.Type().Elem()) + unmarshal(val.Interface(), data) + mapValue.SetMapIndex(key.Elem(), val.Elem()) + } + } + } + } + case C.PN_INVALID: // Leave the map empty + default: + panic(newUnmarshalError(pnType, v)) + } +} + +func getList(data *C.pn_data_t, v interface{}) { + pnType := C.pn_data_type(data) + if pnType != C.PN_LIST { + panic(newUnmarshalError(pnType, v)) + } + count := int(C.pn_data_get_list(data)) + listValue := reflect.MakeSlice(reflect.TypeOf(v).Elem(), count, count) + if bool(C.pn_data_enter(data)) { + for i := 0; i < count; i++ { + if bool(C.pn_data_next(data)) { + val := reflect.New(listValue.Type().Elem()) + unmarshal(val.Interface(), data) + listValue.Index(i).Set(val.Elem()) + } + } + C.pn_data_exit(data) + } + reflect.ValueOf(v).Elem().Set(listValue) +} + +// decode from bytes. +// Return bytes decoded or 0 if we could not decode a complete object. +// +func decode(data *C.pn_data_t, bytes []byte) int { + if len(bytes) == 0 { + return 0 + } + n := int(C.pn_data_decode(data, cPtr(bytes), cLen(bytes))) + if n == int(C.PN_UNDERFLOW) { + C.pn_error_clear(C.pn_data_error(data)) + return 0 + } else if n <= 0 { + panic(internal.Errorf("unmarshal %s", internal.PnErrorCode(n))) + } + return n +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/bd3fb337/proton-c/bindings/go/src/qpid.apache.org/proton/amqp/url.go ---------------------------------------------------------------------- diff --git a/proton-c/bindings/go/src/qpid.apache.org/proton/amqp/url.go b/proton-c/bindings/go/src/qpid.apache.org/proton/amqp/url.go new file mode 100644 index 0000000..7a4ef13 --- /dev/null +++ b/proton-c/bindings/go/src/qpid.apache.org/proton/amqp/url.go @@ -0,0 +1,96 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ + +package amqp + +/* +#include <stdlib.h> +#include <string.h> +#include <proton/url.h> + +// Helper function for setting URL fields. +typedef void (*setter_fn)(pn_url_t* url, const char* value); +inline void set(pn_url_t *url, setter_fn s, const char* value) { + s(url, value); +} +*/ +import "C" + +import ( + "net" + "net/url" + "qpid.apache.org/proton/internal" + "unsafe" +) + +const ( + amqp string = "amqp" + amqps = "amqps" +) + +// ParseUrl parses an AMQP URL string and returns a net/url.Url. +// +// It is more forgiving than net/url.Parse and allows most of the parts of the +// URL to be missing, assuming AMQP defaults. +// +func ParseURL(s string) (u *url.URL, err error) { + cstr := C.CString(s) + defer C.free(unsafe.Pointer(cstr)) + pnUrl := C.pn_url_parse(cstr) + if pnUrl == nil { + return nil, internal.Errorf("bad URL %#v", s) + } + defer C.pn_url_free(pnUrl) + + scheme := C.GoString(C.pn_url_get_scheme(pnUrl)) + username := C.GoString(C.pn_url_get_username(pnUrl)) + password := C.GoString(C.pn_url_get_password(pnUrl)) + host := C.GoString(C.pn_url_get_host(pnUrl)) + port := C.GoString(C.pn_url_get_port(pnUrl)) + path := C.GoString(C.pn_url_get_path(pnUrl)) + + if err != nil { + return nil, internal.Errorf("bad URL %#v: %s", s, err) + } + if scheme == "" { + scheme = amqp + } + if port == "" { + if scheme == amqps { + port = amqps + } else { + port = amqp + } + } + var user *url.Userinfo + if password != "" { + user = url.UserPassword(username, password) + } else if username != "" { + user = url.User(username) + } + + u = &url.URL{ + Scheme: scheme, + User: user, + Host: net.JoinHostPort(host, port), + Path: path, + } + + return u, nil +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/bd3fb337/proton-c/bindings/go/src/qpid.apache.org/proton/amqp/url_test.go ---------------------------------------------------------------------- diff --git a/proton-c/bindings/go/src/qpid.apache.org/proton/amqp/url_test.go b/proton-c/bindings/go/src/qpid.apache.org/proton/amqp/url_test.go new file mode 100644 index 0000000..f80f1c4 --- /dev/null +++ b/proton-c/bindings/go/src/qpid.apache.org/proton/amqp/url_test.go @@ -0,0 +1,51 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ + +package amqp + +import ( + "fmt" +) + +func ExampleParseURL() { + for _, s := range []string{ + "amqp://username:password@host:1234/path", + "host:1234", + "host", + ":1234", + "host/path", + "amqps://host", + "", + } { + u, err := ParseURL(s) + if err != nil { + fmt.Println(err) + } else { + fmt.Println(u) + } + } + // Output: + // amqp://username:password@host:1234/path + // amqp://host:1234 + // amqp://host:amqp + // amqp://:1234 + // amqp://host:amqp/path + // amqps://host:amqps + // proton: bad URL "" +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/bd3fb337/proton-c/bindings/go/src/qpid.apache.org/proton/concurrent/connection.go ---------------------------------------------------------------------- diff --git a/proton-c/bindings/go/src/qpid.apache.org/proton/concurrent/connection.go b/proton-c/bindings/go/src/qpid.apache.org/proton/concurrent/connection.go new file mode 100644 index 0000000..9e82760 --- /dev/null +++ b/proton-c/bindings/go/src/qpid.apache.org/proton/concurrent/connection.go @@ -0,0 +1,167 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ + +package concurrent + +// #include <proton/disposition.h> +import "C" + +import ( + "net" + "qpid.apache.org/proton" + "qpid.apache.org/proton/internal" + "sync" +) + +// Connection is an AMQP connection, created by a Container. +type Connection interface { + Endpoint + + // Server puts the connection in server mode, must be called before Open(). + // + // A server connection will do protocol negotiation to accept a incoming AMQP + // connection. Normally you would call this for a connection accepted by + // net.Listener.Accept() + // + Server() + + // Listen enables endpoints opened by the remote peer to be accepted by calling Accept(). + // Must be called before Open(). + Listen() + + // NewSession creates a new local session, you must call Session.Open() + // to open it with the remote peer. + NewSession() (s Session, err error) + + // Accept returns the next Endpoint (Session, Sender or Receiver) opened by + // the remote peer. It returns (nil, error) if the connection closes. + // + // You must call Endpoint.Open() to complete opening the returned Endpoint or + // Endpoint.Close(error) to reject it. You can set endpoint properties before + // calling Open() + // + // You can use a type switch or type conversion to test which kind of Endpoint + // has been returned. + // + // You must call Connection.Listen() before Connection.Open() to enable Accept. + // + // The connection buffers endpoints until you call Accept() so normally you + // should have a dedicated goroutine calling Accept() in a loop to process it + // rapidly. + // + Accept() (Endpoint, error) + + // Container for the connection. + Container() Container + + // Disconnect the connection abrubtly. + Disconnect(error) +} + +type connection struct { + endpoint + listenOnce sync.Once + + // Set before Open() + container *container + conn net.Conn + incoming *internal.FlexChannel + + // Set by Open() + handler *handler + engine *proton.Engine + err internal.FirstError + eConnection proton.Connection +} + +func newConnection(conn net.Conn, cont *container) (*connection, error) { + c := &connection{container: cont, conn: conn} + c.handler = newHandler(c) + var err error + c.engine, err = proton.NewEngine(c.conn, c.handler.delegator) + if err != nil { + return nil, err + } + c.str = c.engine.String() + c.eConnection = c.engine.Connection() + return c, nil +} + +func (c *connection) Server() { c.engine.Server() } + +func (c *connection) Listen() { + c.listenOnce.Do(func() { c.incoming = internal.NewFlexChannel(-1) }) +} + +func (c *connection) Open() error { + go c.engine.Run() + return nil +} + +func (c *connection) Close(err error) { + c.engine.Close(err) + c.setError(c.engine.Error()) // Will be io.EOF on close OK + if c.incoming != nil { + close(c.incoming.In) + } +} + +func (c *connection) Disconnect(err error) { + c.engine.Disconnect(err) + if c.incoming != nil { + close(c.incoming.In) + } +} + +// FIXME aconway 2015-09-24: needed? +func (c *connection) closed(err error) { + // Call from another goroutine to initiate close without deadlock. + go c.Close(err) +} + +func (c *connection) NewSession() (Session, error) { + var s Session + err := c.engine.InjectWait(func() error { + eSession, err := c.engine.Connection().Session() + if err == nil { + s = newSession(c, eSession) + } + return err + }) + return s, err +} + +func (c *connection) handleIncoming(sn *session, l proton.Link) { + if l.IsReceiver() { + c.incoming.In <- newReceiver(makeIncomingLink(sn, l)) + } else { + c.incoming.In <- newSender(makeIncomingLink(sn, l)) + } +} + +func (c *connection) Accept() (Endpoint, error) { + v, ok := <-c.incoming.Out + if !ok { + return nil, c.Error() + } else { + return v.(Endpoint), nil + } +} + +func (c *connection) Container() Container { return c.container } http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/bd3fb337/proton-c/bindings/go/src/qpid.apache.org/proton/concurrent/container.go ---------------------------------------------------------------------- diff --git a/proton-c/bindings/go/src/qpid.apache.org/proton/concurrent/container.go b/proton-c/bindings/go/src/qpid.apache.org/proton/concurrent/container.go new file mode 100644 index 0000000..5c090e3 --- /dev/null +++ b/proton-c/bindings/go/src/qpid.apache.org/proton/concurrent/container.go @@ -0,0 +1,71 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ + +package concurrent + +import ( + "net" + "qpid.apache.org/proton/internal" +) + +// Container is an AMQP container, it represents a single AMQP "application".It +// provides functions to create new Connections to remote containers. +// +// Create with NewContainer() +// +type Container interface { + // Id is a unique identifier for the container in your distributed application. + Id() string + + // Create a new AMQP Connection over the supplied net.Conn connection. + // + // You must call Connection.Open() on the returned Connection, after + // setting any Connection properties you need to set. Note the net.Conn + // can be an outgoing connection (e.g. made with net.Dial) or an incoming + // connection (e.g. made with net.Listener.Accept()) + NewConnection(conn net.Conn) (Connection, error) +} + +type container struct { + id string + linkNames internal.IdCounter +} + +// NewContainer creates a new container. The id must be unique in your +// distributed application, all connections created by the container +// will have this container-id. +// +// If id == "" a random UUID will be generated for the id. +func NewContainer(id string) Container { + if id == "" { + id = internal.UUID4().String() + } + cont := &container{id: id} + return cont +} + +func (cont *container) Id() string { return cont.id } + +func (cont *container) nextLinkName() string { + return cont.id + "@" + cont.linkNames.Next() +} + +func (cont *container) NewConnection(conn net.Conn) (Connection, error) { + return newConnection(conn, cont) +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/bd3fb337/proton-c/bindings/go/src/qpid.apache.org/proton/concurrent/doc.go ---------------------------------------------------------------------- diff --git a/proton-c/bindings/go/src/qpid.apache.org/proton/concurrent/doc.go b/proton-c/bindings/go/src/qpid.apache.org/proton/concurrent/doc.go new file mode 100644 index 0000000..3e7756c --- /dev/null +++ b/proton-c/bindings/go/src/qpid.apache.org/proton/concurrent/doc.go @@ -0,0 +1,38 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ + +/* + +Package concurrent provides a procedural, concurrent Go API for exchanging AMQP +messages. + +AMPQ defines a credit-based scheme for flow control of messages over a +link. Credit is the number of messages the receiver is willing to accept. The +receiver gives credit to the sender. The sender can send messages without +waiting for a response from the receiver until it runs out of credit, at which +point it must wait for more credit to send more messages. + +See the documentation of Sender and Receiver for details of how this API uses credit. +*/ +package concurrent + +//#cgo LDFLAGS: -lqpid-proton +import "C" + +// Just for package comment http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/bd3fb337/proton-c/bindings/go/src/qpid.apache.org/proton/concurrent/endpoint.go ---------------------------------------------------------------------- diff --git a/proton-c/bindings/go/src/qpid.apache.org/proton/concurrent/endpoint.go b/proton-c/bindings/go/src/qpid.apache.org/proton/concurrent/endpoint.go new file mode 100644 index 0000000..717cac1 --- /dev/null +++ b/proton-c/bindings/go/src/qpid.apache.org/proton/concurrent/endpoint.go @@ -0,0 +1,86 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ + +package concurrent + +import ( + "io" + "qpid.apache.org/proton" + "qpid.apache.org/proton/internal" +) + +// Closed is an alias for io.EOF. It is returned as an error when an endpoint +// was closed cleanly. +var Closed = io.EOF + +// Endpoint is the common interface for Connection, Session, Link, Sender and Receiver. +// +// Endpoints can be created locally or by the remote peer. You must Open() an +// endpoint before you can use it. Some endpoints have additional Set*() methods +// that must be called before Open() to take effect, see Connection, Session, +// Link, Sender and Receiver for details. +// +type Endpoint interface { + // Open the endpoint. + Open() error + + // Close an endpoint and signal an error to the remote end if error != nil. + Close(error) + + // String is a human readable identifier, useful for debugging and logging. + String() string + + // Error returns nil if the endpoint is open, otherwise returns an error. + // Error() == Closed means the endpoint was closed without error. + Error() error +} + +// Implements setError() and Error() from Endpoint values that hold an error. +type errorHolder struct { + err internal.FirstError +} + +func (e *errorHolder) setError(err error) error { return e.err.Set(err) } +func (e *errorHolder) Error() error { return e.err.Get() } + +// Implements Error() and String() from Endpoint +type endpoint struct { + errorHolder + str string // Must be set by the value that embeds endpoint. +} + +func (e *endpoint) String() string { return e.str } + +// Call in proton goroutine to close an endpoint locally +// handler will complete the close when remote end closes. +func localClose(ep proton.Endpoint, err error) { + if ep.State().LocalActive() { + if err != nil { + ep.Condition().SetError(err) + } + ep.Close() + } +} + +func (e *endpoint) closeError(err error) { + if err == nil { + err = Closed + } + e.err.Set(err) +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/bd3fb337/proton-c/bindings/go/src/qpid.apache.org/proton/concurrent/handler.go ---------------------------------------------------------------------- diff --git a/proton-c/bindings/go/src/qpid.apache.org/proton/concurrent/handler.go b/proton-c/bindings/go/src/qpid.apache.org/proton/concurrent/handler.go new file mode 100644 index 0000000..bf8fcd3 --- /dev/null +++ b/proton-c/bindings/go/src/qpid.apache.org/proton/concurrent/handler.go @@ -0,0 +1,137 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ + +package concurrent + +import ( + "qpid.apache.org/proton" + "qpid.apache.org/proton/amqp" +) + +// NOTE: methods in this file are called only in the proton goroutine unless otherwise indicated. + +type handler struct { + delegator *proton.MessagingDelegator + connection *connection + links map[proton.Link]Link + sentMessages map[proton.Delivery]*sentMessage + sessions map[proton.Session]*session +} + +func newHandler(c *connection) *handler { + h := &handler{ + connection: c, + links: make(map[proton.Link]Link), + sentMessages: make(map[proton.Delivery]*sentMessage), + sessions: make(map[proton.Session]*session), + } + h.delegator = proton.NewMessagingDelegator(h) + // Disable auto features of MessagingDelegator, we do these ourselves. + h.delegator.Prefetch = 0 + h.delegator.AutoAccept = false + h.delegator.AutoSettle = false + h.delegator.AutoOpen = false + return h +} + +func (h *handler) HandleMessagingEvent(t proton.MessagingEvent, e proton.Event) { + switch t { + + case proton.MMessage: + if r, ok := h.links[e.Link()].(*receiver); ok { + r.handleDelivery(e.Delivery()) + } else { + h.connection.closed( + amqp.Errorf(amqp.InternalError, "cannot find receiver for link %s", e.Link())) + } + + case proton.MSettled: + if sm := h.sentMessages[e.Delivery()]; sm != nil { + sm.settled(nil) + } + + case proton.MSessionOpening: + if e.Session().State().LocalUninit() { // Remotely opened + s := newSession(h.connection, e.Session()) + h.sessions[e.Session()] = s + if h.connection.incoming != nil { + h.connection.incoming.In <- s + } else { + proton.CloseError(e.Session(), amqp.Errorf(amqp.NotAllowed, "remote sessions not allowed")) + } + } + + case proton.MSessionClosing: + e.Session().Close() + + case proton.MSessionClosed: + err := e.Session().RemoteCondition().Error() + for l, _ := range h.links { + if l.Session() == e.Session() { + h.linkClosed(l, err) + } + } + delete(h.sessions, e.Session()) + + case proton.MLinkOpening: + l := e.Link() + if l.State().LocalUninit() { // Remotely opened + if h.connection.incoming == nil { + proton.CloseError(l, amqp.Errorf(amqp.NotAllowed, ("no remote links"))) + break + } + s := h.sessions[l.Session()] + if s == nil { + proton.CloseError( + l, amqp.Errorf(amqp.InternalError, ("cannot find session for link"))) + break + } + h.connection.handleIncoming(s, l) + } + + case proton.MLinkClosing: + e.Link().Close() + + case proton.MLinkClosed: + h.linkClosed(e.Link(), e.Link().RemoteCondition().Error()) + + case proton.MDisconnected: + err := h.connection.Error() + for l, _ := range h.links { + h.linkClosed(l, err) + } + for _, s := range h.sessions { + s.closed(err) + } + for _, sm := range h.sentMessages { + sm.settled(err) + } + } +} + +func (h *handler) linkClosed(l proton.Link, err error) { + if link := h.links[l]; link != nil { + link.closed(err) + delete(h.links, l) + } +} + +func (h *handler) addLink(rl proton.Link, ll Link) { + h.links[rl] = ll +} --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
