http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/2f85988e/proton-c/bindings/go/src/qpid.apache.org/amqp/interop_test.go ---------------------------------------------------------------------- diff --git a/proton-c/bindings/go/src/qpid.apache.org/amqp/interop_test.go b/proton-c/bindings/go/src/qpid.apache.org/amqp/interop_test.go deleted file mode 100644 index b3e27bc..0000000 --- a/proton-c/bindings/go/src/qpid.apache.org/amqp/interop_test.go +++ /dev/null @@ -1,385 +0,0 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. -*/ - -// Test that conversion of Go type to/from AMQP is compatible with other -// bindings. -// -package amqp - -import ( - "bytes" - "fmt" - "io" - "io/ioutil" - "os" - "reflect" - "strings" - "testing" -) - -func checkEqual(want interface{}, got interface{}) error { - if !reflect.DeepEqual(want, got) { - return fmt.Errorf("%#v != %#v", want, got) - } - return nil -} - -func getReader(t *testing.T, name string) (r io.Reader) { - dir := os.Getenv("PN_INTEROP_DIR") - if dir == "" { - t.Skip("no PN_INTEROP_DIR in environment") - } - r, err := os.Open(dir + "/" + name + ".amqp") - if err != nil { - t.Fatalf("can't open %#v: %v", name, err) - } - return -} - -func remaining(d *Decoder) string { - remainder, _ := ioutil.ReadAll(io.MultiReader(d.Buffered(), d.reader)) - return string(remainder) -} - -// checkDecode: want is the expected value, gotPtr is a pointer to a -// instance of the same type for Decode. -func checkDecode(d *Decoder, want interface{}, gotPtr interface{}, t *testing.T) { - - if err := d.Decode(gotPtr); err != nil { - t.Error("Decode failed", err) - return - } - got := reflect.ValueOf(gotPtr).Elem().Interface() - if err := checkEqual(want, got); err != nil { - t.Error("Decode bad value:", err) - return - } - - // Try round trip encoding - bytes, err := Marshal(want, nil) - if err != nil { - t.Error("Marshal failed", err) - return - } - n, err := Unmarshal(bytes, gotPtr) - if err != nil { - t.Error("Unmarshal failed", err) - return - } - if err := checkEqual(n, len(bytes)); err != nil { - t.Error("Bad unmarshal length", err) - return - } - got = reflect.ValueOf(gotPtr).Elem().Interface() - if err = checkEqual(want, got); err != nil { - t.Error("Bad unmarshal value", err) - return - } -} - -func TestUnmarshal(t *testing.T) { - bytes, err := ioutil.ReadAll(getReader(t, "strings")) - if err != nil { - t.Error(err) - } - for _, want := range []string{"abc\000defg", "abcdefg", "abcdefg", "", "", ""} { - var got string - n, err := Unmarshal(bytes, &got) - if err != nil { - t.Error(err) - } - if want != got { - t.Errorf("%#v != %#v", want, got) - } - bytes = bytes[n:] - } -} - -func TestPrimitivesExact(t *testing.T) { - d := NewDecoder(getReader(t, "primitives")) - // Decoding into exact types - var b bool - checkDecode(d, true, &b, t) - checkDecode(d, false, &b, t) - var u8 uint8 - checkDecode(d, uint8(42), &u8, t) - var u16 uint16 - checkDecode(d, uint16(42), &u16, t) - var i16 int16 - checkDecode(d, int16(-42), &i16, t) - var u32 uint32 - checkDecode(d, uint32(12345), &u32, t) - var i32 int32 - checkDecode(d, int32(-12345), &i32, t) - var u64 uint64 - checkDecode(d, uint64(12345), &u64, t) - var i64 int64 - checkDecode(d, int64(-12345), &i64, t) - var f32 float32 - checkDecode(d, float32(0.125), &f32, t) - var f64 float64 - checkDecode(d, float64(0.125), &f64, t) -} - -func TestPrimitivesCompatible(t *testing.T) { - d := NewDecoder(getReader(t, "primitives")) - // Decoding into compatible types - var b bool - var i int - var u uint - var f float64 - checkDecode(d, true, &b, t) - checkDecode(d, false, &b, t) - checkDecode(d, uint(42), &u, t) - checkDecode(d, uint(42), &u, t) - checkDecode(d, -42, &i, t) - checkDecode(d, uint(12345), &u, t) - checkDecode(d, -12345, &i, t) - checkDecode(d, uint(12345), &u, t) - checkDecode(d, -12345, &i, t) - checkDecode(d, 0.125, &f, t) - checkDecode(d, 0.125, &f, t) -} - -// checkDecodeValue: want is the expected value, decode into a reflect.Value -func checkDecodeInterface(d *Decoder, want interface{}, t *testing.T) { - - var got, got2 interface{} - if err := d.Decode(&got); err != nil { - t.Error("Decode failed", err) - return - } - if err := checkEqual(want, got); err != nil { - t.Error(err) - return - } - // Try round trip encoding - bytes, err := Marshal(got, nil) - if err != nil { - t.Error(err) - return - } - n, err := Unmarshal(bytes, &got2) - if err != nil { - t.Error(err) - return - } - if err := checkEqual(n, len(bytes)); err != nil { - t.Error(err) - return - } - if err := checkEqual(want, got2); err != nil { - t.Error(err) - return - } -} - -func TestPrimitivesInterface(t *testing.T) { - d := NewDecoder(getReader(t, "primitives")) - checkDecodeInterface(d, true, t) - checkDecodeInterface(d, false, t) - checkDecodeInterface(d, uint8(42), t) - checkDecodeInterface(d, uint16(42), t) - checkDecodeInterface(d, int16(-42), t) - checkDecodeInterface(d, uint32(12345), t) - checkDecodeInterface(d, int32(-12345), t) - checkDecodeInterface(d, uint64(12345), t) - checkDecodeInterface(d, int64(-12345), t) - checkDecodeInterface(d, float32(0.125), t) - checkDecodeInterface(d, float64(0.125), t) -} - -func TestStrings(t *testing.T) { - d := NewDecoder(getReader(t, "strings")) - // Test decoding as plain Go strings - for _, want := range []string{"abc\000defg", "abcdefg", "abcdefg", "", "", ""} { - var got string - checkDecode(d, want, &got, t) - } - remains := remaining(d) - if remains != "" { - t.Errorf("leftover: %s", remains) - } - - // Test decoding as specific string types - d = NewDecoder(getReader(t, "strings")) - var bytes []byte - var str, sym string - checkDecode(d, []byte("abc\000defg"), &bytes, t) - checkDecode(d, "abcdefg", &str, t) - checkDecode(d, "abcdefg", &sym, t) - checkDecode(d, make([]byte, 0), &bytes, t) - checkDecode(d, "", &str, t) - checkDecode(d, "", &sym, t) - remains = remaining(d) - if remains != "" { - t.Fatalf("leftover: %s", remains) - } - - // Test some error handling - d = NewDecoder(getReader(t, "strings")) - var s string - err := d.Decode(s) - if err == nil { - t.Fatal("Expected error") - } - if !strings.Contains(err.Error(), "not a pointer") { - t.Error(err) - } - var i int - err = d.Decode(&i) - if !strings.Contains(err.Error(), "cannot unmarshal") { - t.Error(err) - } - _, err = Unmarshal([]byte{}, nil) - if !strings.Contains(err.Error(), "not enough data") { - t.Error(err) - } - _, err = Unmarshal([]byte("foobar"), nil) - if !strings.Contains(err.Error(), "invalid-argument") { - t.Error(err) - } -} - -func TestEncodeDecode(t *testing.T) { - type data struct { - s string - i int - u8 uint8 - b bool - f float32 - v interface{} - } - - in := data{"foo", 42, 9, true, 1.234, "thing"} - - buf := bytes.Buffer{} - e := NewEncoder(&buf) - if err := e.Encode(in.s); err != nil { - t.Error(err) - } - if err := e.Encode(in.i); err != nil { - t.Error(err) - } - if err := e.Encode(in.u8); err != nil { - t.Error(err) - } - if err := e.Encode(in.b); err != nil { - t.Error(err) - } - if err := e.Encode(in.f); err != nil { - t.Error(err) - } - if err := e.Encode(in.v); err != nil { - t.Error(err) - } - - var out data - d := NewDecoder(&buf) - if err := d.Decode(&out.s); err != nil { - t.Error(err) - } - if err := d.Decode(&out.i); err != nil { - t.Error(err) - } - if err := d.Decode(&out.u8); err != nil { - t.Error(err) - } - if err := d.Decode(&out.b); err != nil { - t.Error(err) - } - if err := d.Decode(&out.f); err != nil { - t.Error(err) - } - if err := d.Decode(&out.v); err != nil { - t.Error(err) - } - - if err := checkEqual(in, out); err != nil { - t.Error(err) - } -} - -func TestMap(t *testing.T) { - d := NewDecoder(getReader(t, "maps")) - - // Generic map - var m Map - checkDecode(d, Map{"one": int32(1), "two": int32(2), "three": int32(3)}, &m, t) - - // Interface as map - var i interface{} - checkDecode(d, Map{int32(1): "one", int32(2): "two", int32(3): "three"}, &i, t) - - d = NewDecoder(getReader(t, "maps")) - // Specific typed map - var m2 map[string]int - checkDecode(d, map[string]int{"one": 1, "two": 2, "three": 3}, &m2, t) - - // Nested map - m = Map{int64(1): "one", "two": int32(2), true: Map{uint8(1): true, uint8(2): false}} - bytes, err := Marshal(m, nil) - if err != nil { - t.Fatal(err) - } - _, err = Unmarshal(bytes, &i) - if err != nil { - t.Fatal(err) - } - if err = checkEqual(m, i); err != nil { - t.Fatal(err) - } -} - -func TestList(t *testing.T) { - d := NewDecoder(getReader(t, "lists")) - var l List - checkDecode(d, List{int32(32), "foo", true}, &l, t) - checkDecode(d, List{}, &l, t) -} - -// TODO aconway 2015-09-08: the message.amqp file seems to be incorrectly coded as -// as an AMQP string *inside* an AMQP binary?? Skip the test for now. -func TODO_TestMessage(t *testing.T) { - bytes, err := ioutil.ReadAll(getReader(t, "message")) - if err != nil { - t.Fatal(err) - } - - m, err := DecodeMessage(bytes) - if err != nil { - t.Fatal(err) - } else { - if err := checkEqual(m.Body(), "hello"); err != nil { - t.Error(err) - } - } - - m2 := NewMessageWith("hello") - bytes2, err := m2.Encode(nil) - if err != nil { - t.Error(err) - } else { - if err = checkEqual(bytes, bytes2); err != nil { - t.Error(err) - } - } -} - -// TODO aconway 2015-03-13: finish the full interop test
http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/2f85988e/proton-c/bindings/go/src/qpid.apache.org/amqp/marshal.go ---------------------------------------------------------------------- diff --git a/proton-c/bindings/go/src/qpid.apache.org/amqp/marshal.go b/proton-c/bindings/go/src/qpid.apache.org/amqp/marshal.go deleted file mode 100644 index bce7323..0000000 --- a/proton-c/bindings/go/src/qpid.apache.org/amqp/marshal.go +++ /dev/null @@ -1,250 +0,0 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. -*/ - -package amqp - -// #include <proton/codec.h> -import "C" - -import ( - "fmt" - "io" - "reflect" - "unsafe" -) - -func dataError(prefix string, data *C.pn_data_t) error { - err := PnError(C.pn_data_error(data)) - if err != nil { - err = fmt.Errorf("%s: %s", prefix, err.Error()) - } - return err -} - -/* -Marshal encodes a Go value as AMQP data in buffer. -If buffer is nil, or is not large enough, a new buffer is created. - -Returns the buffer used for encoding with len() adjusted to the actual size of data. - -Go types are encoded as follows - - +-------------------------------------+--------------------------------------------+ - |Go type |AMQP type | - +-------------------------------------+--------------------------------------------+ - |bool |bool | - +-------------------------------------+--------------------------------------------+ - |int8, int16, int32, int64 (int) |byte, short, int, long (int or long) | - +-------------------------------------+--------------------------------------------+ - |uint8, uint16, uint32, uint64 (uint) |ubyte, ushort, uint, ulong (uint or ulong) | - +-------------------------------------+--------------------------------------------+ - |float32, float64 |float, double. | - +-------------------------------------+--------------------------------------------+ - |string |string | - +-------------------------------------+--------------------------------------------+ - |[]byte, Binary |binary | - +-------------------------------------+--------------------------------------------+ - |Symbol |symbol | - +-------------------------------------+--------------------------------------------+ - |interface{} |the contained type | - +-------------------------------------+--------------------------------------------+ - |nil |null | - +-------------------------------------+--------------------------------------------+ - |map[K]T |map with K and T converted as above | - +-------------------------------------+--------------------------------------------+ - |Map |map, may have mixed types for keys, values | - +-------------------------------------+--------------------------------------------+ - |[]T |list with T converted as above | - +-------------------------------------+--------------------------------------------+ - |List |list, may have mixed types values | - +-------------------------------------+--------------------------------------------+ - -The following Go types cannot be marshaled: uintptr, function, interface, channel - -TODO - -Go types: array, slice, struct, complex64/128. - -AMQP types: decimal32/64/128, char, timestamp, uuid, array, multi-section message bodies. - -Described types. - -*/ -func Marshal(v interface{}, buffer []byte) (outbuf []byte, err error) { - defer doRecover(&err) - data := C.pn_data(0) - defer C.pn_data_free(data) - marshal(v, data) - encode := func(buf []byte) ([]byte, error) { - n := int(C.pn_data_encode(data, cPtr(buf), cLen(buf))) - switch { - case n == int(C.PN_OVERFLOW): - return buf, overflow - case n < 0: - return buf, dataError("marshal error", data) - default: - return buf[:n], nil - } - } - return encodeGrow(buffer, encode) -} - -const minEncode = 256 - -// overflow is returned when an encoding function can't fit data in the buffer. -var overflow = fmt.Errorf("buffer too small") - -// encodeFn encodes into buffer[0:len(buffer)]. -// Returns buffer with length adjusted for data encoded. -// If buffer too small, returns overflow as error. -type encodeFn func(buffer []byte) ([]byte, error) - -// encodeGrow calls encode() into buffer, if it returns overflow grows the buffer. -// Returns the final buffer. -func encodeGrow(buffer []byte, encode encodeFn) ([]byte, error) { - if buffer == nil || len(buffer) == 0 { - buffer = make([]byte, minEncode) - } - var err error - for buffer, err = encode(buffer); err == overflow; buffer, err = encode(buffer) { - buffer = make([]byte, 2*len(buffer)) - } - return buffer, err -} - -func marshal(v interface{}, data *C.pn_data_t) { - switch v := v.(type) { - case nil: - C.pn_data_put_null(data) - case bool: - C.pn_data_put_bool(data, C.bool(v)) - case int8: - C.pn_data_put_byte(data, C.int8_t(v)) - case int16: - C.pn_data_put_short(data, C.int16_t(v)) - case int32: - C.pn_data_put_int(data, C.int32_t(v)) - case int64: - C.pn_data_put_long(data, C.int64_t(v)) - case int: - if unsafe.Sizeof(0) == 8 { - C.pn_data_put_long(data, C.int64_t(v)) - } else { - C.pn_data_put_int(data, C.int32_t(v)) - } - case uint8: - C.pn_data_put_ubyte(data, C.uint8_t(v)) - case uint16: - C.pn_data_put_ushort(data, C.uint16_t(v)) - case uint32: - C.pn_data_put_uint(data, C.uint32_t(v)) - case uint64: - C.pn_data_put_ulong(data, C.uint64_t(v)) - case uint: - if unsafe.Sizeof(0) == 8 { - C.pn_data_put_ulong(data, C.uint64_t(v)) - } else { - C.pn_data_put_uint(data, C.uint32_t(v)) - } - case float32: - C.pn_data_put_float(data, C.float(v)) - case float64: - C.pn_data_put_double(data, C.double(v)) - case string: - C.pn_data_put_string(data, pnBytes([]byte(v))) - case []byte: - C.pn_data_put_binary(data, pnBytes(v)) - case Binary: - C.pn_data_put_binary(data, pnBytes([]byte(v))) - case Symbol: - C.pn_data_put_symbol(data, pnBytes([]byte(v))) - case Map: // Special map type - C.pn_data_put_map(data) - C.pn_data_enter(data) - for key, val := range v { - marshal(key, data) - marshal(val, data) - } - C.pn_data_exit(data) - default: - switch reflect.TypeOf(v).Kind() { - case reflect.Map: - putMap(data, v) - case reflect.Slice: - putList(data, v) - default: - panic(fmt.Errorf("cannot marshal %s to AMQP", reflect.TypeOf(v))) - } - } - err := dataError("marshal", data) - if err != nil { - panic(err) - } - return -} - -func clearMarshal(v interface{}, data *C.pn_data_t) { - C.pn_data_clear(data) - marshal(v, data) -} - -func putMap(data *C.pn_data_t, v interface{}) { - mapValue := reflect.ValueOf(v) - C.pn_data_put_map(data) - C.pn_data_enter(data) - for _, key := range mapValue.MapKeys() { - marshal(key.Interface(), data) - marshal(mapValue.MapIndex(key).Interface(), data) - } - C.pn_data_exit(data) -} - -func putList(data *C.pn_data_t, v interface{}) { - listValue := reflect.ValueOf(v) - C.pn_data_put_list(data) - C.pn_data_enter(data) - for i := 0; i < listValue.Len(); i++ { - marshal(listValue.Index(i).Interface(), data) - } - C.pn_data_exit(data) -} - -// Encoder encodes AMQP values to an io.Writer -type Encoder struct { - writer io.Writer - buffer []byte -} - -// New encoder returns a new encoder that writes to w. -func NewEncoder(w io.Writer) *Encoder { - return &Encoder{w, make([]byte, minEncode)} -} - -func (e *Encoder) Encode(v interface{}) (err error) { - e.buffer, err = Marshal(v, e.buffer) - if err == nil { - _, err = e.writer.Write(e.buffer) - } - return err -} - -func replace(data *C.pn_data_t, v interface{}) { - C.pn_data_clear(data) - marshal(v, data) -} http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/2f85988e/proton-c/bindings/go/src/qpid.apache.org/amqp/message.go ---------------------------------------------------------------------- diff --git a/proton-c/bindings/go/src/qpid.apache.org/amqp/message.go b/proton-c/bindings/go/src/qpid.apache.org/amqp/message.go deleted file mode 100644 index 753682e..0000000 --- a/proton-c/bindings/go/src/qpid.apache.org/amqp/message.go +++ /dev/null @@ -1,348 +0,0 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. -*/ - -package amqp - -// #include <proton/codec.h> -// #include <proton/types.h> -// #include <proton/message.h> -// #include <stdlib.h> -// -// /* Helper for setting message string fields */ -// typedef int (*set_fn)(pn_message_t*, const char*); -// int msg_set_str(pn_message_t* m, char* s, set_fn set) { -// int result = set(m, s); -// free(s); -// return result; -// } -// -import "C" - -import ( - "fmt" - "runtime" - "time" - "unsafe" -) - -// Message is the interface to an AMQP message. -type Message interface { - // Durable indicates that any parties taking responsibility - // for the message must durably store the content. - Durable() bool - SetDurable(bool) - - // Priority impacts ordering guarantees. Within a - // given ordered context, higher priority messages may jump ahead of - // lower priority messages. - Priority() uint8 - SetPriority(uint8) - - // TTL or Time To Live, a message it may be dropped after this duration - TTL() time.Duration - SetTTL(time.Duration) - - // FirstAcquirer indicates - // that the recipient of the message is the first recipient to acquire - // the message, i.e. there have been no failed delivery attempts to - // other acquirers. Note that this does not mean the message has not - // been delivered to, but not acquired, by other recipients. - FirstAcquirer() bool - SetFirstAcquirer(bool) - - // DeliveryCount tracks how many attempts have been made to - // delivery a message. - DeliveryCount() uint32 - SetDeliveryCount(uint32) - - // MessageId provides a unique identifier for a message. - // it can be an a string, an unsigned long, a uuid or a - // binary value. - MessageId() interface{} - SetMessageId(interface{}) - - UserId() string - SetUserId(string) - - Address() string - SetAddress(string) - - Subject() string - SetSubject(string) - - ReplyTo() string - SetReplyTo(string) - - // CorrelationId is set on correlated request and response messages. It can be - // an a string, an unsigned long, a uuid or a binary value. - CorrelationId() interface{} - SetCorrelationId(interface{}) - - ContentType() string - SetContentType(string) - - ContentEncoding() string - SetContentEncoding(string) - - // ExpiryTime indicates an absoulte time when the message may be dropped. - // A Zero time (i.e. t.isZero() == true) indicates a message never expires. - ExpiryTime() time.Time - SetExpiryTime(time.Time) - - CreationTime() time.Time - SetCreationTime(time.Time) - - GroupId() string - SetGroupId(string) - - GroupSequence() int32 - SetGroupSequence(int32) - - ReplyToGroupId() string - SetReplyToGroupId(string) - - // Instructions - AMQP delivery instructions. - Instructions() map[string]interface{} - SetInstructions(v map[string]interface{}) - - // Annotations - AMQP annotations. - Annotations() map[string]interface{} - SetAnnotations(v map[string]interface{}) - - // Properties - Application properties. - Properties() map[string]interface{} - SetProperties(v map[string]interface{}) - - // Inferred indicates how the message content - // is encoded into AMQP sections. If inferred is true then binary and - // list values in the body of the message will be encoded as AMQP DATA - // and AMQP SEQUENCE sections, respectively. If inferred is false, - // then all values in the body of the message will be encoded as AMQP - // VALUE sections regardless of their type. - Inferred() bool - SetInferred(bool) - - // Marshal a Go value into the message body. See amqp.Marshal() for details. - Marshal(interface{}) - - // Unmarshal the message body into the value pointed to by v. See amqp.Unmarshal() for details. - Unmarshal(interface{}) - - // Body value resulting from the default unmarshalling of message body as interface{} - Body() interface{} - - // Encode encodes the message as AMQP data. If buffer is non-nil and is large enough - // the message is encoded into it, otherwise a new buffer is created. - // Returns the buffer containing the message. - Encode(buffer []byte) ([]byte, error) - - // Decode data into this message. Overwrites an existing message content. - Decode(buffer []byte) error - - // Clear the message contents. - Clear() - - // Copy the contents of another message to this one. - Copy(m Message) error -} - -type message struct{ pn *C.pn_message_t } - -func freeMessage(m *message) { - C.pn_message_free(m.pn) - m.pn = nil -} - -// NewMessage creates a new message instance. -func NewMessage() Message { - m := &message{C.pn_message()} - runtime.SetFinalizer(m, freeMessage) - return m -} - -// NewMessageWith creates a message with value as the body. Equivalent to -// m := NewMessage(); m.Marshal(body) -func NewMessageWith(value interface{}) Message { - m := NewMessage() - m.Marshal(value) - return m -} - -func (m *message) Clear() { C.pn_message_clear(m.pn) } - -func (m *message) Copy(x Message) error { - if data, err := x.Encode(nil); err == nil { - return m.Decode(data) - } else { - return err - } -} - -// ==== message get functions - -func rewindGet(data *C.pn_data_t) (v interface{}) { - C.pn_data_rewind(data) - C.pn_data_next(data) - unmarshal(&v, data) - return v -} - -func rewindMap(data *C.pn_data_t) (v map[string]interface{}) { - C.pn_data_rewind(data) - C.pn_data_next(data) - unmarshal(&v, data) - return v -} - -func (m *message) Inferred() bool { return bool(C.pn_message_is_inferred(m.pn)) } -func (m *message) Durable() bool { return bool(C.pn_message_is_durable(m.pn)) } -func (m *message) Priority() uint8 { return uint8(C.pn_message_get_priority(m.pn)) } -func (m *message) TTL() time.Duration { - return time.Duration(C.pn_message_get_ttl(m.pn)) * time.Millisecond -} -func (m *message) FirstAcquirer() bool { return bool(C.pn_message_is_first_acquirer(m.pn)) } -func (m *message) DeliveryCount() uint32 { return uint32(C.pn_message_get_delivery_count(m.pn)) } -func (m *message) MessageId() interface{} { return rewindGet(C.pn_message_id(m.pn)) } -func (m *message) UserId() string { return goString(C.pn_message_get_user_id(m.pn)) } -func (m *message) Address() string { return C.GoString(C.pn_message_get_address(m.pn)) } -func (m *message) Subject() string { return C.GoString(C.pn_message_get_subject(m.pn)) } -func (m *message) ReplyTo() string { return C.GoString(C.pn_message_get_reply_to(m.pn)) } -func (m *message) CorrelationId() interface{} { return rewindGet(C.pn_message_correlation_id(m.pn)) } -func (m *message) ContentType() string { return C.GoString(C.pn_message_get_content_type(m.pn)) } -func (m *message) ContentEncoding() string { return C.GoString(C.pn_message_get_content_encoding(m.pn)) } - -func (m *message) ExpiryTime() time.Time { - return time.Unix(0, int64(time.Millisecond*time.Duration(C.pn_message_get_expiry_time(m.pn)))) -} -func (m *message) CreationTime() time.Time { - return time.Unix(0, int64(time.Millisecond)*int64(C.pn_message_get_creation_time(m.pn))) -} -func (m *message) GroupId() string { return C.GoString(C.pn_message_get_group_id(m.pn)) } -func (m *message) GroupSequence() int32 { return int32(C.pn_message_get_group_sequence(m.pn)) } -func (m *message) ReplyToGroupId() string { return C.GoString(C.pn_message_get_reply_to_group_id(m.pn)) } - -func (m *message) Instructions() map[string]interface{} { - return rewindMap(C.pn_message_instructions(m.pn)) -} -func (m *message) Annotations() map[string]interface{} { - return rewindMap(C.pn_message_annotations(m.pn)) -} -func (m *message) Properties() map[string]interface{} { - return rewindMap(C.pn_message_properties(m.pn)) -} - -// ==== message set methods - -func setData(v interface{}, data *C.pn_data_t) { - C.pn_data_clear(data) - marshal(v, data) -} - -func dataString(data *C.pn_data_t) string { - str := C.pn_string(C.CString("")) - defer C.pn_free(unsafe.Pointer(str)) - C.pn_inspect(unsafe.Pointer(data), str) - return C.GoString(C.pn_string_get(str)) -} - -func (m *message) SetInferred(b bool) { C.pn_message_set_inferred(m.pn, C.bool(b)) } -func (m *message) SetDurable(b bool) { C.pn_message_set_durable(m.pn, C.bool(b)) } -func (m *message) SetPriority(b uint8) { C.pn_message_set_priority(m.pn, C.uint8_t(b)) } -func (m *message) SetTTL(d time.Duration) { - C.pn_message_set_ttl(m.pn, C.pn_millis_t(d/time.Millisecond)) -} -func (m *message) SetFirstAcquirer(b bool) { C.pn_message_set_first_acquirer(m.pn, C.bool(b)) } -func (m *message) SetDeliveryCount(c uint32) { C.pn_message_set_delivery_count(m.pn, C.uint32_t(c)) } -func (m *message) SetMessageId(id interface{}) { setData(id, C.pn_message_id(m.pn)) } -func (m *message) SetUserId(s string) { C.pn_message_set_user_id(m.pn, pnBytes(([]byte)(s))) } -func (m *message) SetAddress(s string) { - C.msg_set_str(m.pn, C.CString(s), C.set_fn(C.pn_message_set_address)) -} -func (m *message) SetSubject(s string) { - C.msg_set_str(m.pn, C.CString(s), C.set_fn(C.pn_message_set_subject)) -} -func (m *message) SetReplyTo(s string) { - C.msg_set_str(m.pn, C.CString(s), C.set_fn(C.pn_message_set_reply_to)) -} -func (m *message) SetCorrelationId(c interface{}) { setData(c, C.pn_message_correlation_id(m.pn)) } -func (m *message) SetContentType(s string) { - C.msg_set_str(m.pn, C.CString(s), C.set_fn(C.pn_message_set_content_type)) -} -func (m *message) SetContentEncoding(s string) { - C.msg_set_str(m.pn, C.CString(s), C.set_fn(C.pn_message_set_content_encoding)) -} -func (m *message) SetExpiryTime(t time.Time) { C.pn_message_set_expiry_time(m.pn, pnTime(t)) } -func (m *message) SetCreationTime(t time.Time) { C.pn_message_set_creation_time(m.pn, pnTime(t)) } -func (m *message) SetGroupId(s string) { - C.msg_set_str(m.pn, C.CString(s), C.set_fn(C.pn_message_set_group_id)) -} -func (m *message) SetGroupSequence(s int32) { - C.pn_message_set_group_sequence(m.pn, C.pn_sequence_t(s)) -} -func (m *message) SetReplyToGroupId(s string) { - C.msg_set_str(m.pn, C.CString(s), C.set_fn(C.pn_message_set_reply_to_group_id)) -} - -func (m *message) SetInstructions(v map[string]interface{}) { - setData(v, C.pn_message_instructions(m.pn)) -} -func (m *message) SetAnnotations(v map[string]interface{}) { setData(v, C.pn_message_annotations(m.pn)) } -func (m *message) SetProperties(v map[string]interface{}) { setData(v, C.pn_message_properties(m.pn)) } - -// Marshal/Unmarshal body -func (m *message) Marshal(v interface{}) { clearMarshal(v, C.pn_message_body(m.pn)) } -func (m *message) Unmarshal(v interface{}) { rewindUnmarshal(v, C.pn_message_body(m.pn)) } -func (m *message) Body() (v interface{}) { m.Unmarshal(&v); return } - -func (m *message) Decode(data []byte) error { - m.Clear() - if len(data) == 0 { - return fmt.Errorf("empty buffer for decode") - } - if C.pn_message_decode(m.pn, cPtr(data), cLen(data)) < 0 { - return fmt.Errorf("decoding message: %s", PnError(C.pn_message_error(m.pn))) - } - return nil -} - -func DecodeMessage(data []byte) (m Message, err error) { - m = NewMessage() - err = m.Decode(data) - return -} - -func (m *message) Encode(buffer []byte) ([]byte, error) { - encode := func(buf []byte) ([]byte, error) { - len := cLen(buf) - result := C.pn_message_encode(m.pn, cPtr(buf), &len) - switch { - case result == C.PN_OVERFLOW: - return buf, overflow - case result < 0: - return buf, fmt.Errorf("cannot encode message: %s", PnErrorCode(result)) - default: - return buf[:len], nil - } - } - return encodeGrow(buffer, encode) -} - -// TODO aconway 2015-09-14: Multi-section messages. - -// TODO aconway 2016-09-09: Message.String() use inspect. http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/2f85988e/proton-c/bindings/go/src/qpid.apache.org/amqp/message_test.go ---------------------------------------------------------------------- diff --git a/proton-c/bindings/go/src/qpid.apache.org/amqp/message_test.go b/proton-c/bindings/go/src/qpid.apache.org/amqp/message_test.go deleted file mode 100644 index 7a6e5a8..0000000 --- a/proton-c/bindings/go/src/qpid.apache.org/amqp/message_test.go +++ /dev/null @@ -1,166 +0,0 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. -*/ - -package amqp - -import ( - "testing" - "time" -) - -func roundTrip(m Message) error { - buffer, err := m.Encode(nil) - if err != nil { - return err - } - m2, err := DecodeMessage(buffer) - if err != nil { - return err - } - return checkEqual(m, m2) -} - -func TestDefaultMessage(t *testing.T) { - m := NewMessage() - // Check defaults - for _, data := range [][]interface{}{ - {m.Inferred(), false}, - {m.Durable(), false}, - {m.Priority(), uint8(4)}, - {m.TTL(), time.Duration(0)}, - {m.UserId(), ""}, - {m.Address(), ""}, - {m.Subject(), ""}, - {m.ReplyTo(), ""}, - {m.ContentType(), ""}, - {m.ContentEncoding(), ""}, - {m.GroupId(), ""}, - {m.GroupSequence(), int32(0)}, - {m.ReplyToGroupId(), ""}, - {m.MessageId(), nil}, - {m.CorrelationId(), nil}, - {m.Instructions(), map[string]interface{}{}}, - {m.Annotations(), map[string]interface{}{}}, - {m.Properties(), map[string]interface{}{}}, - {m.Body(), nil}, - } { - if err := checkEqual(data[0], data[1]); err != nil { - t.Error(err) - } - } - if err := roundTrip(m); err != nil { - t.Error(err) - } -} - -func TestMessageRoundTrip(t *testing.T) { - m := NewMessage() - m.SetInferred(false) - m.SetDurable(true) - m.SetPriority(42) - m.SetTTL(0) - m.SetUserId("user") - m.SetAddress("address") - m.SetSubject("subject") - m.SetReplyTo("replyto") - m.SetContentType("content") - m.SetContentEncoding("encoding") - m.SetGroupId("group") - m.SetGroupSequence(42) - m.SetReplyToGroupId("replytogroup") - m.SetMessageId("id") - m.SetCorrelationId("correlation") - m.SetInstructions(map[string]interface{}{"instructions": "foo"}) - m.SetAnnotations(map[string]interface{}{"annotations": "foo"}) - m.SetProperties(map[string]interface{}{"int": int32(32), "bool": true, "string": "foo"}) - m.Marshal("hello") - - for _, data := range [][]interface{}{ - {m.Inferred(), false}, - {m.Durable(), true}, - {m.Priority(), uint8(42)}, - {m.TTL(), time.Duration(0)}, - {m.UserId(), "user"}, - {m.Address(), "address"}, - {m.Subject(), "subject"}, - {m.ReplyTo(), "replyto"}, - {m.ContentType(), "content"}, - {m.ContentEncoding(), "encoding"}, - {m.GroupId(), "group"}, - {m.GroupSequence(), int32(42)}, - {m.ReplyToGroupId(), "replytogroup"}, - {m.MessageId(), "id"}, - {m.CorrelationId(), "correlation"}, - {m.Instructions(), map[string]interface{}{"instructions": "foo"}}, - {m.Annotations(), map[string]interface{}{"annotations": "foo"}}, - {m.Properties(), map[string]interface{}{"int": int32(32), "bool": true, "string": "foo"}}, - {m.Body(), "hello"}, - } { - if err := checkEqual(data[0], data[1]); err != nil { - t.Error(err) - } - } - if err := roundTrip(m); err != nil { - t.Error(err) - } -} - -func TestMessageBodyTypes(t *testing.T) { - var s string - var body interface{} - var i int64 - - m := NewMessageWith(int64(42)) - m.Unmarshal(&body) - m.Unmarshal(&i) - if err := checkEqual(body.(int64), int64(42)); err != nil { - t.Error(err) - } - if err := checkEqual(i, int64(42)); err != nil { - t.Error(err) - } - - m = NewMessageWith("hello") - m.Unmarshal(&s) - m.Unmarshal(&body) - if err := checkEqual(s, "hello"); err != nil { - t.Error(err) - } - if err := checkEqual(body.(string), "hello"); err != nil { - t.Error(err) - } - if err := roundTrip(m); err != nil { - t.Error(err) - } - - m = NewMessageWith(Binary("bin")) - m.Unmarshal(&s) - m.Unmarshal(&body) - if err := checkEqual(body.(Binary), Binary("bin")); err != nil { - t.Error(err) - } - if err := checkEqual(s, "bin"); err != nil { - t.Error(err) - } - if err := roundTrip(m); err != nil { - t.Error(err) - } - - // TODO aconway 2015-09-08: array etc. -} http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/2f85988e/proton-c/bindings/go/src/qpid.apache.org/amqp/types.go ---------------------------------------------------------------------- diff --git a/proton-c/bindings/go/src/qpid.apache.org/amqp/types.go b/proton-c/bindings/go/src/qpid.apache.org/amqp/types.go deleted file mode 100644 index 2852c23..0000000 --- a/proton-c/bindings/go/src/qpid.apache.org/amqp/types.go +++ /dev/null @@ -1,194 +0,0 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. -*/ - -package amqp - -// #include <proton/codec.h> -import "C" - -import ( - "bytes" - "fmt" - "reflect" - "time" - "unsafe" -) - -func (t C.pn_type_t) String() string { - switch C.pn_type_t(t) { - case C.PN_NULL: - return "null" - case C.PN_BOOL: - return "bool" - case C.PN_UBYTE: - return "ubyte" - case C.PN_BYTE: - return "byte" - case C.PN_USHORT: - return "ushort" - case C.PN_SHORT: - return "short" - case C.PN_CHAR: - return "char" - case C.PN_UINT: - return "uint" - case C.PN_INT: - return "int" - case C.PN_ULONG: - return "ulong" - case C.PN_LONG: - return "long" - case C.PN_TIMESTAMP: - return "timestamp" - case C.PN_FLOAT: - return "float" - case C.PN_DOUBLE: - return "double" - case C.PN_DECIMAL32: - return "decimal32" - case C.PN_DECIMAL64: - return "decimal64" - case C.PN_DECIMAL128: - return "decimal128" - case C.PN_UUID: - return "uuid" - case C.PN_BINARY: - return "binary" - case C.PN_STRING: - return "string" - case C.PN_SYMBOL: - return "symbol" - case C.PN_DESCRIBED: - return "described" - case C.PN_ARRAY: - return "array" - case C.PN_LIST: - return "list" - case C.PN_MAP: - return "map" - default: - return "no-data" - } -} - -// Go types -var ( - bytesType = reflect.TypeOf([]byte{}) - valueType = reflect.TypeOf(reflect.Value{}) -) - -// TODO aconway 2015-04-08: can't handle AMQP maps with key types that are not valid Go map keys. - -// Map is a generic map that can have mixed key and value types and so can represent any AMQP map -type Map map[interface{}]interface{} - -// List is a generic list that can hold mixed values and can represent any AMQP list. -// -type List []interface{} - -// Symbol is a string that is encoded as an AMQP symbol -type Symbol string - -func (s Symbol) GoString() string { return fmt.Sprintf("s\"%s\"", s) } - -// Binary is a string that is encoded as an AMQP binary. -// It is a string rather than a byte[] because byte[] is not hashable and can't be used as -// a map key, AMQP frequently uses binary types as map keys. It can convert to and from []byte -type Binary string - -func (b Binary) GoString() string { return fmt.Sprintf("b\"%s\"", b) } - -// GoString for Map prints values with their types, useful for debugging. -func (m Map) GoString() string { - out := &bytes.Buffer{} - fmt.Fprintf(out, "%T{", m) - i := len(m) - for k, v := range m { - fmt.Fprintf(out, "%T(%#v): %T(%#v)", k, k, v, v) - i-- - if i > 0 { - fmt.Fprint(out, ", ") - } - } - fmt.Fprint(out, "}") - return out.String() -} - -// GoString for List prints values with their types, useful for debugging. -func (l List) GoString() string { - out := &bytes.Buffer{} - fmt.Fprintf(out, "%T{", l) - for i := 0; i < len(l); i++ { - fmt.Fprintf(out, "%T(%#v)", l[i], l[i]) - if i == len(l)-1 { - fmt.Fprint(out, ", ") - } - } - fmt.Fprint(out, "}") - return out.String() -} - -// pnTime converts Go time.Time to Proton millisecond Unix time. -func pnTime(t time.Time) C.pn_timestamp_t { - secs := t.Unix() - // Note: sub-second accuracy is not guaraunteed if the Unix time in - // nanoseconds cannot be represented by an int64 (sometime around year 2260) - msecs := (t.UnixNano() % int64(time.Second)) / int64(time.Millisecond) - return C.pn_timestamp_t(secs*1000 + msecs) -} - -// goTime converts a pn_timestamp_t to a Go time.Time. -func goTime(t C.pn_timestamp_t) time.Time { - secs := int64(t) / 1000 - nsecs := (int64(t) % 1000) * int64(time.Millisecond) - return time.Unix(secs, nsecs) -} - -func goBytes(cBytes C.pn_bytes_t) (bytes []byte) { - if cBytes.start != nil { - bytes = C.GoBytes(unsafe.Pointer(cBytes.start), C.int(cBytes.size)) - } - return -} - -func goString(cBytes C.pn_bytes_t) (str string) { - if cBytes.start != nil { - str = C.GoStringN(cBytes.start, C.int(cBytes.size)) - } - return -} - -func pnBytes(b []byte) C.pn_bytes_t { - if len(b) == 0 { - return C.pn_bytes_t{0, nil} - } else { - return C.pn_bytes_t{C.size_t(len(b)), (*C.char)(unsafe.Pointer(&b[0]))} - } -} - -func cPtr(b []byte) *C.char { - if len(b) == 0 { - return nil - } - return (*C.char)(unsafe.Pointer(&b[0])) -} - -func cLen(b []byte) C.size_t { - return C.size_t(len(b)) -} http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/2f85988e/proton-c/bindings/go/src/qpid.apache.org/amqp/unmarshal.go ---------------------------------------------------------------------- diff --git a/proton-c/bindings/go/src/qpid.apache.org/amqp/unmarshal.go b/proton-c/bindings/go/src/qpid.apache.org/amqp/unmarshal.go deleted file mode 100644 index 8f380a7..0000000 --- a/proton-c/bindings/go/src/qpid.apache.org/amqp/unmarshal.go +++ /dev/null @@ -1,557 +0,0 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one -oor more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. -*/ - -package amqp - -// #include <proton/codec.h> -import "C" - -import ( - "bytes" - "fmt" - "io" - "reflect" - "unsafe" -) - -const minDecode = 1024 - -// Error returned if AMQP data cannot be unmarshaled as the desired Go type. -type UnmarshalError struct { - // The name of the AMQP type. - AMQPType string - // The Go type. - GoType reflect.Type -} - -func newUnmarshalError(pnType C.pn_type_t, v interface{}) *UnmarshalError { - return &UnmarshalError{C.pn_type_t(pnType).String(), reflect.TypeOf(v)} -} - -func (e UnmarshalError) Error() string { - if e.GoType.Kind() != reflect.Ptr { - return fmt.Sprintf("cannot unmarshal to type %s, not a pointer", e.GoType) - } else { - return fmt.Sprintf("cannot unmarshal AMQP %s to %s", e.AMQPType, e.GoType) - } -} - -func doRecover(err *error) { - r := recover() - switch r := r.(type) { - case nil: - case *UnmarshalError: - *err = r - default: - panic(r) - } -} - -// -// Decoding from a pn_data_t -// -// NOTE: we use panic() to signal a decoding error, simplifies decoding logic. -// We recover() at the highest possible level - i.e. in the exported Unmarshal or Decode. -// - -// Decoder decodes AMQP values from an io.Reader. -// -type Decoder struct { - reader io.Reader - buffer bytes.Buffer -} - -// NewDecoder returns a new decoder that reads from r. -// -// The decoder has it's own buffer and may read more data than required for the -// AMQP values requested. Use Buffered to see if there is data left in the -// buffer. -// -func NewDecoder(r io.Reader) *Decoder { - return &Decoder{r, bytes.Buffer{}} -} - -// Buffered returns a reader of the data remaining in the Decoder's buffer. The -// reader is valid until the next call to Decode. -// -func (d *Decoder) Buffered() io.Reader { - return bytes.NewReader(d.buffer.Bytes()) -} - -// Decode reads the next AMQP value from the Reader and stores it in the value pointed to by v. -// -// See the documentation for Unmarshal for details about the conversion of AMQP into a Go value. -// -func (d *Decoder) Decode(v interface{}) (err error) { - defer doRecover(&err) - data := C.pn_data(0) - defer C.pn_data_free(data) - var n int - for n == 0 { - n, err = decode(data, d.buffer.Bytes()) - if err != nil { - return err - } - if n == 0 { // n == 0 means not enough data, read more - err = d.more() - } else { - unmarshal(v, data) - } - } - d.buffer.Next(n) - return -} - -/* -Unmarshal decodes AMQP-encoded bytes and stores the result in the value pointed to by v. -Types are converted as follows: - - +---------------------------+----------------------------------------------------------------------+ - |To Go types |From AMQP types | - +===========================+======================================================================+ - |bool |bool | - +---------------------------+----------------------------------------------------------------------+ - |int, int8, int16, |Equivalent or smaller signed integer type: byte, short, int, long. | - |int32, int64 | | - +---------------------------+----------------------------------------------------------------------+ - |uint, uint8, uint16, |Equivalent or smaller unsigned integer type: ubyte, ushort, uint, | - |uint32, uint64 types |ulong | - +---------------------------+----------------------------------------------------------------------+ - |float32, float64 |Equivalent or smaller float or double. | - +---------------------------+----------------------------------------------------------------------+ - |string, []byte |string, symbol or binary. | - +---------------------------+----------------------------------------------------------------------+ - |Symbol |symbol | - +---------------------------+----------------------------------------------------------------------+ - |map[K]T |map, provided all keys and values can unmarshal to types K, T | - +---------------------------+----------------------------------------------------------------------+ - |Map |map, any AMQP map | - +---------------------------+----------------------------------------------------------------------+ - |interface{} |Any AMQP value can be unmarshaled to an interface{} as follows: | - | +------------------------+---------------------------------------------+ - | |AMQP Type |Go Type in interface{} | - | +========================+=============================================+ - | |bool |bool | - | +------------------------+---------------------------------------------+ - | |byte,short,int,long |int8,int16,int32,int64 | - | +------------------------+---------------------------------------------+ - | |ubyte,ushort,uint,ulong |uint8,uint16,uint32,uint64 | - | +------------------------+---------------------------------------------+ - | |float, double |float32, float64 | - | +------------------------+---------------------------------------------+ - | |string |string | - | +------------------------+---------------------------------------------+ - | |symbol |Symbol | - | +------------------------+---------------------------------------------+ - | |binary |Binary | - | +------------------------+---------------------------------------------+ - | |nulll |nil | - | +------------------------+---------------------------------------------+ - | |map |Map | - | +------------------------+---------------------------------------------+ - | |list |List | - +---------------------------+------------------------+---------------------------------------------+ - -The following Go types cannot be unmarshaled: uintptr, function, interface, channel. - -TODO - -Go types: array, struct. - -AMQP types: decimal32/64/128, char (round trip), timestamp, uuid, array, multi-section message bodies. - -AMQP maps with mixed/unhashable key types need an alternate representation. - -Described types. -*/ -func Unmarshal(bytes []byte, v interface{}) (n int, err error) { - defer doRecover(&err) - - data := C.pn_data(0) - defer C.pn_data_free(data) - n, err = decode(data, bytes) - if err != nil { - return 0, err - } - if n == 0 { - return 0, fmt.Errorf("not enough data") - } else { - unmarshal(v, data) - } - return n, nil -} - -// more reads more data when we can't parse a complete AMQP type -func (d *Decoder) more() error { - var readSize int64 = minDecode - if int64(d.buffer.Len()) > readSize { // Grow by doubling - readSize = int64(d.buffer.Len()) - } - var n int64 - n, err := d.buffer.ReadFrom(io.LimitReader(d.reader, readSize)) - if n == 0 && err == nil { // ReadFrom won't report io.EOF, just returns 0 - err = io.EOF - } - return err -} - -// Unmarshal from data into value pointed at by v. -func unmarshal(v interface{}, data *C.pn_data_t) { - pnType := C.pn_data_type(data) - switch v := v.(type) { - case *bool: - switch pnType { - case C.PN_BOOL: - *v = bool(C.pn_data_get_bool(data)) - default: - panic(newUnmarshalError(pnType, v)) - } - case *int8: - switch pnType { - case C.PN_CHAR: - *v = int8(C.pn_data_get_char(data)) - case C.PN_BYTE: - *v = int8(C.pn_data_get_byte(data)) - default: - panic(newUnmarshalError(pnType, v)) - } - case *uint8: - switch pnType { - case C.PN_CHAR: - *v = uint8(C.pn_data_get_char(data)) - case C.PN_UBYTE: - *v = uint8(C.pn_data_get_ubyte(data)) - default: - panic(newUnmarshalError(pnType, v)) - } - case *int16: - switch pnType { - case C.PN_CHAR: - *v = int16(C.pn_data_get_char(data)) - case C.PN_BYTE: - *v = int16(C.pn_data_get_byte(data)) - case C.PN_SHORT: - *v = int16(C.pn_data_get_short(data)) - default: - panic(newUnmarshalError(pnType, v)) - } - case *uint16: - switch pnType { - case C.PN_CHAR: - *v = uint16(C.pn_data_get_char(data)) - case C.PN_UBYTE: - *v = uint16(C.pn_data_get_ubyte(data)) - case C.PN_USHORT: - *v = uint16(C.pn_data_get_ushort(data)) - default: - panic(newUnmarshalError(pnType, v)) - } - case *int32: - switch pnType { - case C.PN_CHAR: - *v = int32(C.pn_data_get_char(data)) - case C.PN_BYTE: - *v = int32(C.pn_data_get_byte(data)) - case C.PN_SHORT: - *v = int32(C.pn_data_get_short(data)) - case C.PN_INT: - *v = int32(C.pn_data_get_int(data)) - default: - panic(newUnmarshalError(pnType, v)) - } - case *uint32: - switch pnType { - case C.PN_CHAR: - *v = uint32(C.pn_data_get_char(data)) - case C.PN_UBYTE: - *v = uint32(C.pn_data_get_ubyte(data)) - case C.PN_USHORT: - *v = uint32(C.pn_data_get_ushort(data)) - case C.PN_UINT: - *v = uint32(C.pn_data_get_uint(data)) - default: - panic(newUnmarshalError(pnType, v)) - } - - case *int64: - switch pnType { - case C.PN_CHAR: - *v = int64(C.pn_data_get_char(data)) - case C.PN_BYTE: - *v = int64(C.pn_data_get_byte(data)) - case C.PN_SHORT: - *v = int64(C.pn_data_get_short(data)) - case C.PN_INT: - *v = int64(C.pn_data_get_int(data)) - case C.PN_LONG: - *v = int64(C.pn_data_get_long(data)) - default: - panic(newUnmarshalError(pnType, v)) - } - - case *uint64: - switch pnType { - case C.PN_CHAR: - *v = uint64(C.pn_data_get_char(data)) - case C.PN_UBYTE: - *v = uint64(C.pn_data_get_ubyte(data)) - case C.PN_USHORT: - *v = uint64(C.pn_data_get_ushort(data)) - case C.PN_ULONG: - *v = uint64(C.pn_data_get_ulong(data)) - default: - panic(newUnmarshalError(pnType, v)) - } - - case *int: - switch pnType { - case C.PN_CHAR: - *v = int(C.pn_data_get_char(data)) - case C.PN_BYTE: - *v = int(C.pn_data_get_byte(data)) - case C.PN_SHORT: - *v = int(C.pn_data_get_short(data)) - case C.PN_INT: - *v = int(C.pn_data_get_int(data)) - case C.PN_LONG: - if unsafe.Sizeof(0) == 8 { - *v = int(C.pn_data_get_long(data)) - } else { - panic(newUnmarshalError(pnType, v)) - } - default: - panic(newUnmarshalError(pnType, v)) - } - - case *uint: - switch pnType { - case C.PN_CHAR: - *v = uint(C.pn_data_get_char(data)) - case C.PN_UBYTE: - *v = uint(C.pn_data_get_ubyte(data)) - case C.PN_USHORT: - *v = uint(C.pn_data_get_ushort(data)) - case C.PN_UINT: - *v = uint(C.pn_data_get_uint(data)) - case C.PN_ULONG: - if unsafe.Sizeof(0) == 8 { - *v = uint(C.pn_data_get_ulong(data)) - } else { - panic(newUnmarshalError(pnType, v)) - } - default: - panic(newUnmarshalError(pnType, v)) - } - - case *float32: - switch pnType { - case C.PN_FLOAT: - *v = float32(C.pn_data_get_float(data)) - default: - panic(newUnmarshalError(pnType, v)) - } - - case *float64: - switch pnType { - case C.PN_FLOAT: - *v = float64(C.pn_data_get_float(data)) - case C.PN_DOUBLE: - *v = float64(C.pn_data_get_double(data)) - default: - panic(newUnmarshalError(pnType, v)) - } - - case *string: - switch pnType { - case C.PN_STRING: - *v = goString(C.pn_data_get_string(data)) - case C.PN_SYMBOL: - *v = goString(C.pn_data_get_symbol(data)) - case C.PN_BINARY: - *v = goString(C.pn_data_get_binary(data)) - default: - panic(newUnmarshalError(pnType, v)) - } - - case *[]byte: - switch pnType { - case C.PN_STRING: - *v = goBytes(C.pn_data_get_string(data)) - case C.PN_SYMBOL: - *v = goBytes(C.pn_data_get_symbol(data)) - case C.PN_BINARY: - *v = goBytes(C.pn_data_get_binary(data)) - default: - panic(newUnmarshalError(pnType, v)) - } - - case *Binary: - switch pnType { - case C.PN_BINARY: - *v = Binary(goBytes(C.pn_data_get_binary(data))) - default: - panic(newUnmarshalError(pnType, v)) - } - - case *Symbol: - switch pnType { - case C.PN_SYMBOL: - *v = Symbol(goBytes(C.pn_data_get_symbol(data))) - default: - panic(newUnmarshalError(pnType, v)) - } - - case *interface{}: - getInterface(data, v) - - default: - if reflect.TypeOf(v).Kind() != reflect.Ptr { - panic(newUnmarshalError(pnType, v)) - } - switch reflect.TypeOf(v).Elem().Kind() { - case reflect.Map: - getMap(data, v) - case reflect.Slice: - getList(data, v) - default: - panic(newUnmarshalError(pnType, v)) - } - } - err := dataError("unmarshaling", data) - if err != nil { - panic(err) - } - return -} - -func rewindUnmarshal(v interface{}, data *C.pn_data_t) { - C.pn_data_rewind(data) - C.pn_data_next(data) - unmarshal(v, data) -} - -// Getting into an interface is driven completely by the AMQP type, since the interface{} -// target is type-neutral. -func getInterface(data *C.pn_data_t, v *interface{}) { - pnType := C.pn_data_type(data) - switch pnType { - case C.PN_BOOL: - *v = bool(C.pn_data_get_bool(data)) - case C.PN_UBYTE: - *v = uint8(C.pn_data_get_ubyte(data)) - case C.PN_BYTE: - *v = int8(C.pn_data_get_byte(data)) - case C.PN_USHORT: - *v = uint16(C.pn_data_get_ushort(data)) - case C.PN_SHORT: - *v = int16(C.pn_data_get_short(data)) - case C.PN_UINT: - *v = uint32(C.pn_data_get_uint(data)) - case C.PN_INT: - *v = int32(C.pn_data_get_int(data)) - case C.PN_CHAR: - *v = uint8(C.pn_data_get_char(data)) - case C.PN_ULONG: - *v = uint64(C.pn_data_get_ulong(data)) - case C.PN_LONG: - *v = int64(C.pn_data_get_long(data)) - case C.PN_FLOAT: - *v = float32(C.pn_data_get_float(data)) - case C.PN_DOUBLE: - *v = float64(C.pn_data_get_double(data)) - case C.PN_BINARY: - *v = Binary(goBytes(C.pn_data_get_binary(data))) - case C.PN_STRING: - *v = goString(C.pn_data_get_string(data)) - case C.PN_SYMBOL: - *v = Symbol(goString(C.pn_data_get_symbol(data))) - case C.PN_MAP: - m := make(Map) - unmarshal(&m, data) - *v = m - case C.PN_LIST: - l := make(List, 0) - unmarshal(&l, data) - *v = l - default: // No data (-1 or NULL) - *v = nil - } -} - -// get into map pointed at by v -func getMap(data *C.pn_data_t, v interface{}) { - mapValue := reflect.ValueOf(v).Elem() - mapValue.Set(reflect.MakeMap(mapValue.Type())) // Clear the map - switch pnType := C.pn_data_type(data); pnType { - case C.PN_MAP: - count := int(C.pn_data_get_map(data)) - if bool(C.pn_data_enter(data)) { - defer C.pn_data_exit(data) - for i := 0; i < count/2; i++ { - if bool(C.pn_data_next(data)) { - key := reflect.New(mapValue.Type().Key()) - unmarshal(key.Interface(), data) - if bool(C.pn_data_next(data)) { - val := reflect.New(mapValue.Type().Elem()) - unmarshal(val.Interface(), data) - mapValue.SetMapIndex(key.Elem(), val.Elem()) - } - } - } - } - default: // Empty/error/unknown, leave map empty - } -} - -func getList(data *C.pn_data_t, v interface{}) { - pnType := C.pn_data_type(data) - if pnType != C.PN_LIST { - panic(newUnmarshalError(pnType, v)) - } - count := int(C.pn_data_get_list(data)) - listValue := reflect.MakeSlice(reflect.TypeOf(v).Elem(), count, count) - if bool(C.pn_data_enter(data)) { - for i := 0; i < count; i++ { - if bool(C.pn_data_next(data)) { - val := reflect.New(listValue.Type().Elem()) - unmarshal(val.Interface(), data) - listValue.Index(i).Set(val.Elem()) - } - } - C.pn_data_exit(data) - } - reflect.ValueOf(v).Elem().Set(listValue) -} - -// decode from bytes. -// Return bytes decoded or 0 if we could not decode a complete object. -// -func decode(data *C.pn_data_t, bytes []byte) (int, error) { - if len(bytes) == 0 { - return 0, nil - } - n := int(C.pn_data_decode(data, cPtr(bytes), cLen(bytes))) - if n == int(C.PN_UNDERFLOW) { - C.pn_error_clear(C.pn_data_error(data)) - return 0, nil - } else if n <= 0 { - return 0, fmt.Errorf("unmarshal %s", PnErrorCode(n)) - } - return n, nil -} http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/2f85988e/proton-c/bindings/go/src/qpid.apache.org/amqp/url.go ---------------------------------------------------------------------- diff --git a/proton-c/bindings/go/src/qpid.apache.org/amqp/url.go b/proton-c/bindings/go/src/qpid.apache.org/amqp/url.go deleted file mode 100644 index fd6c8dc..0000000 --- a/proton-c/bindings/go/src/qpid.apache.org/amqp/url.go +++ /dev/null @@ -1,104 +0,0 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. -*/ - -package amqp - -import ( - "errors" - "net" - "net/url" - "strings" -) - -const ( - amqp string = "amqp" - amqps = "amqps" - defaulthost = "localhost" -) - -// The way this is used it can only get a hostport already validated by -// the URL parser, so this means we can skip some error checks -func splitHostPort(hostport string) (string, string, error) { - if hostport == "" { - return "", "", nil - } - if hostport[0] == '[' { - // There must be a matching ']' as already validated - if l := strings.LastIndex(hostport, "]"); len(hostport) == l+1 { - // trim off '[' and ']' - return hostport[1:l], "", nil - } - } else if strings.IndexByte(hostport, ':') < 0 { - return hostport, "", nil - } - return net.SplitHostPort(hostport) -} - -func UpdateURL(in *url.URL) (err error) { - // Detect form without "amqp://" and stick it on front - // to make it match the usual proton defaults - u := new (url.URL) - *u = *in - if (u.Scheme != "" && u.Opaque != "") || - (u.Scheme == "" && u.Host == "") { - input := u.String() - input = "amqp://" + input - u, err = url.Parse(input) - if err != nil { - return - } - } - // If Scheme is still "" then default to amqp - if u.Scheme == "" { - u.Scheme = amqp - } - // Error if the scheme is not an amqp scheme - if u.Scheme != amqp && u.Scheme != amqps { - return errors.New("invalid amqp scheme") - } - // Decompose Host into host and port - host, port, err := splitHostPort(u.Host) - if err != nil { - return - } - if host == "" { - host = defaulthost - } - if port == "" { - port = u.Scheme - } - u.Host = net.JoinHostPort(host, port) - *in = *u - return nil -} - -// ParseUrl parses an AMQP URL string and returns a net/url.Url. -// -// It is more forgiving than net/url.Parse and allows most of the parts of the -// URL to be missing, assuming AMQP defaults. -// -func ParseURL(s string) (u *url.URL, err error) { - if u, err = url.Parse(s); err != nil { - return - } - if err = UpdateURL(u); err != nil { - u = nil - } - return u, err -} http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/2f85988e/proton-c/bindings/go/src/qpid.apache.org/amqp/url_test.go ---------------------------------------------------------------------- diff --git a/proton-c/bindings/go/src/qpid.apache.org/amqp/url_test.go b/proton-c/bindings/go/src/qpid.apache.org/amqp/url_test.go deleted file mode 100644 index f52d4bf..0000000 --- a/proton-c/bindings/go/src/qpid.apache.org/amqp/url_test.go +++ /dev/null @@ -1,59 +0,0 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. -*/ - -package amqp - -import ( - "fmt" -) - -func ExampleParseURL() { - for _, s := range []string{ - "amqp://username:password@host:1234/path", - "host:1234", - "host", - "host/path", - "amqps://host", - "/path", - "", - ":1234", - // Taken out becasue the go 1.4 URL parser isn't the same as later - //"[::1]", - //"[::1", - // Output would be: - // amqp://[::1]:amqp - // parse amqp://[::1: missing ']' in host - } { - u, err := ParseURL(s) - if err != nil { - fmt.Println(err) - } else { - fmt.Println(u) - } - } - // Output: - // amqp://username:password@host:1234/path - // amqp://host:1234 - // amqp://host:amqp - // amqp://host:amqp/path - // amqps://host:amqps - // amqp://localhost:amqp/path - // amqp://localhost:amqp - // parse :1234: missing protocol scheme -} http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/2f85988e/proton-c/bindings/go/src/qpid.apache.org/amqp/version.go ---------------------------------------------------------------------- diff --git a/proton-c/bindings/go/src/qpid.apache.org/amqp/version.go b/proton-c/bindings/go/src/qpid.apache.org/amqp/version.go deleted file mode 100644 index cefa904..0000000 --- a/proton-c/bindings/go/src/qpid.apache.org/amqp/version.go +++ /dev/null @@ -1,29 +0,0 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. -*/ - -package amqp - -// Version check for proton library. -// Done here because this is the lowest-level dependency for all the proton Go packages. - -// #include <proton/version.h> -// #if PN_VERSION_MINOR < 10 -// #error packages qpid.apache.org/... require Proton-C library version 0.10 or greater -// #endif -import "C" http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/2f85988e/proton-c/bindings/go/src/qpid.apache.org/electron/auth_test.go ---------------------------------------------------------------------- diff --git a/proton-c/bindings/go/src/qpid.apache.org/electron/auth_test.go b/proton-c/bindings/go/src/qpid.apache.org/electron/auth_test.go deleted file mode 100644 index 73a9299..0000000 --- a/proton-c/bindings/go/src/qpid.apache.org/electron/auth_test.go +++ /dev/null @@ -1,124 +0,0 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. -*/ - -package electron - -import ( - "fmt" - "io/ioutil" - "os" - "os/exec" - "path/filepath" - "strings" - "testing" -) - -func testAuthClientServer(t *testing.T, copts []ConnectionOption, sopts []ConnectionOption) (got connectionSettings, err error) { - client, server := newClientServerOpts(t, copts, sopts) - defer closeClientServer(client, server) - - go func() { - for in := range server.Incoming() { - switch in := in.(type) { - case *IncomingConnection: - got = connectionSettings{user: in.User(), virtualHost: in.VirtualHost()} - } - in.Accept() - } - }() - - err = client.Sync() - return -} - -func TestAuthAnonymous(t *testing.T) { - fatalIf(t, configureSASL()) - got, err := testAuthClientServer(t, - []ConnectionOption{User("fred"), VirtualHost("vhost"), SASLAllowInsecure(true)}, - []ConnectionOption{SASLAllowedMechs("ANONYMOUS"), SASLAllowInsecure(true)}) - fatalIf(t, err) - errorIf(t, checkEqual(connectionSettings{user: "anonymous", virtualHost: "vhost"}, got)) -} - -func TestAuthPlain(t *testing.T) { - fatalIf(t, configureSASL()) - got, err := testAuthClientServer(t, - []ConnectionOption{SASLAllowInsecure(true), SASLAllowedMechs("PLAIN"), User("fred@proton"), Password([]byte("xxx"))}, - []ConnectionOption{SASLAllowInsecure(true), SASLAllowedMechs("PLAIN")}) - fatalIf(t, err) - errorIf(t, checkEqual(connectionSettings{user: "fred@proton"}, got)) -} - -func TestAuthBadPass(t *testing.T) { - fatalIf(t, configureSASL()) - _, err := testAuthClientServer(t, - []ConnectionOption{SASLAllowInsecure(true), SASLAllowedMechs("PLAIN"), User("fred@proton"), Password([]byte("yyy"))}, - []ConnectionOption{SASLAllowInsecure(true), SASLAllowedMechs("PLAIN")}) - if err == nil { - t.Error("Expected auth failure for bad pass") - } -} - -func TestAuthBadUser(t *testing.T) { - fatalIf(t, configureSASL()) - _, err := testAuthClientServer(t, - []ConnectionOption{SASLAllowInsecure(true), SASLAllowedMechs("PLAIN"), User("foo@bar"), Password([]byte("yyy"))}, - []ConnectionOption{SASLAllowInsecure(true), SASLAllowedMechs("PLAIN")}) - if err == nil { - t.Error("Expected auth failure for bad user") - } -} - -var confDir string -var confErr error - -func configureSASL() error { - if confDir != "" || confErr != nil { - return confErr - } - confDir, confErr = ioutil.TempDir("", "") - if confErr != nil { - return confErr - } - - GlobalSASLConfigDir(confDir) - GlobalSASLConfigName("test") - conf := filepath.Join(confDir, "test.conf") - - db := filepath.Join(confDir, "proton.sasldb") - cmd := exec.Command("saslpasswd2", "-c", "-p", "-f", db, "-u", "proton", "fred") - cmd.Stdin = strings.NewReader("xxx") // Password - if out, err := cmd.CombinedOutput(); err != nil { - confErr = fmt.Errorf("saslpasswd2 failed: %s\n%s", err, out) - return confErr - } - confStr := "sasldb_path: " + db + "\nmech_list: EXTERNAL DIGEST-MD5 SCRAM-SHA-1 CRAM-MD5 PLAIN ANONYMOUS\n" - if err := ioutil.WriteFile(conf, []byte(confStr), os.ModePerm); err != nil { - confErr = fmt.Errorf("write conf file %s failed: %s", conf, err) - } - return confErr -} - -func TestMain(m *testing.M) { - status := m.Run() - if confDir != "" { - _ = os.RemoveAll(confDir) - } - os.Exit(status) -} http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/2f85988e/proton-c/bindings/go/src/qpid.apache.org/electron/connection.go ---------------------------------------------------------------------- diff --git a/proton-c/bindings/go/src/qpid.apache.org/electron/connection.go b/proton-c/bindings/go/src/qpid.apache.org/electron/connection.go deleted file mode 100644 index 7f3050f..0000000 --- a/proton-c/bindings/go/src/qpid.apache.org/electron/connection.go +++ /dev/null @@ -1,405 +0,0 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. -*/ - -package electron - -// #include <proton/disposition.h> -import "C" - -import ( - "net" - "qpid.apache.org/proton" - "sync" - "time" -) - -// Settings associated with a Connection. -type ConnectionSettings interface { - // Authenticated user name associated with the connection. - User() string - - // The AMQP virtual host name for the connection. - // - // Optional, useful when the server has multiple names and provides different - // service based on the name the client uses to connect. - // - // By default it is set to the DNS host name that the client uses to connect, - // but it can be set to something different at the client side with the - // VirtualHost() option. - // - // Returns error if the connection fails to authenticate. - VirtualHost() string - - // Heartbeat is the maximum delay between sending frames that the remote peer - // has requested of us. If the interval expires an empty "heartbeat" frame - // will be sent automatically to keep the connection open. - Heartbeat() time.Duration -} - -// Connection is an AMQP connection, created by a Container. -type Connection interface { - Endpoint - ConnectionSettings - - // Sender opens a new sender on the DefaultSession. - Sender(...LinkOption) (Sender, error) - - // Receiver opens a new Receiver on the DefaultSession(). - Receiver(...LinkOption) (Receiver, error) - - // DefaultSession() returns a default session for the connection. It is opened - // on the first call to DefaultSession and returned on subsequent calls. - DefaultSession() (Session, error) - - // Session opens a new session. - Session(...SessionOption) (Session, error) - - // Container for the connection. - Container() Container - - // Disconnect the connection abruptly with an error. - Disconnect(error) - - // Wait waits for the connection to be disconnected. - Wait() error - - // WaitTimeout is like Wait but returns Timeout if the timeout expires. - WaitTimeout(time.Duration) error - - // Incoming returns a channel for incoming endpoints opened by the remote peer. - // See the Incoming interface for more. - // - // Not receiving from Incoming() and calling Accept/Reject will block the - // electron event loop. You should run a loop to handle the types that - // interest you in a switch{} and and Accept() all others. - Incoming() <-chan Incoming -} - -type connectionSettings struct { - user, virtualHost string - heartbeat time.Duration -} - -func (c connectionSettings) User() string { return c.user } -func (c connectionSettings) VirtualHost() string { return c.virtualHost } -func (c connectionSettings) Heartbeat() time.Duration { return c.heartbeat } - -// ConnectionOption can be passed when creating a connection to configure various options -type ConnectionOption func(*connection) - -// User returns a ConnectionOption sets the user name for a connection -func User(user string) ConnectionOption { - return func(c *connection) { - c.user = user - c.pConnection.SetUser(user) - } -} - -// VirtualHost returns a ConnectionOption to set the AMQP virtual host for the connection. -// Only applies to outbound client connection. -func VirtualHost(virtualHost string) ConnectionOption { - return func(c *connection) { - c.virtualHost = virtualHost - c.pConnection.SetHostname(virtualHost) - } -} - -// Password returns a ConnectionOption to set the password used to establish a -// connection. Only applies to outbound client connection. -// -// The connection will erase its copy of the password from memory as soon as it -// has been used to authenticate. If you are concerned about paswords staying in -// memory you should never store them as strings, and should overwrite your -// copy as soon as you are done with it. -// -func Password(password []byte) ConnectionOption { - return func(c *connection) { c.pConnection.SetPassword(password) } -} - -// Server returns a ConnectionOption to put the connection in server mode for incoming connections. -// -// A server connection will do protocol negotiation to accept a incoming AMQP -// connection. Normally you would call this for a connection created by -// net.Listener.Accept() -// -func Server() ConnectionOption { - return func(c *connection) { c.engine.Server(); c.server = true; AllowIncoming()(c) } -} - -// AllowIncoming returns a ConnectionOption to enable incoming endpoints, see -// Connection.Incoming() This is automatically set for Server() connections. -func AllowIncoming() ConnectionOption { - return func(c *connection) { c.incoming = make(chan Incoming) } -} - -// Parent returns a ConnectionOption that associates the Connection with it's Container -// If not set a connection will create its own default container. -func Parent(cont Container) ConnectionOption { - return func(c *connection) { c.container = cont.(*container) } -} - -type connection struct { - endpoint - connectionSettings - - defaultSessionOnce, closeOnce sync.Once - - container *container - conn net.Conn - server bool - incoming chan Incoming - handler *handler - engine *proton.Engine - pConnection proton.Connection - - defaultSession Session -} - -// NewConnection creates a connection with the given options. -func NewConnection(conn net.Conn, opts ...ConnectionOption) (*connection, error) { - c := &connection{ - conn: conn, - } - c.handler = newHandler(c) - var err error - c.engine, err = proton.NewEngine(c.conn, c.handler.delegator) - if err != nil { - return nil, err - } - c.pConnection = c.engine.Connection() - for _, set := range opts { - set(c) - } - if c.container == nil { - c.container = NewContainer("").(*container) - } - c.pConnection.SetContainer(c.container.Id()) - globalSASLInit(c.engine) - - c.endpoint.init(c.engine.String()) - go c.run() - return c, nil -} - -func (c *connection) run() { - if !c.server { - c.pConnection.Open() - } - _ = c.engine.Run() - if c.incoming != nil { - close(c.incoming) - } - _ = c.closed(Closed) -} - -func (c *connection) Close(err error) { - c.err.Set(err) - c.engine.Close(err) -} - -func (c *connection) Disconnect(err error) { - c.err.Set(err) - c.engine.Disconnect(err) -} - -func (c *connection) Session(opts ...SessionOption) (Session, error) { - var s Session - err := c.engine.InjectWait(func() error { - if c.Error() != nil { - return c.Error() - } - pSession, err := c.engine.Connection().Session() - if err == nil { - pSession.Open() - if err == nil { - s = newSession(c, pSession, opts...) - } - } - return err - }) - return s, err -} - -func (c *connection) Container() Container { return c.container } - -func (c *connection) DefaultSession() (s Session, err error) { - c.defaultSessionOnce.Do(func() { - c.defaultSession, err = c.Session() - }) - if err == nil { - err = c.Error() - } - return c.defaultSession, err -} - -func (c *connection) Sender(opts ...LinkOption) (Sender, error) { - if s, err := c.DefaultSession(); err == nil { - return s.Sender(opts...) - } else { - return nil, err - } -} - -func (c *connection) Receiver(opts ...LinkOption) (Receiver, error) { - if s, err := c.DefaultSession(); err == nil { - return s.Receiver(opts...) - } else { - return nil, err - } -} - -func (c *connection) Connection() Connection { return c } - -func (c *connection) Wait() error { return c.WaitTimeout(Forever) } -func (c *connection) WaitTimeout(timeout time.Duration) error { - _, err := timedReceive(c.done, timeout) - if err == Timeout { - return Timeout - } - return c.Error() -} - -func (c *connection) Incoming() <-chan Incoming { - assert(c.incoming != nil, "electron.Connection.Incoming() disabled for %s", c) - return c.incoming -} - -type IncomingConnection struct { - incoming - connectionSettings - c *connection -} - -func newIncomingConnection(c *connection) *IncomingConnection { - c.user = c.pConnection.Transport().User() - c.virtualHost = c.pConnection.RemoteHostname() - return &IncomingConnection{ - incoming: makeIncoming(c.pConnection), - connectionSettings: c.connectionSettings, - c: c} -} - -// AcceptConnection is like Accept() but takes ConnectionOption s -// For example you can set the Heartbeat() for the accepted connection. -func (in *IncomingConnection) AcceptConnection(opts ...ConnectionOption) Connection { - return in.accept(func() Endpoint { - for _, opt := range opts { - opt(in.c) - } - in.c.pConnection.Open() - return in.c - }).(Connection) -} - -func (in *IncomingConnection) Accept() Endpoint { - return in.AcceptConnection() -} - -func sasl(c *connection) proton.SASL { return c.engine.Transport().SASL() } - -// SASLEnable returns a ConnectionOption that enables SASL authentication. -// Only required if you don't set any other SASL options. -func SASLEnable() ConnectionOption { return func(c *connection) { sasl(c) } } - -// SASLAllowedMechs returns a ConnectionOption to set the list of allowed SASL -// mechanisms. -// -// Can be used on the client or the server to restrict the SASL for a connection. -// mechs is a space-separated list of mechanism names. -// -func SASLAllowedMechs(mechs string) ConnectionOption { - return func(c *connection) { sasl(c).AllowedMechs(mechs) } -} - -// SASLAllowInsecure returns a ConnectionOption that allows or disallows clear -// text SASL authentication mechanisms -// -// By default the SASL layer is configured not to allow mechanisms that disclose -// the clear text of the password over an unencrypted AMQP connection. This specifically -// will disallow the use of the PLAIN mechanism without using SSL encryption. -// -// This default is to avoid disclosing password information accidentally over an -// insecure network. -// -func SASLAllowInsecure(b bool) ConnectionOption { - return func(c *connection) { sasl(c).SetAllowInsecureMechs(b) } -} - -// Heartbeat returns a ConnectionOption that requests the maximum delay -// between sending frames for the remote peer. If we don't receive any frames -// within 2*delay we will close the connection. -// -func Heartbeat(delay time.Duration) ConnectionOption { - // Proton-C divides the idle-timeout by 2 before sending, so compensate. - return func(c *connection) { c.engine.Transport().SetIdleTimeout(2 * delay) } -} - -// GlobalSASLConfigDir sets the SASL configuration directory for every -// Connection created in this process. If not called, the default is determined -// by your SASL installation. -// -// You can set SASLAllowInsecure and SASLAllowedMechs on individual connections. -// -func GlobalSASLConfigDir(dir string) { globalSASLConfigDir = dir } - -// GlobalSASLConfigName sets the SASL configuration name for every Connection -// created in this process. If not called the default is "proton-server". -// -// The complete configuration file name is -// <sasl-config-dir>/<sasl-config-name>.conf -// -// You can set SASLAllowInsecure and SASLAllowedMechs on individual connections. -// -func GlobalSASLConfigName(dir string) { globalSASLConfigName = dir } - -var ( - globalSASLConfigName string - globalSASLConfigDir string -) - -// TODO aconway 2016-09-15: Current pn_sasl C impl config is broken, so all we -// can realistically offer is global configuration. Later if/when the pn_sasl C -// impl is fixed we can offer per connection over-rides. -func globalSASLInit(eng *proton.Engine) { - sasl := eng.Transport().SASL() - if globalSASLConfigName != "" { - sasl.ConfigName(globalSASLConfigName) - } - if globalSASLConfigDir != "" { - sasl.ConfigPath(globalSASLConfigDir) - } -} - -// Dial is shorthand for using net.Dial() then NewConnection() -func Dial(network, addr string, opts ...ConnectionOption) (c Connection, err error) { - conn, err := net.Dial(network, addr) - if err == nil { - c, err = NewConnection(conn, opts...) - } - return -} - -// DialWithDialer is shorthand for using dialer.Dial() then NewConnection() -func DialWithDialer(dialer *net.Dialer, network, addr string, opts ...ConnectionOption) (c Connection, err error) { - conn, err := dialer.Dial(network, addr) - if err == nil { - c, err = NewConnection(conn, opts...) - } - return -} http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/2f85988e/proton-c/bindings/go/src/qpid.apache.org/electron/container.go ---------------------------------------------------------------------- diff --git a/proton-c/bindings/go/src/qpid.apache.org/electron/container.go b/proton-c/bindings/go/src/qpid.apache.org/electron/container.go deleted file mode 100644 index efb24ff..0000000 --- a/proton-c/bindings/go/src/qpid.apache.org/electron/container.go +++ /dev/null @@ -1,104 +0,0 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. -*/ - -package electron - -import ( - "net" - "qpid.apache.org/proton" - "strconv" - "sync/atomic" -) - -// Container is an AMQP container, it represents a single AMQP "application" -// which can have multiple client or server connections. -// -// Each Container in a distributed AMQP application must have a unique -// container-id which is applied to its connections. -// -// Create with NewContainer() -// -type Container interface { - // Id is a unique identifier for the container in your distributed application. - Id() string - - // Connection creates a connection associated with this container. - Connection(conn net.Conn, opts ...ConnectionOption) (Connection, error) - - // Dial is shorthand for - // conn, err := net.Dial(); c, err := Connection(conn, opts...) - Dial(network string, addr string, opts ...ConnectionOption) (Connection, error) - - // Accept is shorthand for: - // conn, err := l.Accept(); c, err := Connection(conn, append(opts, Server()...) - Accept(l net.Listener, opts ...ConnectionOption) (Connection, error) - - // String returns Id() - String() string -} - -type container struct { - id string - tagCounter uint64 -} - -func (cont *container) nextTag() string { - return strconv.FormatUint(atomic.AddUint64(&cont.tagCounter, 1), 32) -} - -// NewContainer creates a new container. The id must be unique in your -// distributed application, all connections created by the container -// will have this container-id. -// -// If id == "" a random UUID will be generated for the id. -func NewContainer(id string) Container { - if id == "" { - id = proton.UUID4().String() - } - cont := &container{id: id} - return cont -} - -func (cont *container) Id() string { return cont.id } - -func (cont *container) String() string { return cont.Id() } - -func (cont *container) nextLinkName() string { - return cont.id + "@" + cont.nextTag() -} - -func (cont *container) Connection(conn net.Conn, opts ...ConnectionOption) (Connection, error) { - return NewConnection(conn, append(opts, Parent(cont))...) -} - -func (cont *container) Dial(network, address string, opts ...ConnectionOption) (c Connection, err error) { - conn, err := net.Dial(network, address) - if err == nil { - c, err = cont.Connection(conn, opts...) - } - return -} - -func (cont *container) Accept(l net.Listener, opts ...ConnectionOption) (c Connection, err error) { - conn, err := l.Accept() - if err == nil { - c, err = cont.Connection(conn, append(opts, Server())...) - } - return -} --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
