Merge tag '0.19.0' into go1 Update `go get` branch to release 0.19.0
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/b25d21e6 Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/b25d21e6 Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/b25d21e6 Branch: refs/heads/go1 Commit: b25d21e6779a011e972a5f00e433e2034b793da1 Parents: 6c48527 fe3c38c Author: Alan Conway <[email protected]> Authored: Fri Jan 5 11:32:52 2018 -0500 Committer: Alan Conway <[email protected]> Committed: Fri Jan 5 11:32:52 2018 -0500 ---------------------------------------------------------------------- README.md | 2 +- amqp/interop_test.go | 11 +- amqp/marshal.go | 201 +++++++++---- amqp/marshal_test.go | 113 +++++++ amqp/message.go | 48 ++- amqp/message_test.go | 12 +- amqp/types.go | 53 +++- amqp/types_test.go | 68 +++-- amqp/unmarshal.go | 669 +++++++++++++++++++++++------------------ amqp/url_test.go | 2 +- electron/auth_test.go | 8 +- electron/connection.go | 2 +- electron/electron_test.go | 10 +- electron/receiver.go | 2 +- electron/time.go | 2 +- proton/engine.go | 6 +- proton/handlers.go | 2 +- proton/message.go | 4 +- proton/wrappers.go | 4 +- 19 files changed, 793 insertions(+), 426 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b25d21e6/README.md ---------------------------------------------------------------------- diff --cc README.md index 0977787,b1bca51..5292454 --- a/README.md +++ b/README.md @@@ -1,116 -1,43 +1,116 @@@ -Qpid Proton - AMQP messaging toolkit -==================================== +# Qpid Go packages for AMQP -Linux/OSX Build | Windows Build -----------------|--------------- -[](https://travis-ci.org/apache/qpid-proton) | [](https://ci.appveyor.com/project/ke4qqq/qpid-proton/branch/master) +These packages provide [Go](http://golang.org) support for sending and receiving +AMQP messages in client or server applications. 
Reference documentation is +available at: <http://godoc.org/?q=qpid.apache.org> -Qpid Proton is a high-performance, lightweight messaging library. It can be -used in the widest range of messaging applications, including brokers, client -libraries, routers, bridges, proxies, and more. Proton makes it trivial to -integrate with the AMQP 1.0 ecosystem from any platform, environment, or -language +They require the +[proton-C library and header files](http://qpid.apache.org/proton) to be - installed. On many platforms it is avaialable pre-packaged, for example on ++installed. On many platforms it is available pre-packaged, for example on +Fedora -Features --------- + dnf install qpid-proton-c-devel - - A flexible and capable reactive messaging API - - Full control of AMQP 1.0 protocol semantics - - Portable C implementation with bindings to popular languages - - Peer-to-peer and brokered messaging - - Secure communication via SSL and SASL +If you built proton from source, you can set environment variables to find the +built libraries and headers as follows: -Universal - Proton is designed to scale both up and down. Equally suitable for -simple clients or high-powered servers, it can be deployed in simple -peer-to-peer configurations or as part of a global federated messaging network. + source <build-directory>/config.sh -Embeddable - Proton is carefully written to be portable and cross platform. It -has minimal dependencies, and it is architected to be usable with any threading -model, as well as with non-threaded applications. These features make it -uniquely suited for embedding messaging capabilities into existing software. 
+If you have installed the library and headers in non-standard directories, then +add them to the following environment variables: -Standard - Built around the AMQP 1.0 messaging standard, Proton is not only -ideal for building out your own messaging applications but also for connecting -them to the broader ecosystem of AMQP 1.0-based messaging applications. + LD_LIBRARY_PATH # directory containing the library + LIBRARY_PATH # directory containing the library + C_INCLUDE_PATH # directory containing the proton/ subdirectory with header files -Getting Started ---------------- +There are 3 packages: -See the included INSTALL.md file for build and install instructions and the -DEVELOPERS file for information on how to modify and test the library code -itself. +[qpid.apache.org/amqp](http://godoc.org/qpid.apache.org/amqp) provides functions +to convert AMQP messages and data types to and from Go data types. Used by both +the proton and electron packages to manage AMQP data. -Please see http://qpid.apache.org/proton for a more info. +[qpid.apache.org/electron](http://godoc.org/qpid.apache.org/electron) is a +simple, concurrent-safe API for sending and receiving messages. It can be used +with goroutines and channels to build concurrent AMQP clients and servers. + +[qpid.apache.org/proton](http://godoc.org/qpid.apache.org/proton) is an +event-driven, concurrent-unsafe package that closely follows the proton C +API. Most Go programmers will find the +[electron](http://godoc.org/qpid.apache.org/electron) package easier to use. + +See the [examples](https://github.com/apache/qpid-proton/blob/master/examples/go/README.md) +to help you get started. + +Feedback is encouraged at: + +- Email <[email protected]> +- Create issues <https://issues.apache.org/jira/browse/PROTON>, attach patches to an issue. + +### Why two APIs? + +The `proton` API is a direct mapping of the proton C library into Go. 
It is +usable but not very natural for a Go programmer because it takes an +*event-driven* approach and has no built-in support for concurrent +use. `electron` uses `proton` internally but provides a more Go-like API that is +safe to use from multiple concurrent goroutines. + +Go encourages programs to be structured as concurrent *goroutines* that +communicate via *channels*. Go literature distinguishes between: + +- *concurrency*: "keeping track of things that could be done in parallel" +- *parallelism*: "actually doing things in parallel on multiple CPUs or cores" + +A Go program expresses concurrency by starting goroutines for potentially +concurrent tasks. The Go runtime schedules the activity of goroutines onto a +small number (possibly one) of actual parallel executions. + +Even with no hardware parallelism, goroutine concurrency lets the Go runtime +order unpredictable events like file descriptors being readable/writable, +channels having data, timers firing etc. Go automatically takes care of +switching out goroutines that block or sleep so it is normal to write code in +terms of blocking calls. + +By contrast, event-driven programming is based on polling mechanisms like +`select`, `poll` or `epoll`. These also dispatch unpredictably ordered events to +a single thread or a small thread pool. However this requires a different style +of programming: "event-driven" or "reactive" programming. Go developers call it +"inside-out" programming. In an event-driven program blocking is a big problem +as it consumes a scarce thread of execution, so actions that take time to +complete have to be re-structured in terms of multiple events. + +The promise of Go is that you can express your program in concurrent, sequential +terms and the Go runtime will turn it inside-out for you. You can start +goroutines for all concurrent activities. They can loop forever or block for as +long as they need waiting for timers, IO or any unpredictable event. 
Go will +interleave and schedule them efficiently onto the available parallel hardware. + +For example: in the `electron` API, you can send a message and wait for it to be +acknowledged in a single function. All the information about the message, why +you sent it, and what to do when it is acknowledged can be held in local +variables, all the code is in a simple sequence. Other goroutines in your +program can be sending and receiving messages concurrently, they are not +blocked. + +In the `proton` API, an event handler that sends a message must return +*immediately*, it cannot block the event loop to wait for +acknowledgement. Acknowledgement is a separate event, so the code for handling +it is in a different event handler. Context information about the message has to +be stored in some non-local variable that both functions can find. This makes +the code harder to follow. + +The `proton` API is important because it is the foundation for the `electron` +API, and may be useful for programs that need to be close to the original C +library for some reason. However the `electron` API hides the event-driven +details behind simple, sequential, concurrent-safe methods that can be called +from arbitrary goroutines. Under the covers, data is passed through channels to +dedicated `proton` goroutines so user goroutines can work concurrently with the +proton event-loop. + +## New to Go? + +If you are new to Go then these are a good place to start: + +- [A Tour of Go](http://tour.golang.org) +- [Effective Go](http://golang.org/doc/effective_go.html) + +Then look at the tools and docs at <http://golang.org> as you need them. 
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b25d21e6/amqp/interop_test.go ---------------------------------------------------------------------- diff --cc amqp/interop_test.go index a5fb92e,0000000..8ec6734 mode 100644,000000..100644 --- a/amqp/interop_test.go +++ b/amqp/interop_test.go @@@ -1,377 -1,0 +1,384 @@@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ + +// Test that conversion of Go type to/from AMQP is compatible with other +// bindings. 
+// +package amqp + +import ( + "bytes" + "io" + "io/ioutil" + "os" + "reflect" + "strings" + "testing" +) + ++var skipped = false ++ +func getReader(t *testing.T, name string) (r io.Reader) { + dir := os.Getenv("PN_INTEROP_DIR") + if dir == "" { - t.Skip("no PN_INTEROP_DIR in environment") ++ if !skipped { ++ skipped = true // Don't keep repeating ++ t.Skip("no PN_INTEROP_DIR in environment") ++ } else { ++ t.SkipNow() ++ } + } + r, err := os.Open(dir + "/" + name + ".amqp") + if err != nil { + t.Fatalf("can't open %#v: %v", name, err) + } + return +} + +func remaining(d *Decoder) string { + remainder, _ := ioutil.ReadAll(io.MultiReader(d.Buffered(), d.reader)) + return string(remainder) +} + +// checkDecode: want is the expected value, gotPtr is a pointer to an +// instance of the same type for Decode. +func checkDecode(d *Decoder, want interface{}, gotPtr interface{}, t *testing.T) { + + if err := d.Decode(gotPtr); err != nil { + t.Error("Decode failed", err) + return + } + got := reflect.ValueOf(gotPtr).Elem().Interface() + if err := checkEqual(want, got); err != nil { + t.Error("Decode bad value:", err) + return + } + + // Try round trip encoding + bytes, err := Marshal(want, nil) + if err != nil { + t.Error("Marshal failed", err) + return + } + n, err := Unmarshal(bytes, gotPtr) + if err != nil { + t.Error("Unmarshal failed", err) + return + } + if err := checkEqual(n, len(bytes)); err != nil { + t.Error("Bad unmarshal length", err) + return + } + got = reflect.ValueOf(gotPtr).Elem().Interface() + if err = checkEqual(want, got); err != nil { + t.Error("Bad unmarshal value", err) + return + } +} + +func TestUnmarshal(t *testing.T) { + bytes, err := ioutil.ReadAll(getReader(t, "strings")) + if err != nil { + t.Error(err) + } + for _, want := range []string{"abc\000defg", "abcdefg", "abcdefg", "", "", ""} { + var got string + n, err := Unmarshal(bytes, &got) + if err != nil { + t.Error(err) + } + if want != got { + t.Errorf("%#v != %#v", want, got) + } + bytes = 
bytes[n:] + } +} + +func TestPrimitivesExact(t *testing.T) { + d := NewDecoder(getReader(t, "primitives")) + // Decoding into exact types + var b bool + checkDecode(d, true, &b, t) + checkDecode(d, false, &b, t) + var u8 uint8 + checkDecode(d, uint8(42), &u8, t) + var u16 uint16 + checkDecode(d, uint16(42), &u16, t) + var i16 int16 + checkDecode(d, int16(-42), &i16, t) + var u32 uint32 + checkDecode(d, uint32(12345), &u32, t) + var i32 int32 + checkDecode(d, int32(-12345), &i32, t) + var u64 uint64 + checkDecode(d, uint64(12345), &u64, t) + var i64 int64 + checkDecode(d, int64(-12345), &i64, t) + var f32 float32 + checkDecode(d, float32(0.125), &f32, t) + var f64 float64 + checkDecode(d, float64(0.125), &f64, t) +} + +func TestPrimitivesCompatible(t *testing.T) { + d := NewDecoder(getReader(t, "primitives")) + // Decoding into compatible types + var b bool + var i int + var u uint + var f float64 + checkDecode(d, true, &b, t) + checkDecode(d, false, &b, t) + checkDecode(d, uint(42), &u, t) + checkDecode(d, uint(42), &u, t) + checkDecode(d, -42, &i, t) + checkDecode(d, uint(12345), &u, t) + checkDecode(d, -12345, &i, t) + checkDecode(d, uint(12345), &u, t) + checkDecode(d, -12345, &i, t) + checkDecode(d, 0.125, &f, t) + checkDecode(d, 0.125, &f, t) +} + +// checkDecodeValue: want is the expected value, decode into a reflect.Value +func checkDecodeInterface(d *Decoder, want interface{}, t *testing.T) { + + var got, got2 interface{} + if err := d.Decode(&got); err != nil { + t.Error("Decode failed", err) + return + } + if err := checkEqual(want, got); err != nil { + t.Error(err) + return + } + // Try round trip encoding + bytes, err := Marshal(got, nil) + if err != nil { + t.Error(err) + return + } + n, err := Unmarshal(bytes, &got2) + if err != nil { + t.Error(err) + return + } + if err := checkEqual(n, len(bytes)); err != nil { + t.Error(err) + return + } + if err := checkEqual(want, got2); err != nil { + t.Error(err) + return + } +} + +func 
TestPrimitivesInterface(t *testing.T) { + d := NewDecoder(getReader(t, "primitives")) + checkDecodeInterface(d, true, t) + checkDecodeInterface(d, false, t) + checkDecodeInterface(d, uint8(42), t) + checkDecodeInterface(d, uint16(42), t) + checkDecodeInterface(d, int16(-42), t) + checkDecodeInterface(d, uint32(12345), t) + checkDecodeInterface(d, int32(-12345), t) + checkDecodeInterface(d, uint64(12345), t) + checkDecodeInterface(d, int64(-12345), t) + checkDecodeInterface(d, float32(0.125), t) + checkDecodeInterface(d, float64(0.125), t) +} + +func TestStrings(t *testing.T) { + d := NewDecoder(getReader(t, "strings")) + // Test decoding as plain Go strings + for _, want := range []string{"abc\000defg", "abcdefg", "abcdefg", "", "", ""} { + var got string + checkDecode(d, want, &got, t) + } + remains := remaining(d) + if remains != "" { + t.Errorf("leftover: %s", remains) + } + + // Test decoding as specific string types + d = NewDecoder(getReader(t, "strings")) + var bytes []byte + var str, sym string + checkDecode(d, []byte("abc\000defg"), &bytes, t) + checkDecode(d, "abcdefg", &str, t) + checkDecode(d, "abcdefg", &sym, t) + checkDecode(d, make([]byte, 0), &bytes, t) + checkDecode(d, "", &str, t) + checkDecode(d, "", &sym, t) + remains = remaining(d) + if remains != "" { + t.Fatalf("leftover: %s", remains) + } + + // Test some error handling + d = NewDecoder(getReader(t, "strings")) + var s string + err := d.Decode(s) + if err == nil { + t.Fatal("Expected error") + } + if !strings.Contains(err.Error(), "not a pointer") { + t.Error(err) + } + var i int + err = d.Decode(&i) + if !strings.Contains(err.Error(), "cannot unmarshal") { + t.Error(err) + } + _, err = Unmarshal([]byte{}, nil) - if !strings.Contains(err.Error(), "not enough data") { ++ if err := checkEqual(err, EndOfData); err != nil { + t.Error(err) + } + _, err = Unmarshal([]byte("foobar"), nil) + if !strings.Contains(err.Error(), "invalid-argument") { + t.Error(err) + } +} + +func TestEncodeDecode(t 
*testing.T) { + type data struct { + s string + i int + u8 uint8 + b bool + f float32 + v interface{} + } + + in := data{"foo", 42, 9, true, 1.234, "thing"} + + buf := bytes.Buffer{} + e := NewEncoder(&buf) + if err := e.Encode(in.s); err != nil { + t.Error(err) + } + if err := e.Encode(in.i); err != nil { + t.Error(err) + } + if err := e.Encode(in.u8); err != nil { + t.Error(err) + } + if err := e.Encode(in.b); err != nil { + t.Error(err) + } + if err := e.Encode(in.f); err != nil { + t.Error(err) + } + if err := e.Encode(in.v); err != nil { + t.Error(err) + } + + var out data + d := NewDecoder(&buf) + if err := d.Decode(&out.s); err != nil { + t.Error(err) + } + if err := d.Decode(&out.i); err != nil { + t.Error(err) + } + if err := d.Decode(&out.u8); err != nil { + t.Error(err) + } + if err := d.Decode(&out.b); err != nil { + t.Error(err) + } + if err := d.Decode(&out.f); err != nil { + t.Error(err) + } + if err := d.Decode(&out.v); err != nil { + t.Error(err) + } + + if err := checkEqual(in, out); err != nil { + t.Error(err) + } +} + +func TestMap(t *testing.T) { + d := NewDecoder(getReader(t, "maps")) + + // Generic map + var m Map + checkDecode(d, Map{"one": int32(1), "two": int32(2), "three": int32(3)}, &m, t) + + // Interface as map + var i interface{} + checkDecode(d, Map{int32(1): "one", int32(2): "two", int32(3): "three"}, &i, t) + + d = NewDecoder(getReader(t, "maps")) + // Specific typed map + var m2 map[string]int + checkDecode(d, map[string]int{"one": 1, "two": 2, "three": 3}, &m2, t) + + // Nested map + m = Map{int64(1): "one", "two": int32(2), true: Map{uint8(1): true, uint8(2): false}} + bytes, err := Marshal(m, nil) + if err != nil { + t.Fatal(err) + } + _, err = Unmarshal(bytes, &i) + if err != nil { + t.Fatal(err) + } + if err = checkEqual(m, i); err != nil { + t.Fatal(err) + } +} + +func TestList(t *testing.T) { + d := NewDecoder(getReader(t, "lists")) + var l List + checkDecode(d, List{int32(32), "foo", true}, &l, t) + checkDecode(d, List{}, 
&l, t) +} + +// TODO aconway 2015-09-08: the message.amqp file seems to be incorrectly coded +// as an AMQP string *inside* an AMQP binary?? Skip the test for now. +func TODO_TestMessage(t *testing.T) { + bytes, err := ioutil.ReadAll(getReader(t, "message")) + if err != nil { + t.Fatal(err) + } + + m, err := DecodeMessage(bytes) + if err != nil { + t.Fatal(err) + } else { + if err := checkEqual(m.Body(), "hello"); err != nil { + t.Error(err) + } + } + + m2 := NewMessageWith("hello") + bytes2, err := m2.Encode(nil) + if err != nil { + t.Error(err) + } else { + if err = checkEqual(bytes, bytes2); err != nil { + t.Error(err) + } + } +} + +// TODO aconway 2015-03-13: finish the full interop test http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b25d21e6/amqp/marshal.go ---------------------------------------------------------------------- diff --cc amqp/marshal.go index ca5e380,0000000..33b30a8 mode 100644,000000..100644 --- a/amqp/marshal.go +++ b/amqp/marshal.go @@@ -1,281 -1,0 +1,360 @@@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ + +package amqp + +// #include <proton/codec.h> +import "C" + +import ( + "fmt" + "io" + "reflect" ++ "time" + "unsafe" +) + +// Error returned if Go data cannot be marshaled as an AMQP type. 
+type MarshalError struct { + // The Go type. + GoType reflect.Type + s string +} + +func (e MarshalError) Error() string { return e.s } + +func newMarshalError(v interface{}, s string) *MarshalError { + t := reflect.TypeOf(v) + return &MarshalError{GoType: t, s: fmt.Sprintf("cannot marshal %s: %s", t, s)} +} + +func dataMarshalError(v interface{}, data *C.pn_data_t) error { + if pe := PnError(C.pn_data_error(data)); pe != nil { + return newMarshalError(v, pe.Error()) + } + return nil +} + - func recoverMarshal(err *error) { - if r := recover(); r != nil { - if merr, ok := r.(*MarshalError); ok { - *err = merr - } else { - panic(r) - } - } - } - +/* +Marshal encodes a Go value as AMQP data in buffer. +If buffer is nil, or is not large enough, a new buffer is created. + +Returns the buffer used for encoding with len() adjusted to the actual size of data. + +Go types are encoded as follows + + +-------------------------------------+--------------------------------------------+ + |Go type |AMQP type | + +-------------------------------------+--------------------------------------------+ + |bool |bool | + +-------------------------------------+--------------------------------------------+ + |int8, int16, int32, int64 (int) |byte, short, int, long (int or long) | + +-------------------------------------+--------------------------------------------+ + |uint8, uint16, uint32, uint64 (uint) |ubyte, ushort, uint, ulong (uint or ulong) | + +-------------------------------------+--------------------------------------------+ + |float32, float64 |float, double. 
| + +-------------------------------------+--------------------------------------------+ + |string |string | + +-------------------------------------+--------------------------------------------+ + |[]byte, Binary |binary | + +-------------------------------------+--------------------------------------------+ + |Symbol |symbol | + +-------------------------------------+--------------------------------------------+ ++ |Char |char | ++ +-------------------------------------+--------------------------------------------+ + |interface{} |the contained type | + +-------------------------------------+--------------------------------------------+ + |nil |null | + +-------------------------------------+--------------------------------------------+ + |map[K]T |map with K and T converted as above | + +-------------------------------------+--------------------------------------------+ + |Map |map, may have mixed types for keys, values | + +-------------------------------------+--------------------------------------------+ - |[]T |list with T converted as above | ++ |AnyMap |map (See AnyMap) | ++ +-------------------------------------+--------------------------------------------+ ++ |List, []interface{} |list, may have mixed-type values | + +-------------------------------------+--------------------------------------------+ - |List |list, may have mixed types values | ++ |[]T, [N]T |array, T is mapped as per this table | + +-------------------------------------+--------------------------------------------+ + |Described |described type | + +-------------------------------------+--------------------------------------------+ ++ |time.Time |timestamp | ++ +-------------------------------------+--------------------------------------------+ ++ |UUID |uuid | ++ +-------------------------------------+--------------------------------------------+ + - The following Go types cannot be marshaled: uintptr, function, channel, array (use slice), struct - - TODO: Not yet implemented: - - Go 
types: struct, complex64/128. ++The following Go types cannot be marshaled: uintptr, function, channel, struct, complex64/128 + - AMQP types: decimal32/64/128, char, timestamp, uuid, array. ++AMQP types not yet supported: ++- decimal32/64/128, +*/ ++ +func Marshal(v interface{}, buffer []byte) (outbuf []byte, err error) { - defer recoverMarshal(&err) + data := C.pn_data(0) + defer C.pn_data_free(data) - marshal(v, data) ++ if err = recoverMarshal(v, data); err != nil { ++ return buffer, err ++ } + encode := func(buf []byte) ([]byte, error) { + n := int(C.pn_data_encode(data, cPtr(buf), cLen(buf))) + switch { + case n == int(C.PN_OVERFLOW): + return buf, overflow + case n < 0: + return buf, dataMarshalError(v, data) + default: + return buf[:n], nil + } + } + return encodeGrow(buffer, encode) +} + - // Internal - func MarshalUnsafe(v interface{}, pn_data unsafe.Pointer) (err error) { - defer recoverMarshal(&err) - marshal(v, (*C.pn_data_t)(pn_data)) ++// Internal use only ++func MarshalUnsafe(v interface{}, pnData unsafe.Pointer) (err error) { ++ return recoverMarshal(v, (*C.pn_data_t)(pnData)) ++} ++ ++func recoverMarshal(v interface{}, data *C.pn_data_t) (err error) { ++ defer func() { // Convert panic to error return ++ if r := recover(); r != nil { ++ if err2, ok := r.(*MarshalError); ok { ++ err = err2 // Convert internal panic to error ++ } else { ++ panic(r) // Unrecognized error, continue to panic ++ } ++ } ++ }() ++ marshal(v, data) // Panics on error + return +} + +const minEncode = 256 + +// overflow is returned when an encoding function can't fit data in the buffer. +var overflow = fmt.Errorf("buffer too small") + +// encodeFn encodes into buffer[0:len(buffer)]. +// Returns buffer with length adjusted for data encoded. +// If buffer too small, returns overflow as error. +type encodeFn func(buffer []byte) ([]byte, error) + +// encodeGrow calls encode() into buffer, if it returns overflow grows the buffer. +// Returns the final buffer. 
+func encodeGrow(buffer []byte, encode encodeFn) ([]byte, error) { + if buffer == nil || len(buffer) == 0 { + buffer = make([]byte, minEncode) + } + var err error + for buffer, err = encode(buffer); err == overflow; buffer, err = encode(buffer) { + buffer = make([]byte, 2*len(buffer)) + } + return buffer, err +} + - func marshal(v interface{}, data *C.pn_data_t) { - switch v := v.(type) { ++// Marshal v to data ++func marshal(i interface{}, data *C.pn_data_t) { ++ switch v := i.(type) { + case nil: + C.pn_data_put_null(data) + case bool: + C.pn_data_put_bool(data, C.bool(v)) ++ ++ // Signed integers + case int8: + C.pn_data_put_byte(data, C.int8_t(v)) + case int16: + C.pn_data_put_short(data, C.int16_t(v)) + case int32: + C.pn_data_put_int(data, C.int32_t(v)) + case int64: + C.pn_data_put_long(data, C.int64_t(v)) + case int: - if unsafe.Sizeof(int(0)) == 8 { ++ if intIs64 { + C.pn_data_put_long(data, C.int64_t(v)) + } else { + C.pn_data_put_int(data, C.int32_t(v)) + } ++ ++ // Unsigned integers + case uint8: + C.pn_data_put_ubyte(data, C.uint8_t(v)) + case uint16: + C.pn_data_put_ushort(data, C.uint16_t(v)) + case uint32: + C.pn_data_put_uint(data, C.uint32_t(v)) + case uint64: + C.pn_data_put_ulong(data, C.uint64_t(v)) + case uint: - if unsafe.Sizeof(int(0)) == 8 { ++ if intIs64 { + C.pn_data_put_ulong(data, C.uint64_t(v)) + } else { + C.pn_data_put_uint(data, C.uint32_t(v)) + } ++ ++ // Floating point + case float32: + C.pn_data_put_float(data, C.float(v)) + case float64: + C.pn_data_put_double(data, C.double(v)) ++ ++ // String-like (string, binary, symbol) + case string: + C.pn_data_put_string(data, pnBytes([]byte(v))) + case []byte: + C.pn_data_put_binary(data, pnBytes(v)) + case Binary: + C.pn_data_put_binary(data, pnBytes([]byte(v))) + case Symbol: + C.pn_data_put_symbol(data, pnBytes([]byte(v))) - case Map: // Special map type - C.pn_data_put_map(data) - C.pn_data_enter(data) - for key, val := range v { - marshal(key, data) - marshal(val, data) - } - 
C.pn_data_exit(data) ++ ++ // Other simple types ++ case time.Time: ++ C.pn_data_put_timestamp(data, C.pn_timestamp_t(v.UnixNano()/1000)) ++ case UUID: ++ C.pn_data_put_uuid(data, *(*C.pn_uuid_t)(unsafe.Pointer(&v[0]))) ++ case Char: ++ C.pn_data_put_char(data, (C.pn_char_t)(v)) ++ ++ // Described types + case Described: + C.pn_data_put_described(data) + C.pn_data_enter(data) + marshal(v.Descriptor, data) + marshal(v.Value, data) + C.pn_data_exit(data) ++ ++ // Restricted type annotation-key, marshals as contained value + case AnnotationKey: + marshal(v.Get(), data) ++ ++ // Special type to represent AMQP maps with keys that are illegal in Go ++ case AnyMap: ++ C.pn_data_put_map(data) ++ C.pn_data_enter(data) ++ defer C.pn_data_exit(data) ++ for _, kv := range v { ++ marshal(kv.Key, data) ++ marshal(kv.Value, data) ++ } ++ + default: - switch reflect.TypeOf(v).Kind() { ++ // Examine complex types (Go map, slice, array) by reflected structure ++ switch reflect.TypeOf(i).Kind() { ++ + case reflect.Map: - putMap(data, v) - case reflect.Slice: - putList(data, v) ++ m := reflect.ValueOf(v) ++ C.pn_data_put_map(data) ++ if C.pn_data_enter(data) { ++ defer C.pn_data_exit(data) ++ } else { ++ panic(dataMarshalError(i, data)) ++ } ++ for _, key := range m.MapKeys() { ++ marshal(key.Interface(), data) ++ marshal(m.MapIndex(key).Interface(), data) ++ } ++ ++ case reflect.Slice, reflect.Array: ++ // Note: Go array and slice are mapped the same way: ++ // if element type is an interface, map to AMQP list (mixed type) ++ // if element type is a non-interface type map to AMQP array (single type) ++ s := reflect.ValueOf(v) ++ if pnType, ok := arrayTypeMap[s.Type().Elem()]; ok { ++ C.pn_data_put_array(data, false, pnType) ++ } else { ++ C.pn_data_put_list(data) ++ } ++ C.pn_data_enter(data) ++ defer C.pn_data_exit(data) ++ for j := 0; j < s.Len(); j++ { ++ marshal(s.Index(j).Interface(), data) ++ } ++ + default: + panic(newMarshalError(v, "no conversion")) + } + } - if err := 
dataMarshalError(v, data); err != nil { ++ if err := dataMarshalError(i, data); err != nil { + panic(err) + } - return +} + - func clearMarshal(v interface{}, data *C.pn_data_t) { - C.pn_data_clear(data) - marshal(v, data) ++// Mapping from Go element type to AMQP array type for types that can go in an AMQP array ++// NOTE: this must be kept consistent with marshal() which does the actual marshalling. ++var arrayTypeMap = map[reflect.Type]C.pn_type_t{ ++ nil: C.PN_NULL, ++ reflect.TypeOf(true): C.PN_BOOL, ++ ++ reflect.TypeOf(int8(0)): C.PN_BYTE, ++ reflect.TypeOf(int16(0)): C.PN_INT, ++ reflect.TypeOf(int32(0)): C.PN_SHORT, ++ reflect.TypeOf(int64(0)): C.PN_LONG, ++ ++ reflect.TypeOf(uint8(0)): C.PN_UBYTE, ++ reflect.TypeOf(uint16(0)): C.PN_UINT, ++ reflect.TypeOf(uint32(0)): C.PN_USHORT, ++ reflect.TypeOf(uint64(0)): C.PN_ULONG, ++ ++ reflect.TypeOf(float32(0)): C.PN_FLOAT, ++ reflect.TypeOf(float64(0)): C.PN_DOUBLE, ++ ++ reflect.TypeOf(""): C.PN_STRING, ++ reflect.TypeOf((*Symbol)(nil)).Elem(): C.PN_SYMBOL, ++ reflect.TypeOf((*Binary)(nil)).Elem(): C.PN_BINARY, ++ reflect.TypeOf([]byte{}): C.PN_BINARY, ++ ++ reflect.TypeOf((*time.Time)(nil)).Elem(): C.PN_TIMESTAMP, ++ reflect.TypeOf((*UUID)(nil)).Elem(): C.PN_UUID, ++ reflect.TypeOf((*Char)(nil)).Elem(): C.PN_CHAR, +} + - func putMap(data *C.pn_data_t, v interface{}) { - mapValue := reflect.ValueOf(v) - C.pn_data_put_map(data) - C.pn_data_enter(data) - for _, key := range mapValue.MapKeys() { - marshal(key.Interface(), data) - marshal(mapValue.MapIndex(key).Interface(), data) ++// Compute mapping of int/uint at runtime as they depend on execution environment. 
++func init() { ++ if intIs64 { ++ arrayTypeMap[reflect.TypeOf(int(0))] = C.PN_LONG ++ arrayTypeMap[reflect.TypeOf(uint(0))] = C.PN_ULONG ++ } else { ++ arrayTypeMap[reflect.TypeOf(int(0))] = C.PN_INT ++ arrayTypeMap[reflect.TypeOf(uint(0))] = C.PN_UINT + } - C.pn_data_exit(data) +} + - func putList(data *C.pn_data_t, v interface{}) { - listValue := reflect.ValueOf(v) - C.pn_data_put_list(data) - C.pn_data_enter(data) - for i := 0; i < listValue.Len(); i++ { - marshal(listValue.Index(i).Interface(), data) - } - C.pn_data_exit(data) ++func clearMarshal(v interface{}, data *C.pn_data_t) { ++ C.pn_data_clear(data) ++ marshal(v, data) +} + +// Encoder encodes AMQP values to an io.Writer +type Encoder struct { + writer io.Writer + buffer []byte +} + +// New encoder returns a new encoder that writes to w. +func NewEncoder(w io.Writer) *Encoder { + return &Encoder{w, make([]byte, minEncode)} +} + +func (e *Encoder) Encode(v interface{}) (err error) { + e.buffer, err = Marshal(v, e.buffer) + if err == nil { + _, err = e.writer.Write(e.buffer) + } + return err +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b25d21e6/amqp/marshal_test.go ---------------------------------------------------------------------- diff --cc amqp/marshal_test.go index d8e0711,0000000..e679cc3 mode 100644,000000..100644 --- a/amqp/marshal_test.go +++ b/amqp/marshal_test.go @@@ -1,90 -1,0 +1,203 @@@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. 
You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ + +package amqp + +import ( ++ "strings" + "testing" +) + +func TestSymbolKey(t *testing.T) { + bytes, err := Marshal(AnnotationKeySymbol("foo"), nil) + if err != nil { + t.Fatal(err) + } + var k AnnotationKey + if _, err := Unmarshal(bytes, &k); err != nil { + t.Error(err) + } + if err := checkEqual("foo", string(k.Get().(Symbol))); err != nil { + t.Error(err) + } + var sym Symbol + if _, err := Unmarshal(bytes, &sym); err != nil { + t.Error(err) + } + if err := checkEqual("foo", sym.String()); err != nil { + t.Error(err) + } + +} + +func TestStringKey(t *testing.T) { + bytes, err := Marshal(AnnotationKeyString("foo"), nil) + if err != nil { + t.Fatal(err) + } + var k AnnotationKey + if _, err := Unmarshal(bytes, &k); err != nil { + t.Error(err) + } + if err := checkEqual("foo", string(k.Get().(Symbol))); err != nil { + t.Error(err) + } + var s string + if _, err := Unmarshal(bytes, &s); err != nil { + t.Error(err) + } + if err := checkEqual("foo", s); err != nil { + t.Error(err) + } + +} + +func TestIntKey(t *testing.T) { + bytes, err := Marshal(AnnotationKeyUint64(12345), nil) + if err != nil { + t.Fatal(err) + } + var k AnnotationKey + if _, err := Unmarshal(bytes, &k); err != nil { + t.Error(err) + } + if 12345 != k.Get().(uint64) { + t.Errorf("(%T)%v != (%T)%v", 12345, k.Get().(uint64)) + } + var n uint64 + if _, err := Unmarshal(bytes, &n); err != nil { + t.Error(err) + } + if 12345 != n { + t.Errorf("%v != %v", 12345, k.Get().(uint64)) + } + +} ++ ++func TestMapToMap(t *testing.T) { ++ in := Map{"k": "v", "x": "y", true: false, int8(3): uint64(24)} ++ if 
bytes, err := Marshal(in, nil); err == nil { ++ var out Map ++ if _, err := Unmarshal(bytes, &out); err == nil { ++ if err = checkEqual(in, out); err != nil { ++ t.Error(err) ++ } ++ } else { ++ t.Error(err) ++ } ++ } ++} ++ ++func TestMapToInterface(t *testing.T) { ++ in := Map{"k": "v", "x": "y", true: false, int8(3): uint64(24)} ++ if bytes, err := Marshal(in, nil); err == nil { ++ var out interface{} ++ if _, err := Unmarshal(bytes, &out); err == nil { ++ if err = checkEqual(in, out); err != nil { ++ t.Error(err) ++ } ++ } else { ++ t.Error(err) ++ } ++ } ++} ++ ++func TestAnyMap(t *testing.T) { ++ // nil ++ bytes, err := Marshal(AnyMap(nil), nil) ++ if err != nil { ++ t.Error(err) ++ } ++ var out AnyMap ++ if _, err := Unmarshal(bytes, &out); err != nil { ++ t.Error(err) ++ } ++ if err = checkEqual(AnyMap(nil), out); err != nil { ++ t.Error(err) ++ } ++ ++ // empty ++ bytes, err = Marshal(AnyMap{}, nil) ++ if err != nil { ++ t.Error(err) ++ } ++ if _, err := Unmarshal(bytes, &out); err != nil { ++ t.Error(err) ++ } ++ if err = checkEqual(AnyMap(nil), out); err != nil { ++ t.Error(err) ++ } ++ ++ // with data ++ in := AnyMap{{"k", "v"}, {true, false}} ++ bytes, err = Marshal(in, nil) ++ if err != nil { ++ t.Error(err) ++ } ++ if _, err := Unmarshal(bytes, &out); err != nil { ++ t.Error(err) ++ } ++ if err = checkEqual(in, out); err != nil { ++ t.Error(err) ++ } ++} ++ ++func TestBadMap(t *testing.T) { ++ // unmarshal map with invalid keys ++ in := AnyMap{{"k", "v"}, {[]string{"x", "y"}, "invalid-key"}} ++ bytes, err := Marshal(in, nil) ++ if err != nil { ++ t.Error(err) ++ } ++ m := Map{} ++ // Should fail to unmarshal to a map ++ if _, err := Unmarshal(bytes, &m); err != nil { ++ if !strings.Contains(err.Error(), "key []string{\"x\", \"y\"} is not comparable") { ++ t.Error(err) ++ } ++ } else { ++ t.Error("expected error") ++ } ++ // Should unmarshal to an AnyMap ++ var out AnyMap ++ if _, err := Unmarshal(bytes, &out); err != nil { ++ t.Error(err) ++ } else 
if err = checkEqual(in, out); err != nil { ++ t.Error(err) ++ } ++ // Should unmarshal to interface{} as AnyMap ++ var v interface{} ++ if _, err := Unmarshal(bytes, &v); err != nil { ++ t.Error(err) ++ } else if err = checkEqual(in, v); err != nil { ++ t.Error(err) ++ } ++ // Round trip from interface to interface ++ in = AnyMap{{[]int8{1, 2, 3}, "bad-key"}, {int16(1), "duplicate-1"}, {int16(1), "duplicate-2"}} ++ bytes, err = Marshal(interface{}(in), nil) ++ if err != nil { ++ t.Error(err) ++ } ++ v = nil ++ if _, err := Unmarshal(bytes, &v); err != nil { ++ t.Error(err) ++ } else if err = checkEqual(in, v); err != nil { ++ t.Error(err) ++ } ++} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b25d21e6/amqp/message.go ---------------------------------------------------------------------- diff --cc amqp/message.go index d1ad2eb,0000000..389fa37 mode 100644,000000..100644 --- a/amqp/message.go +++ b/amqp/message.go @@@ -1,402 -1,0 +1,420 @@@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. 
+*/ + +package amqp + +// #include <proton/codec.h> +// #include <proton/types.h> +// #include <proton/message.h> +// #include <stdlib.h> +// +// /* Helper for setting message string fields */ +// typedef int (*set_fn)(pn_message_t*, const char*); +// int msg_set_str(pn_message_t* m, char* s, set_fn set) { +// int result = set(m, s); +// free(s); +// return result; +// } +// +import "C" + +import ( + "fmt" + "runtime" + "time" +) + +// Message is the interface to an AMQP message. +type Message interface { + // Durable indicates that any parties taking responsibility + // for the message must durably store the content. + Durable() bool + SetDurable(bool) + + // Priority impacts ordering guarantees. Within a + // given ordered context, higher priority messages may jump ahead of + // lower priority messages. + Priority() uint8 + SetPriority(uint8) + + // TTL or Time To Live: a message may be dropped after this duration + TTL() time.Duration + SetTTL(time.Duration) + + // FirstAcquirer indicates + // that the recipient of the message is the first recipient to acquire + // the message, i.e. there have been no failed delivery attempts to + // other acquirers. Note that this does not mean the message has not + // been delivered to, but not acquired, by other recipients. + FirstAcquirer() bool + SetFirstAcquirer(bool) + + // DeliveryCount tracks how many attempts have been made to + // deliver a message. + DeliveryCount() uint32 + SetDeliveryCount(uint32) + + // MessageId provides a unique identifier for a message. + // It can be a string, an unsigned long, a uuid or a + // binary value. + MessageId() interface{} + SetMessageId(interface{}) + + UserId() string + SetUserId(string) + + Address() string + SetAddress(string) + + Subject() string + SetSubject(string) + + ReplyTo() string + SetReplyTo(string) + + // CorrelationId is set on correlated request and response messages. It can be + // a string, an unsigned long, a uuid or a binary value. 
+ CorrelationId() interface{} + SetCorrelationId(interface{}) + + ContentType() string + SetContentType(string) + + ContentEncoding() string + SetContentEncoding(string) + - // ExpiryTime indicates an absoulte time when the message may be dropped. ++ // ExpiryTime indicates an absolute time when the message may be dropped. + // A Zero time (i.e. t.isZero() == true) indicates a message never expires. + ExpiryTime() time.Time + SetExpiryTime(time.Time) + + CreationTime() time.Time + SetCreationTime(time.Time) + + GroupId() string + SetGroupId(string) + + GroupSequence() int32 + SetGroupSequence(int32) + + ReplyToGroupId() string + SetReplyToGroupId(string) + + // Property map set by the application to be carried with the message. + // Values must be simple types (not maps, lists or sequences) + ApplicationProperties() map[string]interface{} + SetApplicationProperties(map[string]interface{}) + + // Per-delivery annotations to provide delivery instructions. + // May be added or removed by intermediaries during delivery. + DeliveryAnnotations() map[AnnotationKey]interface{} + SetDeliveryAnnotations(map[AnnotationKey]interface{}) + + // Message annotations added as part of the bare message at creation, usually + // by an AMQP library. See ApplicationProperties() for adding application data. + MessageAnnotations() map[AnnotationKey]interface{} + SetMessageAnnotations(map[AnnotationKey]interface{}) + + // Inferred indicates how the message content + // is encoded into AMQP sections. If inferred is true then binary and + // list values in the body of the message will be encoded as AMQP DATA + // and AMQP SEQUENCE sections, respectively. If inferred is false, + // then all values in the body of the message will be encoded as AMQP + // VALUE sections regardless of their type. + Inferred() bool + SetInferred(bool) + + // Marshal a Go value into the message body. See amqp.Marshal() for details. 
+ Marshal(interface{}) + + // Unmarshal the message body into the value pointed to by v. See amqp.Unmarshal() for details. + Unmarshal(interface{}) + - // Body value resulting from the default unmarshalling of message body as interface{} ++ // Body value resulting from the default unmarshaling of message body as interface{} + Body() interface{} + + // Encode encodes the message as AMQP data. If buffer is non-nil and is large enough + // the message is encoded into it, otherwise a new buffer is created. + // Returns the buffer containing the message. + Encode(buffer []byte) ([]byte, error) + + // Decode data into this message. Overwrites an existing message content. + Decode(buffer []byte) error + + // Clear the message contents. + Clear() + + // Copy the contents of another message to this one. + Copy(m Message) error + + // Deprecated: use DeliveryAnnotations() for a more type-safe interface + Instructions() map[string]interface{} + SetInstructions(v map[string]interface{}) + + // Deprecated: use MessageAnnotations() for a more type-safe interface + Annotations() map[string]interface{} + SetAnnotations(v map[string]interface{}) + + // Deprecated: use ApplicationProperties() for a more type-safe interface + Properties() map[string]interface{} + SetProperties(v map[string]interface{}) +} + +type message struct{ pn *C.pn_message_t } + +func freeMessage(m *message) { + C.pn_message_free(m.pn) + m.pn = nil +} + +// NewMessage creates a new message instance. +func NewMessage() Message { + m := &message{C.pn_message()} + runtime.SetFinalizer(m, freeMessage) + return m +} + +// NewMessageWith creates a message with value as the body. 
Equivalent to +// m := NewMessage(); m.Marshal(body) +func NewMessageWith(value interface{}) Message { + m := NewMessage() + m.Marshal(value) + return m +} + +func (m *message) Clear() { C.pn_message_clear(m.pn) } + +func (m *message) Copy(x Message) error { + if data, err := x.Encode(nil); err == nil { + return m.Decode(data) + } else { + return err + } +} + +// ==== message get functions + +func rewindGet(data *C.pn_data_t) (v interface{}) { + C.pn_data_rewind(data) + C.pn_data_next(data) + unmarshal(&v, data) + return v +} + +func (m *message) Inferred() bool { return bool(C.pn_message_is_inferred(m.pn)) } +func (m *message) Durable() bool { return bool(C.pn_message_is_durable(m.pn)) } +func (m *message) Priority() uint8 { return uint8(C.pn_message_get_priority(m.pn)) } +func (m *message) TTL() time.Duration { + return time.Duration(C.pn_message_get_ttl(m.pn)) * time.Millisecond +} +func (m *message) FirstAcquirer() bool { return bool(C.pn_message_is_first_acquirer(m.pn)) } +func (m *message) DeliveryCount() uint32 { return uint32(C.pn_message_get_delivery_count(m.pn)) } +func (m *message) MessageId() interface{} { return rewindGet(C.pn_message_id(m.pn)) } +func (m *message) UserId() string { return goString(C.pn_message_get_user_id(m.pn)) } +func (m *message) Address() string { return C.GoString(C.pn_message_get_address(m.pn)) } +func (m *message) Subject() string { return C.GoString(C.pn_message_get_subject(m.pn)) } +func (m *message) ReplyTo() string { return C.GoString(C.pn_message_get_reply_to(m.pn)) } +func (m *message) CorrelationId() interface{} { return rewindGet(C.pn_message_correlation_id(m.pn)) } +func (m *message) ContentType() string { return C.GoString(C.pn_message_get_content_type(m.pn)) } +func (m *message) ContentEncoding() string { return C.GoString(C.pn_message_get_content_encoding(m.pn)) } + +func (m *message) ExpiryTime() time.Time { + return time.Unix(0, int64(time.Millisecond*time.Duration(C.pn_message_get_expiry_time(m.pn)))) +} +func (m 
*message) CreationTime() time.Time { + return time.Unix(0, int64(time.Millisecond)*int64(C.pn_message_get_creation_time(m.pn))) +} +func (m *message) GroupId() string { return C.GoString(C.pn_message_get_group_id(m.pn)) } +func (m *message) GroupSequence() int32 { return int32(C.pn_message_get_group_sequence(m.pn)) } +func (m *message) ReplyToGroupId() string { return C.GoString(C.pn_message_get_reply_to_group_id(m.pn)) } + +func getAnnotations(data *C.pn_data_t) (v map[AnnotationKey]interface{}) { - C.pn_data_rewind(data) - C.pn_data_next(data) - unmarshal(&v, data) ++ if C.pn_data_size(data) > 0 { ++ C.pn_data_rewind(data) ++ C.pn_data_next(data) ++ unmarshal(&v, data) ++ } + return v +} + +func (m *message) DeliveryAnnotations() map[AnnotationKey]interface{} { + return getAnnotations(C.pn_message_instructions(m.pn)) +} +func (m *message) MessageAnnotations() map[AnnotationKey]interface{} { + return getAnnotations(C.pn_message_annotations(m.pn)) +} + +func (m *message) ApplicationProperties() map[string]interface{} { + var v map[string]interface{} + data := C.pn_message_properties(m.pn) - C.pn_data_rewind(data) - C.pn_data_next(data) - unmarshal(&v, data) ++ if C.pn_data_size(data) > 0 { ++ C.pn_data_rewind(data) ++ C.pn_data_next(data) ++ unmarshal(&v, data) ++ } + return v +} + +// ==== message set methods + +func setData(v interface{}, data *C.pn_data_t) { + C.pn_data_clear(data) + marshal(v, data) +} + +func (m *message) SetInferred(b bool) { C.pn_message_set_inferred(m.pn, C.bool(b)) } +func (m *message) SetDurable(b bool) { C.pn_message_set_durable(m.pn, C.bool(b)) } +func (m *message) SetPriority(b uint8) { C.pn_message_set_priority(m.pn, C.uint8_t(b)) } +func (m *message) SetTTL(d time.Duration) { + C.pn_message_set_ttl(m.pn, C.pn_millis_t(d/time.Millisecond)) +} +func (m *message) SetFirstAcquirer(b bool) { C.pn_message_set_first_acquirer(m.pn, C.bool(b)) } +func (m *message) SetDeliveryCount(c uint32) { C.pn_message_set_delivery_count(m.pn, 
C.uint32_t(c)) } +func (m *message) SetMessageId(id interface{}) { setData(id, C.pn_message_id(m.pn)) } +func (m *message) SetUserId(s string) { C.pn_message_set_user_id(m.pn, pnBytes(([]byte)(s))) } +func (m *message) SetAddress(s string) { + C.msg_set_str(m.pn, C.CString(s), C.set_fn(C.pn_message_set_address)) +} +func (m *message) SetSubject(s string) { + C.msg_set_str(m.pn, C.CString(s), C.set_fn(C.pn_message_set_subject)) +} +func (m *message) SetReplyTo(s string) { + C.msg_set_str(m.pn, C.CString(s), C.set_fn(C.pn_message_set_reply_to)) +} +func (m *message) SetCorrelationId(c interface{}) { setData(c, C.pn_message_correlation_id(m.pn)) } +func (m *message) SetContentType(s string) { + C.msg_set_str(m.pn, C.CString(s), C.set_fn(C.pn_message_set_content_type)) +} +func (m *message) SetContentEncoding(s string) { + C.msg_set_str(m.pn, C.CString(s), C.set_fn(C.pn_message_set_content_encoding)) +} +func (m *message) SetExpiryTime(t time.Time) { C.pn_message_set_expiry_time(m.pn, pnTime(t)) } +func (m *message) SetCreationTime(t time.Time) { C.pn_message_set_creation_time(m.pn, pnTime(t)) } +func (m *message) SetGroupId(s string) { + C.msg_set_str(m.pn, C.CString(s), C.set_fn(C.pn_message_set_group_id)) +} +func (m *message) SetGroupSequence(s int32) { + C.pn_message_set_group_sequence(m.pn, C.pn_sequence_t(s)) +} +func (m *message) SetReplyToGroupId(s string) { + C.msg_set_str(m.pn, C.CString(s), C.set_fn(C.pn_message_set_reply_to_group_id)) +} + +func (m *message) SetDeliveryAnnotations(v map[AnnotationKey]interface{}) { + setData(v, C.pn_message_instructions(m.pn)) +} +func (m *message) SetMessageAnnotations(v map[AnnotationKey]interface{}) { + setData(v, C.pn_message_annotations(m.pn)) +} +func (m *message) SetApplicationProperties(v map[string]interface{}) { + setData(v, C.pn_message_properties(m.pn)) +} + - // Marshal/Unmarshal body - func (m *message) Marshal(v interface{}) { clearMarshal(v, C.pn_message_body(m.pn)) } - func (m *message) Unmarshal(v 
interface{}) { rewindUnmarshal(v, C.pn_message_body(m.pn)) } - func (m *message) Body() (v interface{}) { m.Unmarshal(&v); return } ++// Marshal body from v ++func (m *message) Marshal(v interface{}) { clearMarshal(v, C.pn_message_body(m.pn)) } ++ ++// Unmarshal body to v, which must be a pointer to a value. See amqp.Unmarshal ++func (m *message) Unmarshal(v interface{}) { ++ data := C.pn_message_body(m.pn) ++ if C.pn_data_size(data) > 0 { ++ C.pn_data_rewind(data) ++ C.pn_data_next(data) ++ unmarshal(v, data) ++ } ++ return ++} ++ ++// Return the body value as an interface ++func (m *message) Body() (v interface{}) { m.Unmarshal(&v); return } + +func (m *message) Decode(data []byte) error { + m.Clear() + if len(data) == 0 { + return fmt.Errorf("empty buffer for decode") + } + if C.pn_message_decode(m.pn, cPtr(data), cLen(data)) < 0 { + return fmt.Errorf("decoding message: %s", PnError(C.pn_message_error(m.pn))) + } + return nil +} + +func DecodeMessage(data []byte) (m Message, err error) { + m = NewMessage() + err = m.Decode(data) + return +} + +func (m *message) Encode(buffer []byte) ([]byte, error) { + encode := func(buf []byte) ([]byte, error) { + len := cLen(buf) + result := C.pn_message_encode(m.pn, cPtr(buf), &len) + switch { + case result == C.PN_OVERFLOW: + return buf, overflow + case result < 0: + return buf, fmt.Errorf("cannot encode message: %s", PnErrorCode(result)) + default: + return buf[:len], nil + } + } + return encodeGrow(buffer, encode) +} + +// TODO aconway 2015-09-14: Multi-section messages. + +// TODO aconway 2016-09-09: Message.String() use inspect. 
+ +// ==== Deprecated functions +func oldGetAnnotations(data *C.pn_data_t) (v map[string]interface{}) { - C.pn_data_rewind(data) - C.pn_data_next(data) - unmarshal(&v, data) ++ if C.pn_data_size(data) > 0 { ++ C.pn_data_rewind(data) ++ C.pn_data_next(data) ++ unmarshal(&v, data) ++ } + return v +} + +func (m *message) Instructions() map[string]interface{} { + return oldGetAnnotations(C.pn_message_instructions(m.pn)) +} +func (m *message) Annotations() map[string]interface{} { + return oldGetAnnotations(C.pn_message_annotations(m.pn)) +} +func (m *message) Properties() map[string]interface{} { + return oldGetAnnotations(C.pn_message_properties(m.pn)) +} + +// Convert old string-keyed annotations to an AnnotationKey map +func fixAnnotations(old map[string]interface{}) (annotations map[AnnotationKey]interface{}) { + annotations = make(map[AnnotationKey]interface{}) + for k, v := range old { + annotations[AnnotationKeyString(k)] = v + } + return +} + +func (m *message) SetInstructions(v map[string]interface{}) { + setData(fixAnnotations(v), C.pn_message_instructions(m.pn)) +} +func (m *message) SetAnnotations(v map[string]interface{}) { + setData(fixAnnotations(v), C.pn_message_annotations(m.pn)) +} +func (m *message) SetProperties(v map[string]interface{}) { + setData(fixAnnotations(v), C.pn_message_properties(m.pn)) +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b25d21e6/amqp/message_test.go ---------------------------------------------------------------------- diff --cc amqp/message_test.go index bfebfa1,0000000..3585dd8 mode 100644,000000..100644 --- a/amqp/message_test.go +++ b/amqp/message_test.go @@@ -1,201 -1,0 +1,201 @@@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. 
The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ + +package amqp + +import ( + "testing" + "time" +) + +func roundTrip(m Message) error { + buffer, err := m.Encode(nil) + if err != nil { + return err + } + m2, err := DecodeMessage(buffer) + if err != nil { + return err + } + return checkEqual(m, m2) +} + +func TestDefaultMessage(t *testing.T) { + m := NewMessage() + // Check defaults + for _, data := range [][]interface{}{ + {m.Inferred(), false}, + {m.Durable(), false}, + {m.Priority(), uint8(4)}, + {m.TTL(), time.Duration(0)}, + {m.UserId(), ""}, + {m.Address(), ""}, + {m.Subject(), ""}, + {m.ReplyTo(), ""}, + {m.ContentType(), ""}, + {m.ContentEncoding(), ""}, + {m.GroupId(), ""}, + {m.GroupSequence(), int32(0)}, + {m.ReplyToGroupId(), ""}, + {m.MessageId(), nil}, + {m.CorrelationId(), nil}, - {m.DeliveryAnnotations(), map[AnnotationKey]interface{}{}}, - {m.MessageAnnotations(), map[AnnotationKey]interface{}{}}, - {m.ApplicationProperties(), map[string]interface{}{}}, ++ {m.DeliveryAnnotations(), map[AnnotationKey]interface{}(nil)}, ++ {m.MessageAnnotations(), map[AnnotationKey]interface{}(nil)}, ++ {m.ApplicationProperties(), map[string]interface{}(nil)}, + + // Deprecated - {m.Instructions(), map[string]interface{}{}}, - {m.Annotations(), map[string]interface{}{}}, - {m.Properties(), map[string]interface{}{}}, ++ {m.Instructions(), map[string]interface{}(nil)}, ++ {m.Annotations(), map[string]interface{}(nil)}, ++ {m.Properties(), 
map[string]interface{}(nil)}, + {m.Body(), nil}, + } { + if err := checkEqual(data[0], data[1]); err != nil { + t.Error(err) + } + } + if err := roundTrip(m); err != nil { + t.Error(err) + } +} + +func TestMessageRoundTrip(t *testing.T) { + m := NewMessage() + m.SetInferred(false) + m.SetDurable(true) + m.SetPriority(42) + m.SetTTL(0) + m.SetUserId("user") + m.SetAddress("address") + m.SetSubject("subject") + m.SetReplyTo("replyto") + m.SetContentType("content") + m.SetContentEncoding("encoding") + m.SetGroupId("group") + m.SetGroupSequence(42) + m.SetReplyToGroupId("replytogroup") + m.SetMessageId("id") + m.SetCorrelationId("correlation") + m.SetDeliveryAnnotations(map[AnnotationKey]interface{}{AnnotationKeySymbol("instructions"): "foo"}) + m.SetMessageAnnotations(map[AnnotationKey]interface{}{AnnotationKeySymbol("annotations"): "bar"}) + m.SetApplicationProperties(map[string]interface{}{"int": int32(32), "bool": true}) + m.Marshal("hello") + + for _, data := range [][]interface{}{ + {m.Inferred(), false}, + {m.Durable(), true}, + {m.Priority(), uint8(42)}, + {m.TTL(), time.Duration(0)}, + {m.UserId(), "user"}, + {m.Address(), "address"}, + {m.Subject(), "subject"}, + {m.ReplyTo(), "replyto"}, + {m.ContentType(), "content"}, + {m.ContentEncoding(), "encoding"}, + {m.GroupId(), "group"}, + {m.GroupSequence(), int32(42)}, + {m.ReplyToGroupId(), "replytogroup"}, + {m.MessageId(), "id"}, + {m.CorrelationId(), "correlation"}, + + {m.DeliveryAnnotations(), map[AnnotationKey]interface{}{AnnotationKeySymbol("instructions"): "foo"}}, + {m.MessageAnnotations(), map[AnnotationKey]interface{}{AnnotationKeySymbol("annotations"): "bar"}}, + {m.ApplicationProperties(), map[string]interface{}{"int": int32(32), "bool": true}}, + {m.Body(), "hello"}, + + // Deprecated + {m.Instructions(), map[string]interface{}{"instructions": "foo"}}, + {m.Annotations(), map[string]interface{}{"annotations": "bar"}}, + } { + if err := checkEqual(data[0], data[1]); err != nil { + t.Error(err) + } + 
} + if err := roundTrip(m); err != nil { + t.Error(err) + } +} + +func TestDeprecated(t *testing.T) { + m := NewMessage() + + m.SetInstructions(map[string]interface{}{"instructions": "foo"}) + m.SetAnnotations(map[string]interface{}{"annotations": "bar"}) + m.SetProperties(map[string]interface{}{"int": int32(32), "bool": true}) + + for _, data := range [][]interface{}{ + {m.DeliveryAnnotations(), map[AnnotationKey]interface{}{AnnotationKeySymbol("instructions"): "foo"}}, + {m.MessageAnnotations(), map[AnnotationKey]interface{}{AnnotationKeySymbol("annotations"): "bar"}}, + {m.ApplicationProperties(), map[string]interface{}{"int": int32(32), "bool": true}}, + + {m.Instructions(), map[string]interface{}{"instructions": "foo"}}, + {m.Annotations(), map[string]interface{}{"annotations": "bar"}}, + {m.Properties(), map[string]interface{}{"int": int32(32), "bool": true}}, + } { + if err := checkEqual(data[0], data[1]); err != nil { + t.Error(err) + } + } + if err := roundTrip(m); err != nil { + t.Error(err) + } +} + +func TestMessageBodyTypes(t *testing.T) { + var s string + var body interface{} + var i int64 + + m := NewMessageWith(int64(42)) + m.Unmarshal(&body) + m.Unmarshal(&i) + if err := checkEqual(body.(int64), int64(42)); err != nil { + t.Error(err) + } + if err := checkEqual(i, int64(42)); err != nil { + t.Error(err) + } + + m = NewMessageWith("hello") + m.Unmarshal(&s) + m.Unmarshal(&body) + if err := checkEqual(s, "hello"); err != nil { + t.Error(err) + } + if err := checkEqual(body.(string), "hello"); err != nil { + t.Error(err) + } + if err := roundTrip(m); err != nil { + t.Error(err) + } + + m = NewMessageWith(Binary("bin")) + m.Unmarshal(&s) + m.Unmarshal(&body) + if err := checkEqual(body.(Binary), Binary("bin")); err != nil { + t.Error(err) + } + if err := checkEqual(s, "bin"); err != nil { + t.Error(err) + } + if err := roundTrip(m); err != nil { + t.Error(err) + } + + // TODO aconway 2015-09-08: array etc. 
+} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b25d21e6/amqp/types.go ---------------------------------------------------------------------- diff --cc amqp/types.go index 9d41de6,0000000..a1fe2ac mode 100644,000000..100644 --- a/amqp/types.go +++ b/amqp/types.go @@@ -1,221 -1,0 +1,250 @@@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. 
+*/ + +package amqp + +// #include <proton/codec.h> +import "C" + +import ( + "bytes" + "fmt" - "reflect" + "time" + "unsafe" +) + +func (t C.pn_type_t) String() string { + switch C.pn_type_t(t) { + case C.PN_NULL: + return "null" + case C.PN_BOOL: + return "bool" + case C.PN_UBYTE: + return "ubyte" + case C.PN_BYTE: + return "byte" + case C.PN_USHORT: + return "ushort" + case C.PN_SHORT: + return "short" + case C.PN_CHAR: + return "char" + case C.PN_UINT: + return "uint" + case C.PN_INT: + return "int" + case C.PN_ULONG: + return "ulong" + case C.PN_LONG: + return "long" + case C.PN_TIMESTAMP: + return "timestamp" + case C.PN_FLOAT: + return "float" + case C.PN_DOUBLE: + return "double" + case C.PN_DECIMAL32: + return "decimal32" + case C.PN_DECIMAL64: + return "decimal64" + case C.PN_DECIMAL128: + return "decimal128" + case C.PN_UUID: + return "uuid" + case C.PN_BINARY: + return "binary" + case C.PN_STRING: + return "string" + case C.PN_SYMBOL: + return "symbol" + case C.PN_DESCRIBED: + return "described" + case C.PN_ARRAY: + return "array" + case C.PN_LIST: + return "list" + case C.PN_MAP: + return "map" + default: + return fmt.Sprintf("<bad-type %v>", int(t)) + } +} + - // Go types - var ( - bytesType = reflect.TypeOf([]byte{}) - valueType = reflect.TypeOf(reflect.Value{}) - ) - - // TODO aconway 2015-04-08: can't handle AMQP maps with key types that are not valid Go map keys. - - // Map is a generic map that can have mixed key and value types and so can represent any AMQP map ++// The AMQP map type. A generic map that can have mixed-type keys and values. +type Map map[interface{}]interface{} + - // List is a generic list that can hold mixed values and can represent any AMQP list. ++// The most general AMQP map type, for unusual interoperability cases. ++// ++// This is not a Go Map but a sequence of {key, value} pairs. 
+// ++// An AnyMap lets you control or examine the encoded ordering of key,value pairs ++// and use key values that are not legal as Go map keys. ++// ++// The amqp.Map, or plain Go map types are easier to use for most cases. ++type AnyMap []KeyValue ++ ++// Return a Map constructed from an AnyMap. ++// Panic if the AnyMap has key values that are not valid Go map keys (e.g. maps, slices) ++func (a AnyMap) Map() (m Map) { ++ for _, kv := range a { ++ m[kv.Key] = kv.Value ++ } ++ return ++} ++ ++// KeyValue pair, used by AnyMap ++type KeyValue struct{ Key, Value interface{} } ++ ++// The AMQP list type. A generic list that can hold mixed-type values. +type List []interface{} + ++// The generic AMQP array type, used to unmarshal an array with nested array, ++// map or list elements. Arrays of simple type T unmarshal to []T ++type Array []interface{} ++ +// Symbol is a string that is encoded as an AMQP symbol +type Symbol string + +func (s Symbol) String() string { return string(s) } +func (s Symbol) GoString() string { return fmt.Sprintf("s\"%s\"", s) } + +// Binary is a string that is encoded as an AMQP binary. +// It is a string rather than a byte[] because byte[] is not hashable and can't be used as +// a map key, AMQP frequently uses binary types as map keys. It can convert to and from []byte +type Binary string + +func (b Binary) String() string { return string(b) } +func (b Binary) GoString() string { return fmt.Sprintf("b\"%s\"", b) } + +// GoString for Map prints values with their types, useful for debugging. +func (m Map) GoString() string { + out := &bytes.Buffer{} + fmt.Fprintf(out, "%T{", m) + i := len(m) + for k, v := range m { + fmt.Fprintf(out, "%T(%#v): %T(%#v)", k, k, v, v) + i-- + if i > 0 { + fmt.Fprint(out, ", ") + } + } + fmt.Fprint(out, "}") + return out.String() +} + +// GoString for List prints values with their types, useful for debugging. 
+func (l List) GoString() string { + out := &bytes.Buffer{} + fmt.Fprintf(out, "%T{", l) + for i := 0; i < len(l); i++ { + fmt.Fprintf(out, "%T(%#v)", l[i], l[i]) + if i == len(l)-1 { + fmt.Fprint(out, ", ") + } + } + fmt.Fprint(out, "}") + return out.String() +} + +// pnTime converts Go time.Time to Proton millisecond Unix time. +func pnTime(t time.Time) C.pn_timestamp_t { + secs := t.Unix() - // Note: sub-second accuracy is not guaraunteed if the Unix time in ++ // Note: sub-second accuracy is not guaranteed if the Unix time in + // nanoseconds cannot be represented by an int64 (sometime around year 2260) + msecs := (t.UnixNano() % int64(time.Second)) / int64(time.Millisecond) + return C.pn_timestamp_t(secs*1000 + msecs) +} + +// goTime converts a pn_timestamp_t to a Go time.Time. +func goTime(t C.pn_timestamp_t) time.Time { + secs := int64(t) / 1000 + nsecs := (int64(t) % 1000) * int64(time.Millisecond) + return time.Unix(secs, nsecs) +} + +func goBytes(cBytes C.pn_bytes_t) (bytes []byte) { + if cBytes.start != nil { + bytes = C.GoBytes(unsafe.Pointer(cBytes.start), C.int(cBytes.size)) + } + return +} + +func goString(cBytes C.pn_bytes_t) (str string) { + if cBytes.start != nil { + str = C.GoStringN(cBytes.start, C.int(cBytes.size)) + } + return +} + +func pnBytes(b []byte) C.pn_bytes_t { + if len(b) == 0 { + return C.pn_bytes_t{0, nil} + } else { + return C.pn_bytes_t{C.size_t(len(b)), (*C.char)(unsafe.Pointer(&b[0]))} + } +} + +func cPtr(b []byte) *C.char { + if len(b) == 0 { + return nil + } + return (*C.char)(unsafe.Pointer(&b[0])) +} + +func cLen(b []byte) C.size_t { + return C.size_t(len(b)) +} + +// AnnotationKey is used as a map key for AMQP annotation maps which are +// allowed to have keys that are either symbol or ulong but no other type. 
+// +type AnnotationKey struct { + value interface{} +} + +func AnnotationKeySymbol(v Symbol) AnnotationKey { return AnnotationKey{v} } +func AnnotationKeyUint64(v uint64) AnnotationKey { return AnnotationKey{v} } +func AnnotationKeyString(v string) AnnotationKey { return AnnotationKey{Symbol(v)} } + +// Returns the value which must be Symbol, uint64 or nil +func (k AnnotationKey) Get() interface{} { return k.value } + +func (k AnnotationKey) String() string { return fmt.Sprintf("%v", k.Get()) } + +// Described represents an AMQP described type, which is really +// just a pair of AMQP values - the first is treated as a "descriptor", +// and is normally a string or ulong providing information about the type. +// The second is the "value" and can be any AMQP value. +type Described struct { + Descriptor interface{} + Value interface{} +} ++ ++// UUID is an AMQP 128-bit Universally Unique Identifier, as defined by RFC-4122 section 4.1.2 ++type UUID [16]byte ++ ++func (u UUID) String() string { ++ return fmt.Sprintf("UUID(%x-%x-%x-%x-%x)", u[0:4], u[4:6], u[6:8], u[8:10], u[10:]) ++} ++ ++// Char is an AMQP unicode character, equivalent to a Go rune. ++// It is defined as a distinct type so it can be distinguished from an AMQP int ++type Char rune ++ ++const intIs64 = unsafe.Sizeof(int(0)) == 8 http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b25d21e6/amqp/types_test.go ---------------------------------------------------------------------- diff --cc amqp/types_test.go index 959a558,0000000..efa6e59 mode 100644,000000..100644 --- a/amqp/types_test.go +++ b/amqp/types_test.go @@@ -1,197 -1,0 +1,219 @@@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. 
You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ + +package amqp + +import ( + "fmt" + "reflect" + "testing" ++ "time" +) + +func checkEqual(want interface{}, got interface{}) error { + if !reflect.DeepEqual(want, got) { + return fmt.Errorf("%#v != %#v", want, got) + } + return nil +} + - func checkUnmarshal(marshalled []byte, v interface{}) error { - got, err := Unmarshal(marshalled, v) ++func checkUnmarshal(marshaled []byte, v interface{}) error { ++ got, err := Unmarshal(marshaled, v) + if err != nil { + return err + } - if got != len(marshalled) { - return fmt.Errorf("Wanted to Unmarshal %v bytes, got %v", len(marshalled), got) ++ if got != len(marshaled) { ++ return fmt.Errorf("Wanted to Unmarshal %v bytes, got %v", len(marshaled), got) + } + return nil +} + +func ExampleKey() { + var k AnnotationKey = AnnotationKeySymbol(Symbol("foo")) + fmt.Println(k.Get().(Symbol)) + k = AnnotationKeyUint64(42) + fmt.Println(k.Get().(uint64)) + // Output: + // foo + // 42 +} + ++var timeValue = time.Now().Round(time.Millisecond) ++ +// Values that are unchanged by a marshal/unmarshal round-trip from interface{} +// to interface{} +var rtValues = []interface{}{ + true, + int8(-8), int16(-16), int32(-32), int64(-64), + uint8(8), uint16(16), uint32(32), uint64(64), + float32(0.32), float64(0.64), + "string", Binary("Binary"), Symbol("symbol"), + nil, ++ Described{"D", "V"}, ++ timeValue, ++ UUID{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16}, ++ Char('a'), + Map{"V": "X"}, ++ Map{}, + List{"V", int32(1)}, - Described{"D", "V"}, ++ List{}, ++ []string{"a", "b", "c"}, ++ []int8{}, ++ AnyMap{{[]int8{1, 2, 3}, "bad-key"}, 
{int16(1), "duplicate-1"}, {int16(1), "duplicate-2"}}, +} + - // Go values that unmarshal as an equivalent value but a different type - // if unmarshalled to interface{}. ++// Go values that round-trip if unmarshalled back to the same type they were ++// marshalled from, but unmarshal to interface{} as a different default type. +var oddValues = []interface{}{ - int(-99), uint(99), // [u]int32|64 ++ int(-99), // int32|64 depending on platform ++ uint(99), // int32|64 depending on platform + []byte("byte"), // amqp.Binary + map[string]int{"str": 99}, // amqp.Map - []string{"a", "b"}, // amqp.List ++ []Map{Map{}}, // amqp.Array - the generic array ++ AnyMap(nil), // Map +} + +var allValues = append(rtValues, oddValues...) + +// %v formatted representation of allValues +var vstrings = []string{ + // for rtValues + "true", + "-8", "-16", "-32", "-64", + "8", "16", "32", "64", + "0.32", "0.64", + "string", "Binary", "symbol", + "<nil>", ++ "{D V}", ++ fmt.Sprintf("%v", timeValue), ++ "UUID(01020304-0506-0708-090a-0b0c0d0e0f10)", ++ fmt.Sprintf("%v", 'a'), + "map[V:X]", ++ "map[]", + "[V 1]", - "{D V}", ++ "[]", ++ "[a b c]", ++ "[]", ++ "[{[1 2 3] bad-key} {1 duplicate-1} {1 duplicate-2}]", + // for oddValues + "-99", "99", + "[98 121 116 101]", /*"byte"*/ + "map[str:99]", - "[a b]", ++ "[map[]]", ++ "[]", +} + +// Round-trip encoding test +func TestTypesRoundTrip(t *testing.T) { + for _, x := range rtValues { + marshalled, err := Marshal(x, nil) + if err != nil { + t.Error(err) + } + var v interface{} + if err := checkUnmarshal(marshalled, &v); err != nil { + t.Error(err) + } - if err := checkEqual(v, x); err != nil { - t.Error(t, err) ++ if err := checkEqual(x, v); err != nil { ++ t.Error(err) + } + } +} + +// Round trip from T to T where T is the type of the value. 
+func TestTypesRoundTripAll(t *testing.T) { + for _, x := range allValues { - marshalled, err := Marshal(x, nil) ++ marshaled, err := Marshal(x, nil) + if err != nil { + t.Error(err) + } + if x == nil { // We can't create an instance of nil to unmarshal to. + continue + } + vp := reflect.New(reflect.TypeOf(x)) // v points to a Zero of the same type as x - if err := checkUnmarshal(marshalled, vp.Interface()); err != nil { ++ if err := checkUnmarshal(marshaled, vp.Interface()); err != nil { + t.Error(err) + } + v := vp.Elem().Interface() - if err := checkEqual(v, x); err != nil { ++ if err := checkEqual(x, v); err != nil { + t.Error(err) + } + } +} + +func TestTypesPrint(t *testing.T) { + // Default %v representations of rtValues and oddValues + for i, x := range allValues { + if s := fmt.Sprintf("%v", x); vstrings[i] != s { + t.Errorf("printing %T: want %v, got %v", x, vstrings[i], s) + } + } +} + +func TestDescribed(t *testing.T) { + want := Described{"D", "V"} - marshalled, _ := Marshal(want, nil) ++ marshaled, _ := Marshal(want, nil) + + // Unmarshal to Described type + var d Described - if err := checkUnmarshal(marshalled, &d); err != nil { ++ if err := checkUnmarshal(marshaled, &d); err != nil { + t.Error(err) + } + if err := checkEqual(want, d); err != nil { + t.Error(err) + } + + // Unmarshal to interface{} + var i interface{} - if err := checkUnmarshal(marshalled, &i); err != nil { ++ if err := checkUnmarshal(marshaled, &i); err != nil { + t.Error(err) + } + if _, ok := i.(Described); !ok { + t.Errorf("Expected Described, got %T(%v)", i, i) + } + if err := checkEqual(want, i); err != nil { + t.Error(err) + } + + // Unmarshal value only (drop descriptor) to the value type + var s string - if err := checkUnmarshal(marshalled, &s); err != nil { ++ if err := checkUnmarshal(marshaled, &s); err != nil { + t.Error(err) + } + if err := checkEqual(want.Value, s); err != nil { + t.Error(err) + } + + // Nested described types + want = Described{Described{int64(123), 
true}, "foo"} - marshalled, _ = Marshal(want, nil) - if err := checkUnmarshal(marshalled, &d); err != nil { ++ marshaled, _ = Marshal(want, nil) ++ if err := checkUnmarshal(marshaled, &d); err != nil { + t.Error(err) + } + if err := checkEqual(want, d); err != nil { + t.Error(err) + } + // Nested to interface - if err := checkUnmarshal(marshalled, &i); err != nil { ++ if err := checkUnmarshal(marshaled, &i); err != nil { + t.Error(err) + } + if err := checkEqual(want, i); err != nil { + t.Error(err) + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
