Merge branch 'master' into go1 - 0.11 alpha
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/8dc93cd0 Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/8dc93cd0 Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/8dc93cd0 Branch: refs/heads/go1 Commit: 8dc93cd08c786b30fc1e76312e6761739a791d5a Parents: 581841f f70afb6 Author: Alan Conway <acon...@redhat.com> Authored: Fri Oct 23 10:22:12 2015 -0400 Committer: Alan Conway <acon...@redhat.com> Committed: Fri Oct 23 10:22:12 2015 -0400 ---------------------------------------------------------------------- README.md | 96 +++++ amqp/doc.go | 34 ++ amqp/error.go | 66 +++ amqp/interop | 1 + amqp/interop_test.go | 381 +++++++++++++++++ amqp/marshal.go | 250 +++++++++++ amqp/message.go | 347 +++++++++++++++ amqp/message_test.go | 166 ++++++++ amqp/types.go | 199 +++++++++ amqp/unmarshal.go | 558 ++++++++++++++++++++++++ amqp/url.go | 96 +++++ amqp/url_test.go | 51 +++ electron/connection.go | 218 ++++++++++ electron/container.go | 71 ++++ electron/doc.go | 57 +++ electron/endpoint.go | 68 +++ electron/handler.go | 158 +++++++ electron/link.go | 247 +++++++++++ electron/messaging_test.go | 416 ++++++++++++++++++ electron/receiver.go | 238 +++++++++++ electron/sender.go | 315 ++++++++++++++ electron/session.go | 125 ++++++ electron/time.go | 82 ++++ internal/error.go | 118 +++++ internal/flexchannel.go | 82 ++++ internal/flexchannel_test.go | 89 ++++ internal/safemap.go | 57 +++ internal/uuid.go | 70 +++ proton/doc.go | 51 ++- proton/engine.go | 402 ++++++++++++++++++ proton/handlers.go | 391 +++++++++++++++++ proton/message.go | 86 ++++ proton/proton_test.go | 27 ++ proton/wrappers.go | 384 +++++++++++++++++ proton/wrappers_gen.go | 874 ++++++++++++++++++++++++++++++++++++++ readme-branch.md | 7 + 36 files changed, 6871 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/8dc93cd0/README.md ---------------------------------------------------------------------- diff --cc README.md index 0000000,9f95939..4b2da12 mode 000000,100644..100644 --- a/README.md +++ b/README.md @@@ -1,0 -1,44 +1,96 @@@ -Qpid Proton - AMQP messaging toolkit -==================================== - -Linux Build | Windows Build -------------|-------------- -[](https://travis-ci.org/apache/qpid-proton) | [](https://ci.appveyor.com/project/ke4qqq/qpid-proton/branch/master) - -Qpid Proton is a high-performance, lightweight messaging library. It can be -used in the widest range of messaging applications, including brokers, client -libraries, routers, bridges, proxies, and more. Proton makes it trivial to -integrate with the AMQP 1.0 ecosystem from any platform, environment, or -language - -Features --------- - - + A flexible and capable reactive messaging API - + Full control of AMQP 1.0 protocol semantics - + Portable C implementation with bindings to popular languages - + Pure-Java and pure-JavaScript implementations - + Peer-to-peer and brokered messaging - + Secure communication via SSL and SASL - -Universal - Proton is designed to scale both up and down. Equally suitable for -simple clients or high-powered servers, it can be deployed in simple -peer-to-peer configurations or as part of a global federated messaging network. - -Embeddable - Proton is carefully written to be portable and cross platform. It -has minimal dependencies, and it is architected to be usable with any threading -model, as well as with non-threaded applications. These features make it -uniquely suited for embedding messaging capabilities into existing software. - -Standard - Built around the AMQP 1.0 messaging standard, Proton is not only -ideal for building out your own messaging applications but also for connecting -them to the broader ecosystem of AMQP 1.0-based messaging applications. 
- -Getting Started ---------------- - -See the included INSTALL file for build and install instructions and the -DEVELOPERS file for information on how to modify and test the library code -itself. - -Please see http://qpid.apache.org/proton for a more info. ++# Qpid Go packages for AMQP ++ ++These packages provide [Go](http://golang.org) support for sending and receiving ++AMQP messages in client or server applications. Reference documentation is ++available at: <http://godoc.org/?q=qpid.apache.org> ++ ++There are 3 packages: ++ ++[qpid.apache.org/amqp](http://godoc.org/qpid.apache.org/amqp) provides functions ++to convert AMQP messages and data types to and from Go data types. Used by both ++the proton and electron packages to manage AMQP data. ++ ++[qpid.apache.org/electron](http://godoc.org/qpid.apache.org/electron) is a ++simple, concurrent-safe API for sending and receiving messages. It can be used ++with goroutines and channels to build concurrent AMQP clients and servers. ++ ++[qpid.apache.org/proton](http://godoc.org/qpid.apache.org/proton) is an ++event-driven, concurrent-unsafe package that closely follows the proton C ++API. Most Go programmers will find the electron package easier to use. ++ ++There are [examples](https://github.com/apache/qpid-proton/blob/master/examples/go/README.md) ++to help you get started. ++ ++Feedback is encouraged at: ++ ++- Email <pro...@qpid.apache.org> ++- Create issues <https://issues.apache.org/jira/browse/PROTON>, attach patches to an issue. ++ ++### Why two APIs? ++ ++The `proton` API is a direct mapping of the proton C library into Go. It is ++usable but not very natural for a Go programmer because it takes an ++*event-driven* approach and has no built-in support for concurrent ++use. `electron` uses `proton` internally but provides a more Go-like API that is ++safe to use from multiple concurrent goroutines. ++ ++Go encourages programs to be structured as concurrent *goroutines* that ++communicate via *channels*. 
Go literature distinguishes between: ++ ++- *concurrency*: "keeping track of things that could be done in parallel" ++- *parallelism*: "actually doing things in parallel on multiple CPUs or cores" ++ ++A Go program expresses concurrency by starting goroutines for potentially ++concurrent tasks. The Go runtime schedules the activity of goroutines onto a ++small number (possibly one) of actual parallel executions. ++ ++Even with no hardware parallelism, goroutine concurrency lets the Go runtime ++order unpredictable events like file descriptors being readable/writable, ++channels having data, timers firing etc. Go automatically takes care of ++switching out goroutines that block or sleep so it is normal to write code in ++terms of blocking calls. ++ ++By contrast, event-driven programming is based on polling mechanisms like ++`select`, `poll` or `epoll`. These also dispatch unpredictably ordered events to ++a single thread or a small thread pool. However this requires a different style ++of programming: "event-driven" or "reactive" programming. Go developers call it ++"inside-out" programming. In an event-driven program blocking is a big problem ++as it consumes a scarce thread of execution, so actions that take time to ++complete have to be re-structured in terms of multiple events. ++ ++The promise of Go is that you can express your program in concurrent, sequential ++terms and the Go runtime will turn it inside-out for you. You can start ++goroutines for all concurrent activities. They can loop forever or block for as ++long as they need waiting for timers, IO or any unpredictable event. Go will ++interleave and schedule them efficiently onto the available parallel hardware. ++ ++For example: in the `electron` API, you can send a message and wait for it to be ++acknowledged in a single function. All the information about the message, why ++you sent it, and what to do when it is acknowledged can be held in local ++variables, all the code is in a simple sequence. 
Other goroutines in your ++program can be sending and receiving messages concurrently, they are not ++blocked. ++ ++In the `proton` API, an event handler that sends a message must return ++*immediately*, it cannot block the event loop to wait for ++acknowledgement. Acknowledgement is a separate event, so the code for handling ++it is in a different event handler. Context information about the message has to ++be stored in some non-local variable that both functions can find. This makes ++the code harder to follow. ++ ++The `proton` API is important because it is the foundation for the `electron` ++API, and may be useful for programs that need to be close to the original C ++library for some reason. However the `electron` API hides the event-driven ++details behind simple, sequential, concurrent-safe methods that can be called ++from arbitrary goroutines. Under the covers, data is passed through channels to ++dedicated `proton` goroutines so user goroutines can work concurrently with the ++proton event-loop. ++ ++## New to Go? ++ ++If you are new to Go then these are a good place to start: ++ ++- [A Tour of Go](http://tour.golang.org) ++- [Effective Go](http://golang.org/doc/effective_go.html) ++ ++Then look at the tools and docs at <http://golang.org> as you need them. http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/8dc93cd0/amqp/doc.go ---------------------------------------------------------------------- diff --cc amqp/doc.go index 0000000,0000000..323c344 new file mode 100644 --- /dev/null +++ b/amqp/doc.go @@@ -1,0 -1,0 +1,34 @@@ ++/* ++Licensed to the Apache Software Foundation (ASF) under one ++or more contributor license agreements. See the NOTICE file ++distributed with this work for additional information ++regarding copyright ownership. The ASF licenses this file ++to you under the Apache License, Version 2.0 (the ++"License"); you may not use this file except in compliance ++with the License. 
You may obtain a copy of the License at ++ ++ http://www.apache.org/licenses/LICENSE-2.0 ++ ++Unless required by applicable law or agreed to in writing, ++software distributed under the License is distributed on an ++"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY ++KIND, either express or implied. See the License for the ++specific language governing permissions and limitations ++under the License. ++*/ ++ ++/* ++Package amqp encodes and decodes AMQP messages and data types as Go types. ++ ++It follows the standard 'encoding' libraries pattern. The mapping between AMQP ++and Go types is described in the documentation of the Marshal and Unmarshal ++functions. ++ ++AMQP is an open standard for inter-operable message exchange, see <http://www.amqp.org/> ++*/ ++package amqp ++ ++// #cgo LDFLAGS: -lqpid-proton ++import "C" ++ ++// This file is just for the package comment. http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/8dc93cd0/amqp/error.go ---------------------------------------------------------------------- diff --cc amqp/error.go index 0000000,0000000..868dbf3 new file mode 100644 --- /dev/null +++ b/amqp/error.go @@@ -1,0 -1,0 +1,66 @@@ ++/* ++Licensed to the Apache Software Foundation (ASF) under one ++or more contributor license agreements. See the NOTICE file ++distributed with this work for additional information ++regarding copyright ownership. The ASF licenses this file ++to you under the Apache License, Version 2.0 (the ++"License"); you may not use this file except in compliance ++with the License. You may obtain a copy of the License at ++ ++ http://www.apache.org/licenses/LICENSE-2.0 ++ ++Unless required by applicable law or agreed to in writing, ++software distributed under the License is distributed on an ++"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY ++KIND, either express or implied. See the License for the ++specific language governing permissions and limitations ++under the License. 
++*/ ++ ++package amqp ++ ++import ( ++ "fmt" ++ "reflect" ++) ++ ++// Error is an AMQP error condition. It has a name and a description. ++// It implements the Go error interface so can be returned as an error value. ++// ++// You can pass amqp.Error to methods that pass an error to a remote endpoint, ++// this gives you full control over what the remote endpoint will see. ++// ++// You can also pass any Go error to such functions, the remote peer ++// will see the equivalent of MakeError(error) ++// ++type Error struct{ Name, Description string } ++ ++// Error implements the Go error interface for AMQP errors. ++func (c Error) Error() string { return fmt.Sprintf("proton %s: %s", c.Name, c.Description) } ++ ++// Errorf makes an Error with name and formatted description as per fmt.Sprintf ++func Errorf(name, format string, arg ...interface{}) Error { ++ return Error{name, fmt.Sprintf(format, arg...)} ++} ++ ++// MakeError makes an AMQP error from a go error using the Go error type as the name ++// and the err.Error() string as the description. 
++func MakeError(err error) Error { ++ return Error{reflect.TypeOf(err).Name(), err.Error()} ++} ++ ++var ( ++ InternalError = "amqp:internal-error" ++ NotFound = "amqp:not-found" ++ UnauthorizedAccess = "amqp:unauthorized-access" ++ DecodeError = "amqp:decode-error" ++ ResourceLimit = "amqp:resource-limit" ++ NotAllowed = "amqp:not-allowed" ++ InvalidField = "amqp:invalid-field" ++ NotImplemented = "amqp:not-implemented" ++ ResourceLocked = "amqp:resource-locked" ++ PreerrorFailed = "amqp:preerror-failed" ++ ResourceDeleted = "amqp:resource-deleted" ++ IllegalState = "amqp:illegal-state" ++ FrameSizeTooSmall = "amqp:frame-size-too-small" ++) http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/8dc93cd0/amqp/interop ---------------------------------------------------------------------- diff --cc amqp/interop index 0000000,0000000..ad6fcad new file mode 120000 --- /dev/null +++ b/amqp/interop @@@ -1,0 -1,0 +1,1 @@@ ++../../../../../../tests/interop http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/8dc93cd0/amqp/interop_test.go ---------------------------------------------------------------------- diff --cc amqp/interop_test.go index 0000000,0000000..b36ef64 new file mode 100644 --- /dev/null +++ b/amqp/interop_test.go @@@ -1,0 -1,0 +1,381 @@@ ++/* ++Licensed to the Apache Software Foundation (ASF) under one ++or more contributor license agreements. See the NOTICE file ++distributed with this work for additional information ++regarding copyright ownership. The ASF licenses this file ++to you under the Apache License, Version 2.0 (the ++"License"); you may not use this file except in compliance ++with the License. You may obtain a copy of the License at ++ ++ http://www.apache.org/licenses/LICENSE-2.0 ++ ++Unless required by applicable law or agreed to in writing, ++software distributed under the License is distributed on an ++"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY ++KIND, either express or implied. 
See the License for the ++specific language governing permissions and limitations ++under the License. ++*/ ++ ++// Test that conversion of Go type to/from AMQP is compatible with other ++// bindings. ++// ++package amqp ++ ++import ( ++ "bytes" ++ "fmt" ++ "io" ++ "io/ioutil" ++ "os" ++ "reflect" ++ "strings" ++ "testing" ++) ++ ++func checkEqual(want interface{}, got interface{}) error { ++ if !reflect.DeepEqual(want, got) { ++ return fmt.Errorf("%#v != %#v", want, got) ++ } ++ return nil ++} ++ ++func getReader(name string) (r io.Reader) { ++ r, err := os.Open("interop/" + name + ".amqp") ++ if err != nil { ++ panic(fmt.Errorf("Can't open %#v: %v", name, err)) ++ } ++ return ++} ++ ++func remaining(d *Decoder) string { ++ remainder, _ := ioutil.ReadAll(io.MultiReader(d.Buffered(), d.reader)) ++ return string(remainder) ++} ++ ++// checkDecode: want is the expected value, gotPtr is a pointer to a ++// instance of the same type for Decode. ++func checkDecode(d *Decoder, want interface{}, gotPtr interface{}, t *testing.T) { ++ ++ if err := d.Decode(gotPtr); err != nil { ++ t.Error("Decode failed", err) ++ return ++ } ++ got := reflect.ValueOf(gotPtr).Elem().Interface() ++ if err := checkEqual(want, got); err != nil { ++ t.Error("Decode bad value:", err) ++ return ++ } ++ ++ // Try round trip encoding ++ bytes, err := Marshal(want, nil) ++ if err != nil { ++ t.Error("Marshal failed", err) ++ return ++ } ++ n, err := Unmarshal(bytes, gotPtr) ++ if err != nil { ++ t.Error("Unmarshal failed", err) ++ return ++ } ++ if err := checkEqual(n, len(bytes)); err != nil { ++ t.Error("Bad unmarshal length", err) ++ return ++ } ++ got = reflect.ValueOf(gotPtr).Elem().Interface() ++ if err = checkEqual(want, got); err != nil { ++ t.Error("Bad unmarshal value", err) ++ return ++ } ++} ++ ++func TestUnmarshal(t *testing.T) { ++ bytes, err := ioutil.ReadAll(getReader("strings")) ++ if err != nil { ++ t.Error(err) ++ } ++ for _, want := range []string{"abc\000defg", "abcdefg", 
"abcdefg", "", "", ""} { ++ var got string ++ n, err := Unmarshal(bytes, &got) ++ if err != nil { ++ t.Error(err) ++ } ++ if want != got { ++ t.Errorf("%#v != %#v", want, got) ++ } ++ bytes = bytes[n:] ++ } ++} ++ ++func TestPrimitivesExact(t *testing.T) { ++ d := NewDecoder(getReader("primitives")) ++ // Decoding into exact types ++ var b bool ++ checkDecode(d, true, &b, t) ++ checkDecode(d, false, &b, t) ++ var u8 uint8 ++ checkDecode(d, uint8(42), &u8, t) ++ var u16 uint16 ++ checkDecode(d, uint16(42), &u16, t) ++ var i16 int16 ++ checkDecode(d, int16(-42), &i16, t) ++ var u32 uint32 ++ checkDecode(d, uint32(12345), &u32, t) ++ var i32 int32 ++ checkDecode(d, int32(-12345), &i32, t) ++ var u64 uint64 ++ checkDecode(d, uint64(12345), &u64, t) ++ var i64 int64 ++ checkDecode(d, int64(-12345), &i64, t) ++ var f32 float32 ++ checkDecode(d, float32(0.125), &f32, t) ++ var f64 float64 ++ checkDecode(d, float64(0.125), &f64, t) ++} ++ ++func TestPrimitivesCompatible(t *testing.T) { ++ d := NewDecoder(getReader("primitives")) ++ // Decoding into compatible types ++ var b bool ++ var i int ++ var u uint ++ var f float64 ++ checkDecode(d, true, &b, t) ++ checkDecode(d, false, &b, t) ++ checkDecode(d, uint(42), &u, t) ++ checkDecode(d, uint(42), &u, t) ++ checkDecode(d, -42, &i, t) ++ checkDecode(d, uint(12345), &u, t) ++ checkDecode(d, -12345, &i, t) ++ checkDecode(d, uint(12345), &u, t) ++ checkDecode(d, -12345, &i, t) ++ checkDecode(d, 0.125, &f, t) ++ checkDecode(d, 0.125, &f, t) ++} ++ ++// checkDecodeValue: want is the expected value, decode into a reflect.Value ++func checkDecodeInterface(d *Decoder, want interface{}, t *testing.T) { ++ ++ var got, got2 interface{} ++ if err := d.Decode(&got); err != nil { ++ t.Error("Decode failed", err) ++ return ++ } ++ if err := checkEqual(want, got); err != nil { ++ t.Error(err) ++ return ++ } ++ // Try round trip encoding ++ bytes, err := Marshal(got, nil) ++ if err != nil { ++ t.Error(err) ++ return ++ } ++ n, err := 
Unmarshal(bytes, &got2) ++ if err != nil { ++ t.Error(err) ++ return ++ } ++ if err := checkEqual(n, len(bytes)); err != nil { ++ t.Error(err) ++ return ++ } ++ if err := checkEqual(want, got2); err != nil { ++ t.Error(err) ++ return ++ } ++} ++ ++func TestPrimitivesInterface(t *testing.T) { ++ d := NewDecoder(getReader("primitives")) ++ checkDecodeInterface(d, true, t) ++ checkDecodeInterface(d, false, t) ++ checkDecodeInterface(d, uint8(42), t) ++ checkDecodeInterface(d, uint16(42), t) ++ checkDecodeInterface(d, int16(-42), t) ++ checkDecodeInterface(d, uint32(12345), t) ++ checkDecodeInterface(d, int32(-12345), t) ++ checkDecodeInterface(d, uint64(12345), t) ++ checkDecodeInterface(d, int64(-12345), t) ++ checkDecodeInterface(d, float32(0.125), t) ++ checkDecodeInterface(d, float64(0.125), t) ++} ++ ++func TestStrings(t *testing.T) { ++ d := NewDecoder(getReader("strings")) ++ // Test decoding as plain Go strings ++ for _, want := range []string{"abc\000defg", "abcdefg", "abcdefg", "", "", ""} { ++ var got string ++ checkDecode(d, want, &got, t) ++ } ++ remains := remaining(d) ++ if remains != "" { ++ t.Errorf("leftover: %s", remains) ++ } ++ ++ // Test decoding as specific string types ++ d = NewDecoder(getReader("strings")) ++ var bytes []byte ++ var str, sym string ++ checkDecode(d, []byte("abc\000defg"), &bytes, t) ++ checkDecode(d, "abcdefg", &str, t) ++ checkDecode(d, "abcdefg", &sym, t) ++ checkDecode(d, make([]byte, 0), &bytes, t) ++ checkDecode(d, "", &str, t) ++ checkDecode(d, "", &sym, t) ++ remains = remaining(d) ++ if remains != "" { ++ t.Fatalf("leftover: %s", remains) ++ } ++ ++ // Test some error handling ++ d = NewDecoder(getReader("strings")) ++ var s string ++ err := d.Decode(s) ++ if err == nil { ++ t.Fatal("Expected error") ++ } ++ if !strings.Contains(err.Error(), "not a pointer") { ++ t.Error(err) ++ } ++ var i int ++ err = d.Decode(&i) ++ if !strings.Contains(err.Error(), "cannot unmarshal") { ++ t.Error(err) ++ } ++ _, err = 
Unmarshal([]byte{}, nil) ++ if !strings.Contains(err.Error(), "not enough data") { ++ t.Error(err) ++ } ++ _, err = Unmarshal([]byte("foobar"), nil) ++ if !strings.Contains(err.Error(), "invalid-argument") { ++ t.Error(err) ++ } ++} ++ ++func TestEncodeDecode(t *testing.T) { ++ type data struct { ++ s string ++ i int ++ u8 uint8 ++ b bool ++ f float32 ++ v interface{} ++ } ++ ++ in := data{"foo", 42, 9, true, 1.234, "thing"} ++ ++ buf := bytes.Buffer{} ++ e := NewEncoder(&buf) ++ if err := e.Encode(in.s); err != nil { ++ t.Error(err) ++ } ++ if err := e.Encode(in.i); err != nil { ++ t.Error(err) ++ } ++ if err := e.Encode(in.u8); err != nil { ++ t.Error(err) ++ } ++ if err := e.Encode(in.b); err != nil { ++ t.Error(err) ++ } ++ if err := e.Encode(in.f); err != nil { ++ t.Error(err) ++ } ++ if err := e.Encode(in.v); err != nil { ++ t.Error(err) ++ } ++ ++ var out data ++ d := NewDecoder(&buf) ++ if err := d.Decode(&out.s); err != nil { ++ t.Error(err) ++ } ++ if err := d.Decode(&out.i); err != nil { ++ t.Error(err) ++ } ++ if err := d.Decode(&out.u8); err != nil { ++ t.Error(err) ++ } ++ if err := d.Decode(&out.b); err != nil { ++ t.Error(err) ++ } ++ if err := d.Decode(&out.f); err != nil { ++ t.Error(err) ++ } ++ if err := d.Decode(&out.v); err != nil { ++ t.Error(err) ++ } ++ ++ if err := checkEqual(in, out); err != nil { ++ t.Error(err) ++ } ++} ++ ++func TestMap(t *testing.T) { ++ d := NewDecoder(getReader("maps")) ++ ++ // Generic map ++ var m Map ++ checkDecode(d, Map{"one": int32(1), "two": int32(2), "three": int32(3)}, &m, t) ++ ++ // Interface as map ++ var i interface{} ++ checkDecode(d, Map{int32(1): "one", int32(2): "two", int32(3): "three"}, &i, t) ++ ++ d = NewDecoder(getReader("maps")) ++ // Specific typed map ++ var m2 map[string]int ++ checkDecode(d, map[string]int{"one": 1, "two": 2, "three": 3}, &m2, t) ++ ++ // Nested map ++ m = Map{int64(1): "one", "two": int32(2), true: Map{uint8(1): true, uint8(2): false}} ++ bytes, err := Marshal(m, nil) ++ 
if err != nil { ++ t.Fatal(err) ++ } ++ _, err = Unmarshal(bytes, &i) ++ if err != nil { ++ t.Fatal(err) ++ } ++ if err = checkEqual(m, i); err != nil { ++ t.Fatal(err) ++ } ++} ++ ++func TestList(t *testing.T) { ++ d := NewDecoder(getReader("lists")) ++ var l List ++ checkDecode(d, List{int32(32), "foo", true}, &l, t) ++ checkDecode(d, List{}, &l, t) ++} ++ ++// TODO aconway 2015-09-08: the message.amqp file seems to be incorrectly coded as ++// as an AMQP string *inside* an AMQP binary?? Skip the test for now. ++func TODO_TestMessage(t *testing.T) { ++ bytes, err := ioutil.ReadAll(getReader("message")) ++ if err != nil { ++ t.Fatal(err) ++ } ++ ++ m, err := DecodeMessage(bytes) ++ if err != nil { ++ t.Fatal(err) ++ } else { ++ if err := checkEqual(m.Body(), "hello"); err != nil { ++ t.Error(err) ++ } ++ } ++ ++ m2 := NewMessageWith("hello") ++ bytes2, err := m2.Encode(nil) ++ if err != nil { ++ t.Error(err) ++ } else { ++ if err = checkEqual(bytes, bytes2); err != nil { ++ t.Error(err) ++ } ++ } ++} ++ ++// TODO aconway 2015-03-13: finish the full interop test http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/8dc93cd0/amqp/marshal.go ---------------------------------------------------------------------- diff --cc amqp/marshal.go index 0000000,0000000..666b4f6 new file mode 100644 --- /dev/null +++ b/amqp/marshal.go @@@ -1,0 -1,0 +1,250 @@@ ++/* ++Licensed to the Apache Software Foundation (ASF) under one ++or more contributor license agreements. See the NOTICE file ++distributed with this work for additional information ++regarding copyright ownership. The ASF licenses this file ++to you under the Apache License, Version 2.0 (the ++"License"); you may not use this file except in compliance ++with the License. 
You may obtain a copy of the License at ++ ++ http://www.apache.org/licenses/LICENSE-2.0 ++ ++Unless required by applicable law or agreed to in writing, ++software distributed under the License is distributed on an ++"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY ++KIND, either express or implied. See the License for the ++specific language governing permissions and limitations ++under the License. ++*/ ++ ++package amqp ++ ++// #include <proton/codec.h> ++import "C" ++ ++import ( ++ "io" ++ "qpid.apache.org/internal" ++ "reflect" ++ "unsafe" ++) ++ ++func dataError(prefix string, data *C.pn_data_t) error { ++ err := internal.PnError(unsafe.Pointer(C.pn_data_error(data))) ++ if err != nil { ++ err = internal.Errorf("%s: %s", prefix, err.(internal.Error)) ++ } ++ return err ++} ++ ++/* ++Marshal encodes a Go value as AMQP data in buffer. ++If buffer is nil, or is not large enough, a new buffer is created. ++ ++Returns the buffer used for encoding with len() adjusted to the actual size of data. ++ ++Go types are encoded as follows ++ ++ +-------------------------------------+--------------------------------------------+ ++ |Go type |AMQP type | ++ +-------------------------------------+--------------------------------------------+ ++ |bool |bool | ++ +-------------------------------------+--------------------------------------------+ ++ |int8, int16, int32, int64 (int) |byte, short, int, long (int or long) | ++ +-------------------------------------+--------------------------------------------+ ++ |uint8, uint16, uint32, uint64 (uint) |ubyte, ushort, uint, ulong (uint or ulong) | ++ +-------------------------------------+--------------------------------------------+ ++ |float32, float64 |float, double. 
| ++ +-------------------------------------+--------------------------------------------+ ++ |string |string | ++ +-------------------------------------+--------------------------------------------+ ++ |[]byte, Binary |binary | ++ +-------------------------------------+--------------------------------------------+ ++ |Symbol |symbol | ++ +-------------------------------------+--------------------------------------------+ ++ |interface{} |the contained type | ++ +-------------------------------------+--------------------------------------------+ ++ |nil |null | ++ +-------------------------------------+--------------------------------------------+ ++ |map[K]T |map with K and T converted as above | ++ +-------------------------------------+--------------------------------------------+ ++ |Map |map, may have mixed types for keys, values | ++ +-------------------------------------+--------------------------------------------+ ++ |[]T |list with T converted as above | ++ +-------------------------------------+--------------------------------------------+ ++ |List |list, may have mixed types values | ++ +-------------------------------------+--------------------------------------------+ ++ ++The following Go types cannot be marshaled: uintptr, function, interface, channel ++ ++TODO ++ ++Go types: array, slice, struct, complex64/128. ++ ++AMQP types: decimal32/64/128, char, timestamp, uuid, array, multi-section message bodies. ++ ++Described types. 
++ ++*/ ++func Marshal(v interface{}, buffer []byte) (outbuf []byte, err error) { ++ defer doRecover(&err) ++ data := C.pn_data(0) ++ defer C.pn_data_free(data) ++ marshal(v, data) ++ encode := func(buf []byte) ([]byte, error) { ++ n := int(C.pn_data_encode(data, cPtr(buf), cLen(buf))) ++ switch { ++ case n == int(C.PN_OVERFLOW): ++ return buf, overflow ++ case n < 0: ++ return buf, dataError("marshal error", data) ++ default: ++ return buf[:n], nil ++ } ++ } ++ return encodeGrow(buffer, encode) ++} ++ ++const minEncode = 256 ++ ++// overflow is returned when an encoding function can't fit data in the buffer. ++var overflow = internal.Errorf("buffer too small") ++ ++// encodeFn encodes into buffer[0:len(buffer)]. ++// Returns buffer with length adjusted for data encoded. ++// If buffer too small, returns overflow as error. ++type encodeFn func(buffer []byte) ([]byte, error) ++ ++// encodeGrow calls encode() into buffer, if it returns overflow grows the buffer. ++// Returns the final buffer. 
++func encodeGrow(buffer []byte, encode encodeFn) ([]byte, error) { ++ if buffer == nil || len(buffer) == 0 { ++ buffer = make([]byte, minEncode) ++ } ++ var err error ++ for buffer, err = encode(buffer); err == overflow; buffer, err = encode(buffer) { ++ buffer = make([]byte, 2*len(buffer)) ++ } ++ return buffer, err ++} ++ ++func marshal(v interface{}, data *C.pn_data_t) { ++ switch v := v.(type) { ++ case nil: ++ C.pn_data_put_null(data) ++ case bool: ++ C.pn_data_put_bool(data, C.bool(v)) ++ case int8: ++ C.pn_data_put_byte(data, C.int8_t(v)) ++ case int16: ++ C.pn_data_put_short(data, C.int16_t(v)) ++ case int32: ++ C.pn_data_put_int(data, C.int32_t(v)) ++ case int64: ++ C.pn_data_put_long(data, C.int64_t(v)) ++ case int: ++ if unsafe.Sizeof(0) == 8 { ++ C.pn_data_put_long(data, C.int64_t(v)) ++ } else { ++ C.pn_data_put_int(data, C.int32_t(v)) ++ } ++ case uint8: ++ C.pn_data_put_ubyte(data, C.uint8_t(v)) ++ case uint16: ++ C.pn_data_put_ushort(data, C.uint16_t(v)) ++ case uint32: ++ C.pn_data_put_uint(data, C.uint32_t(v)) ++ case uint64: ++ C.pn_data_put_ulong(data, C.uint64_t(v)) ++ case uint: ++ if unsafe.Sizeof(0) == 8 { ++ C.pn_data_put_ulong(data, C.uint64_t(v)) ++ } else { ++ C.pn_data_put_uint(data, C.uint32_t(v)) ++ } ++ case float32: ++ C.pn_data_put_float(data, C.float(v)) ++ case float64: ++ C.pn_data_put_double(data, C.double(v)) ++ case string: ++ C.pn_data_put_string(data, pnBytes([]byte(v))) ++ case []byte: ++ C.pn_data_put_binary(data, pnBytes(v)) ++ case Binary: ++ C.pn_data_put_binary(data, pnBytes([]byte(v))) ++ case Symbol: ++ C.pn_data_put_symbol(data, pnBytes([]byte(v))) ++ case Map: // Special map type ++ C.pn_data_put_map(data) ++ C.pn_data_enter(data) ++ for key, val := range v { ++ marshal(key, data) ++ marshal(val, data) ++ } ++ C.pn_data_exit(data) ++ default: ++ switch reflect.TypeOf(v).Kind() { ++ case reflect.Map: ++ putMap(data, v) ++ case reflect.Slice: ++ putList(data, v) ++ default: ++ panic(internal.Errorf("cannot marshal 
%s to AMQP", reflect.TypeOf(v))) ++ } ++ } ++ err := dataError("marshal", data) ++ if err != nil { ++ panic(err) ++ } ++ return ++} ++ ++func clearMarshal(v interface{}, data *C.pn_data_t) { ++ C.pn_data_clear(data) ++ marshal(v, data) ++} ++ ++func putMap(data *C.pn_data_t, v interface{}) { ++ mapValue := reflect.ValueOf(v) ++ C.pn_data_put_map(data) ++ C.pn_data_enter(data) ++ for _, key := range mapValue.MapKeys() { ++ marshal(key.Interface(), data) ++ marshal(mapValue.MapIndex(key).Interface(), data) ++ } ++ C.pn_data_exit(data) ++} ++ ++func putList(data *C.pn_data_t, v interface{}) { ++ listValue := reflect.ValueOf(v) ++ C.pn_data_put_list(data) ++ C.pn_data_enter(data) ++ for i := 0; i < listValue.Len(); i++ { ++ marshal(listValue.Index(i).Interface(), data) ++ } ++ C.pn_data_exit(data) ++} ++ ++// Encoder encodes AMQP values to an io.Writer ++type Encoder struct { ++ writer io.Writer ++ buffer []byte ++} ++ ++// New encoder returns a new encoder that writes to w. ++func NewEncoder(w io.Writer) *Encoder { ++ return &Encoder{w, make([]byte, minEncode)} ++} ++ ++func (e *Encoder) Encode(v interface{}) (err error) { ++ e.buffer, err = Marshal(v, e.buffer) ++ if err == nil { ++ e.writer.Write(e.buffer) ++ } ++ return err ++} ++ ++func replace(data *C.pn_data_t, v interface{}) { ++ C.pn_data_clear(data) ++ marshal(v, data) ++} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/8dc93cd0/amqp/message.go ---------------------------------------------------------------------- diff --cc amqp/message.go index 0000000,0000000..5ba4f4f new file mode 100644 --- /dev/null +++ b/amqp/message.go @@@ -1,0 -1,0 +1,347 @@@ ++/* ++Licensed to the Apache Software Foundation (ASF) under one ++or more contributor license agreements. See the NOTICE file ++distributed with this work for additional information ++regarding copyright ownership. 
The ASF licenses this file ++to you under the Apache License, Version 2.0 (the ++"License"); you may not use this file except in compliance ++with the License. You may obtain a copy of the License at ++ ++ http://www.apache.org/licenses/LICENSE-2.0 ++ ++Unless required by applicable law or agreed to in writing, ++software distributed under the License is distributed on an ++"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY ++KIND, either express or implied. See the License for the ++specific language governing permissions and limitations ++under the License. ++*/ ++ ++package amqp ++ ++// #include <proton/types.h> ++// #include <proton/message.h> ++// #include <proton/codec.h> ++// #include <stdlib.h> ++// ++// /* Helper for setting message string fields */ ++// typedef int (*set_fn)(pn_message_t*, const char*); ++// int msg_set_str(pn_message_t* m, char* s, set_fn set) { ++// int result = set(m, s); ++// free(s); ++// return result; ++// } ++// ++import "C" ++ ++import ( ++ "qpid.apache.org/internal" ++ "runtime" ++ "time" ++ "unsafe" ++) ++ ++// Message is the interface to an AMQP message. ++type Message interface { ++ // Durable indicates that any parties taking responsibility ++ // for the message must durably store the content. ++ Durable() bool ++ SetDurable(bool) ++ ++ // Priority impacts ordering guarantees. Within a ++ // given ordered context, higher priority messages may jump ahead of ++ // lower priority messages. ++ Priority() uint8 ++ SetPriority(uint8) ++ ++ // TTL or Time To Live, a message it may be dropped after this duration ++ TTL() time.Duration ++ SetTTL(time.Duration) ++ ++ // FirstAcquirer indicates ++ // that the recipient of the message is the first recipient to acquire ++ // the message, i.e. there have been no failed delivery attempts to ++ // other acquirers. Note that this does not mean the message has not ++ // been delivered to, but not acquired, by other recipients. 
++ FirstAcquirer() bool ++ SetFirstAcquirer(bool) ++ ++ // DeliveryCount tracks how many attempts have been made to ++ // deliver a message. ++ DeliveryCount() uint32 ++ SetDeliveryCount(uint32) ++ ++ // MessageId provides a unique identifier for a message. ++ // It can be a string, an unsigned long, a uuid or a ++ // binary value. ++ MessageId() interface{} ++ SetMessageId(interface{}) ++ ++ UserId() string ++ SetUserId(string) ++ ++ Address() string ++ SetAddress(string) ++ ++ Subject() string ++ SetSubject(string) ++ ++ ReplyTo() string ++ SetReplyTo(string) ++ ++ // CorrelationId is set on correlated request and response messages. It can be ++ // a string, an unsigned long, a uuid or a binary value. ++ CorrelationId() interface{} ++ SetCorrelationId(interface{}) ++ ++ ContentType() string ++ SetContentType(string) ++ ++ ContentEncoding() string ++ SetContentEncoding(string) ++ ++ // ExpiryTime indicates an absolute time when the message may be dropped. ++ // A Zero time (i.e. t.IsZero() == true) indicates a message never expires. ++ ExpiryTime() time.Time ++ SetExpiryTime(time.Time) ++ ++ CreationTime() time.Time ++ SetCreationTime(time.Time) ++ ++ GroupId() string ++ SetGroupId(string) ++ ++ GroupSequence() int32 ++ SetGroupSequence(int32) ++ ++ ReplyToGroupId() string ++ SetReplyToGroupId(string) ++ ++ // Instructions - AMQP delivery instructions. ++ Instructions() map[string]interface{} ++ SetInstructions(v map[string]interface{}) ++ ++ // Annotations - AMQP annotations. ++ Annotations() map[string]interface{} ++ SetAnnotations(v map[string]interface{}) ++ ++ // Properties - Application properties. ++ Properties() map[string]interface{} ++ SetProperties(v map[string]interface{}) ++ ++ // Inferred indicates how the message content ++ // is encoded into AMQP sections. If inferred is true then binary and ++ // list values in the body of the message will be encoded as AMQP DATA ++ // and AMQP SEQUENCE sections, respectively. 
If inferred is false, ++ // then all values in the body of the message will be encoded as AMQP ++ // VALUE sections regardless of their type. ++ Inferred() bool ++ SetInferred(bool) ++ ++ // Marshal a Go value into the message body. See amqp.Marshal() for details. ++ Marshal(interface{}) ++ ++ // Unmarshal the message body into the value pointed to by v. See amqp.Unmarshal() for details. ++ Unmarshal(interface{}) ++ ++ // Body value resulting from the default unmarshalling of message body as interface{} ++ Body() interface{} ++ ++ // Encode encodes the message as AMQP data. If buffer is non-nil and is large enough ++ // the message is encoded into it, otherwise a new buffer is created. ++ // Returns the buffer containing the message. ++ Encode(buffer []byte) ([]byte, error) ++ ++ // Decode data into this message. Overwrites an existing message content. ++ Decode(buffer []byte) error ++ ++ // Clear the message contents. ++ Clear() ++ ++ // Copy the contents of another message to this one. ++ Copy(m Message) error ++} ++ ++type message struct{ pn *C.pn_message_t } ++ ++func freeMessage(m *message) { ++ C.pn_message_free(m.pn) ++ m.pn = nil ++} ++ ++// NewMessage creates a new message instance. ++func NewMessage() Message { ++ m := &message{C.pn_message()} ++ runtime.SetFinalizer(m, freeMessage) ++ return m ++} ++ ++// NewMessageWith creates a message with value as the body. 
Equivalent to ++// m := NewMessage(); m.Marshal(body) ++func NewMessageWith(value interface{}) Message { ++ m := NewMessage() ++ m.Marshal(value) ++ return m ++} ++ ++func (m *message) Clear() { C.pn_message_clear(m.pn) } ++ ++func (m *message) Copy(x Message) error { ++ if data, err := x.Encode(nil); err == nil { ++ return m.Decode(data) ++ } else { ++ return err ++ } ++} ++ ++// ==== message get functions ++ ++func rewindGet(data *C.pn_data_t) (v interface{}) { ++ C.pn_data_rewind(data) ++ C.pn_data_next(data) ++ unmarshal(&v, data) ++ return v ++} ++ ++func rewindMap(data *C.pn_data_t) (v map[string]interface{}) { ++ C.pn_data_rewind(data) ++ C.pn_data_next(data) ++ unmarshal(&v, data) ++ return v ++} ++ ++func (m *message) Inferred() bool { return bool(C.pn_message_is_inferred(m.pn)) } ++func (m *message) Durable() bool { return bool(C.pn_message_is_durable(m.pn)) } ++func (m *message) Priority() uint8 { return uint8(C.pn_message_get_priority(m.pn)) } ++func (m *message) TTL() time.Duration { ++ return time.Duration(C.pn_message_get_ttl(m.pn)) * time.Millisecond ++} ++func (m *message) FirstAcquirer() bool { return bool(C.pn_message_is_first_acquirer(m.pn)) } ++func (m *message) DeliveryCount() uint32 { return uint32(C.pn_message_get_delivery_count(m.pn)) } ++func (m *message) MessageId() interface{} { return rewindGet(C.pn_message_id(m.pn)) } ++func (m *message) UserId() string { return goString(C.pn_message_get_user_id(m.pn)) } ++func (m *message) Address() string { return C.GoString(C.pn_message_get_address(m.pn)) } ++func (m *message) Subject() string { return C.GoString(C.pn_message_get_subject(m.pn)) } ++func (m *message) ReplyTo() string { return C.GoString(C.pn_message_get_reply_to(m.pn)) } ++func (m *message) CorrelationId() interface{} { return rewindGet(C.pn_message_correlation_id(m.pn)) } ++func (m *message) ContentType() string { return C.GoString(C.pn_message_get_content_type(m.pn)) } ++func (m *message) ContentEncoding() string { return 
C.GoString(C.pn_message_get_content_encoding(m.pn)) } ++ ++func (m *message) ExpiryTime() time.Time { ++ return time.Unix(0, int64(time.Millisecond*time.Duration(C.pn_message_get_expiry_time(m.pn)))) ++} ++func (m *message) CreationTime() time.Time { ++ return time.Unix(0, int64(time.Millisecond)*int64(C.pn_message_get_creation_time(m.pn))) ++} ++func (m *message) GroupId() string { return C.GoString(C.pn_message_get_group_id(m.pn)) } ++func (m *message) GroupSequence() int32 { return int32(C.pn_message_get_group_sequence(m.pn)) } ++func (m *message) ReplyToGroupId() string { return C.GoString(C.pn_message_get_reply_to_group_id(m.pn)) } ++ ++func (m *message) Instructions() map[string]interface{} { ++ return rewindMap(C.pn_message_instructions(m.pn)) ++} ++func (m *message) Annotations() map[string]interface{} { ++ return rewindMap(C.pn_message_annotations(m.pn)) ++} ++func (m *message) Properties() map[string]interface{} { ++ return rewindMap(C.pn_message_properties(m.pn)) ++} ++ ++// ==== message set methods ++ ++// setData clears data and marshals v into it. ++func setData(v interface{}, data *C.pn_data_t) { ++ C.pn_data_clear(data) ++ marshal(v, data) ++} ++ ++// dataString returns the pn_inspect rendering of data as a Go string. ++func dataString(data *C.pn_data_t) string { ++ // C.CString allocates with malloc; release it once pn_string has taken ++ // its own copy (NOTE(review): pn_string is expected to copy its argument). ++ empty := C.CString("") ++ defer C.free(unsafe.Pointer(empty)) ++ str := C.pn_string(empty) ++ defer C.pn_free(unsafe.Pointer(str)) ++ C.pn_inspect(unsafe.Pointer(data), str) ++ return C.GoString(C.pn_string_get(str)) ++} ++ ++// SetInferred stores b as the message's inferred flag. ++func (m *message) SetInferred(b bool) { C.pn_message_set_inferred(m.pn, C.bool(b)) } ++func (m *message) SetDurable(b bool) { C.pn_message_set_durable(m.pn, C.bool(b)) } ++func (m *message) SetPriority(b uint8) { C.pn_message_set_priority(m.pn, C.uint8_t(b)) } ++func (m *message) SetTTL(d time.Duration) { ++ C.pn_message_set_ttl(m.pn, C.pn_millis_t(d/time.Millisecond)) ++} ++func (m *message) SetFirstAcquirer(b bool) { C.pn_message_set_first_acquirer(m.pn, C.bool(b)) } ++func (m *message) SetDeliveryCount(c uint32) { C.pn_message_set_delivery_count(m.pn, C.uint32_t(c)) } ++func (m *message) SetMessageId(id interface{}) { 
setData(id, C.pn_message_id(m.pn)) } ++func (m *message) SetUserId(s string) { C.pn_message_set_user_id(m.pn, pnBytes(([]byte)(s))) } ++func (m *message) SetAddress(s string) { ++ C.msg_set_str(m.pn, C.CString(s), C.set_fn(C.pn_message_set_address)) ++} ++func (m *message) SetSubject(s string) { ++ C.msg_set_str(m.pn, C.CString(s), C.set_fn(C.pn_message_set_subject)) ++} ++func (m *message) SetReplyTo(s string) { ++ C.msg_set_str(m.pn, C.CString(s), C.set_fn(C.pn_message_set_reply_to)) ++} ++func (m *message) SetCorrelationId(c interface{}) { setData(c, C.pn_message_correlation_id(m.pn)) } ++func (m *message) SetContentType(s string) { ++ C.msg_set_str(m.pn, C.CString(s), C.set_fn(C.pn_message_set_content_type)) ++} ++func (m *message) SetContentEncoding(s string) { ++ C.msg_set_str(m.pn, C.CString(s), C.set_fn(C.pn_message_set_content_encoding)) ++} ++func (m *message) SetExpiryTime(t time.Time) { C.pn_message_set_expiry_time(m.pn, pnTime(t)) } ++func (m *message) SetCreationTime(t time.Time) { C.pn_message_set_creation_time(m.pn, pnTime(t)) } ++func (m *message) SetGroupId(s string) { ++ C.msg_set_str(m.pn, C.CString(s), C.set_fn(C.pn_message_set_group_id)) ++} ++func (m *message) SetGroupSequence(s int32) { ++ C.pn_message_set_group_sequence(m.pn, C.pn_sequence_t(s)) ++} ++func (m *message) SetReplyToGroupId(s string) { ++ C.msg_set_str(m.pn, C.CString(s), C.set_fn(C.pn_message_set_reply_to_group_id)) ++} ++ ++func (m *message) SetInstructions(v map[string]interface{}) { ++ setData(v, C.pn_message_instructions(m.pn)) ++} ++func (m *message) SetAnnotations(v map[string]interface{}) { setData(v, C.pn_message_annotations(m.pn)) } ++func (m *message) SetProperties(v map[string]interface{}) { setData(v, C.pn_message_properties(m.pn)) } ++ ++// Marshal/Unmarshal body ++func (m *message) Marshal(v interface{}) { clearMarshal(v, C.pn_message_body(m.pn)) } ++func (m *message) Unmarshal(v interface{}) { rewindUnmarshal(v, C.pn_message_body(m.pn)) } ++func (m *message) 
Body() (v interface{}) { m.Unmarshal(&v); return } ++ ++func (m *message) Decode(data []byte) error { ++ m.Clear() ++ if len(data) == 0 { ++ return internal.Errorf("empty buffer for decode") ++ } ++ if C.pn_message_decode(m.pn, cPtr(data), cLen(data)) < 0 { ++ return internal.Errorf("decoding message: %s", ++ internal.PnError(unsafe.Pointer(C.pn_message_error(m.pn)))) ++ } ++ return nil ++} ++ ++func DecodeMessage(data []byte) (m Message, err error) { ++ m = NewMessage() ++ err = m.Decode(data) ++ return ++} ++ ++func (m *message) Encode(buffer []byte) ([]byte, error) { ++ encode := func(buf []byte) ([]byte, error) { ++ len := cLen(buf) ++ result := C.pn_message_encode(m.pn, cPtr(buf), &len) ++ switch { ++ case result == C.PN_OVERFLOW: ++ return buf, overflow ++ case result < 0: ++ return buf, internal.Errorf("cannot encode message: %s", internal.PnErrorCode(result)) ++ default: ++ return buf[:len], nil ++ } ++ } ++ return encodeGrow(buffer, encode) ++} ++ ++// TODO aconway 2015-09-14: Multi-section messages. http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/8dc93cd0/amqp/message_test.go ---------------------------------------------------------------------- diff --cc amqp/message_test.go index 0000000,0000000..7a6e5a8 new file mode 100644 --- /dev/null +++ b/amqp/message_test.go @@@ -1,0 -1,0 +1,166 @@@ ++/* ++Licensed to the Apache Software Foundation (ASF) under one ++or more contributor license agreements. See the NOTICE file ++distributed with this work for additional information ++regarding copyright ownership. The ASF licenses this file ++to you under the Apache License, Version 2.0 (the ++"License"); you may not use this file except in compliance ++with the License. 
You may obtain a copy of the License at ++ ++ http://www.apache.org/licenses/LICENSE-2.0 ++ ++Unless required by applicable law or agreed to in writing, ++software distributed under the License is distributed on an ++"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY ++KIND, either express or implied. See the License for the ++specific language governing permissions and limitations ++under the License. ++*/ ++ ++package amqp ++ ++import ( ++ "testing" ++ "time" ++) ++ ++func roundTrip(m Message) error { ++ buffer, err := m.Encode(nil) ++ if err != nil { ++ return err ++ } ++ m2, err := DecodeMessage(buffer) ++ if err != nil { ++ return err ++ } ++ return checkEqual(m, m2) ++} ++ ++func TestDefaultMessage(t *testing.T) { ++ m := NewMessage() ++ // Check defaults ++ for _, data := range [][]interface{}{ ++ {m.Inferred(), false}, ++ {m.Durable(), false}, ++ {m.Priority(), uint8(4)}, ++ {m.TTL(), time.Duration(0)}, ++ {m.UserId(), ""}, ++ {m.Address(), ""}, ++ {m.Subject(), ""}, ++ {m.ReplyTo(), ""}, ++ {m.ContentType(), ""}, ++ {m.ContentEncoding(), ""}, ++ {m.GroupId(), ""}, ++ {m.GroupSequence(), int32(0)}, ++ {m.ReplyToGroupId(), ""}, ++ {m.MessageId(), nil}, ++ {m.CorrelationId(), nil}, ++ {m.Instructions(), map[string]interface{}{}}, ++ {m.Annotations(), map[string]interface{}{}}, ++ {m.Properties(), map[string]interface{}{}}, ++ {m.Body(), nil}, ++ } { ++ if err := checkEqual(data[0], data[1]); err != nil { ++ t.Error(err) ++ } ++ } ++ if err := roundTrip(m); err != nil { ++ t.Error(err) ++ } ++} ++ ++func TestMessageRoundTrip(t *testing.T) { ++ m := NewMessage() ++ m.SetInferred(false) ++ m.SetDurable(true) ++ m.SetPriority(42) ++ m.SetTTL(0) ++ m.SetUserId("user") ++ m.SetAddress("address") ++ m.SetSubject("subject") ++ m.SetReplyTo("replyto") ++ m.SetContentType("content") ++ m.SetContentEncoding("encoding") ++ m.SetGroupId("group") ++ m.SetGroupSequence(42) ++ m.SetReplyToGroupId("replytogroup") ++ m.SetMessageId("id") ++ 
m.SetCorrelationId("correlation") ++ m.SetInstructions(map[string]interface{}{"instructions": "foo"}) ++ m.SetAnnotations(map[string]interface{}{"annotations": "foo"}) ++ m.SetProperties(map[string]interface{}{"int": int32(32), "bool": true, "string": "foo"}) ++ m.Marshal("hello") ++ ++ for _, data := range [][]interface{}{ ++ {m.Inferred(), false}, ++ {m.Durable(), true}, ++ {m.Priority(), uint8(42)}, ++ {m.TTL(), time.Duration(0)}, ++ {m.UserId(), "user"}, ++ {m.Address(), "address"}, ++ {m.Subject(), "subject"}, ++ {m.ReplyTo(), "replyto"}, ++ {m.ContentType(), "content"}, ++ {m.ContentEncoding(), "encoding"}, ++ {m.GroupId(), "group"}, ++ {m.GroupSequence(), int32(42)}, ++ {m.ReplyToGroupId(), "replytogroup"}, ++ {m.MessageId(), "id"}, ++ {m.CorrelationId(), "correlation"}, ++ {m.Instructions(), map[string]interface{}{"instructions": "foo"}}, ++ {m.Annotations(), map[string]interface{}{"annotations": "foo"}}, ++ {m.Properties(), map[string]interface{}{"int": int32(32), "bool": true, "string": "foo"}}, ++ {m.Body(), "hello"}, ++ } { ++ if err := checkEqual(data[0], data[1]); err != nil { ++ t.Error(err) ++ } ++ } ++ if err := roundTrip(m); err != nil { ++ t.Error(err) ++ } ++} ++ ++func TestMessageBodyTypes(t *testing.T) { ++ var s string ++ var body interface{} ++ var i int64 ++ ++ m := NewMessageWith(int64(42)) ++ m.Unmarshal(&body) ++ m.Unmarshal(&i) ++ if err := checkEqual(body.(int64), int64(42)); err != nil { ++ t.Error(err) ++ } ++ if err := checkEqual(i, int64(42)); err != nil { ++ t.Error(err) ++ } ++ ++ m = NewMessageWith("hello") ++ m.Unmarshal(&s) ++ m.Unmarshal(&body) ++ if err := checkEqual(s, "hello"); err != nil { ++ t.Error(err) ++ } ++ if err := checkEqual(body.(string), "hello"); err != nil { ++ t.Error(err) ++ } ++ if err := roundTrip(m); err != nil { ++ t.Error(err) ++ } ++ ++ m = NewMessageWith(Binary("bin")) ++ m.Unmarshal(&s) ++ m.Unmarshal(&body) ++ if err := checkEqual(body.(Binary), Binary("bin")); err != nil { ++ t.Error(err) ++ } ++ 
if err := checkEqual(s, "bin"); err != nil { ++ t.Error(err) ++ } ++ if err := roundTrip(m); err != nil { ++ t.Error(err) ++ } ++ ++ // TODO aconway 2015-09-08: array etc. ++} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/8dc93cd0/amqp/types.go ---------------------------------------------------------------------- diff --cc amqp/types.go index 0000000,0000000..796da66 new file mode 100644 --- /dev/null +++ b/amqp/types.go @@@ -1,0 -1,0 +1,199 @@@ ++/* ++Licensed to the Apache Software Foundation (ASF) under one ++or more contributor license agreements. See the NOTICE file ++distributed with this work for additional information ++regarding copyright ownership. The ASF licenses this file ++to you under the Apache License, Version 2.0 (the ++"License"); you may not use this file except in compliance ++with the License. You may obtain a copy of the License at ++ ++ http://www.apache.org/licenses/LICENSE-2.0 ++ ++Unless required by applicable law or agreed to in writing, ++software distributed under the License is distributed on an ++"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY ++KIND, either express or implied. See the License for the ++specific language governing permissions and limitations ++under the License. 
++*/ ++ ++package amqp ++ ++// #include <proton/codec.h> ++import "C" ++ ++import ( ++ "bytes" ++ "fmt" ++ "reflect" ++ "time" ++ "unsafe" ++) ++ ++type Type C.pn_type_t ++ ++func (t Type) String() string { ++ switch C.pn_type_t(t) { ++ case C.PN_NULL: ++ return "null" ++ case C.PN_BOOL: ++ return "bool" ++ case C.PN_UBYTE: ++ return "ubyte" ++ case C.PN_BYTE: ++ return "byte" ++ case C.PN_USHORT: ++ return "ushort" ++ case C.PN_SHORT: ++ return "short" ++ case C.PN_CHAR: ++ return "char" ++ case C.PN_UINT: ++ return "uint" ++ case C.PN_INT: ++ return "int" ++ case C.PN_ULONG: ++ return "ulong" ++ case C.PN_LONG: ++ return "long" ++ case C.PN_TIMESTAMP: ++ return "timestamp" ++ case C.PN_FLOAT: ++ return "float" ++ case C.PN_DOUBLE: ++ return "double" ++ case C.PN_DECIMAL32: ++ return "decimal32" ++ case C.PN_DECIMAL64: ++ return "decimal64" ++ case C.PN_DECIMAL128: ++ return "decimal128" ++ case C.PN_UUID: ++ return "uuid" ++ case C.PN_BINARY: ++ return "binary" ++ case C.PN_STRING: ++ return "string" ++ case C.PN_SYMBOL: ++ return "symbol" ++ case C.PN_DESCRIBED: ++ return "described" ++ case C.PN_ARRAY: ++ return "array" ++ case C.PN_LIST: ++ return "list" ++ case C.PN_MAP: ++ return "map" ++ default: ++ if uint32(t) == uint32(C.PN_INVALID) { ++ return "no-data" ++ } ++ return fmt.Sprintf("unknown-type(%d)", t) ++ } ++} ++ ++// Go types ++var ( ++ bytesType = reflect.TypeOf([]byte{}) ++ valueType = reflect.TypeOf(reflect.Value{}) ++) ++ ++// TODO aconway 2015-04-08: can't handle AMQP maps with key types that are not valid Go map keys. ++ ++// Map is a generic map that can have mixed key and value types and so can represent any AMQP map ++type Map map[interface{}]interface{} ++ ++// List is a generic list that can hold mixed values and can represent any AMQP list. 
++// ++type List []interface{} ++ ++// Symbol is a string that is encoded as an AMQP symbol ++type Symbol string ++ ++func (s Symbol) GoString() string { return fmt.Sprintf("s\"%s\"", s) } ++ ++// Binary is a string that is encoded as an AMQP binary. ++// It is a string rather than a []byte because []byte is not hashable and can't be used as ++// a map key, AMQP frequently uses binary types as map keys. It can convert to and from []byte ++type Binary string ++ ++func (b Binary) GoString() string { return fmt.Sprintf("b\"%s\"", b) } ++ ++// GoString for Map prints values with their types, useful for debugging. ++func (m Map) GoString() string { ++ out := &bytes.Buffer{} ++ fmt.Fprintf(out, "%T{", m) ++ i := len(m) ++ for k, v := range m { ++ fmt.Fprintf(out, "%T(%#v): %T(%#v)", k, k, v, v) ++ i-- ++ if i > 0 { ++ fmt.Fprint(out, ", ") ++ } ++ } ++ fmt.Fprint(out, "}") ++ return out.String() ++} ++ ++// GoString for List prints values with their types, useful for debugging. ++// Elements are separated by ", " with no trailing separator. ++func (l List) GoString() string { ++ out := &bytes.Buffer{} ++ fmt.Fprintf(out, "%T{", l) ++ for i := 0; i < len(l); i++ { ++ fmt.Fprintf(out, "%T(%#v)", l[i], l[i]) ++ // Separator goes between elements, i.e. after every element but the last. ++ if i < len(l)-1 { ++ fmt.Fprint(out, ", ") ++ } ++ } ++ fmt.Fprint(out, "}") ++ return out.String() ++} ++ ++// pnTime converts Go time.Time to Proton millisecond Unix time. ++func pnTime(t time.Time) C.pn_timestamp_t { ++ secs := t.Unix() ++ // Use the non-negative sub-second offset that pairs with the floored ++ // seconds from Unix(); this is correct before 1970 and does not depend on ++ // the time being representable as int64 nanoseconds. ++ msecs := int64(t.Nanosecond()) / int64(time.Millisecond) ++ return C.pn_timestamp_t(secs*1000 + msecs) ++} ++ ++// goTime converts a pn_timestamp_t to a Go time.Time. 
++func goTime(t C.pn_timestamp_t) time.Time { ++ secs := int64(t) / 1000 ++ nsecs := (int64(t) % 1000) * int64(time.Millisecond) ++ return time.Unix(secs, nsecs) ++} ++ ++func goBytes(cBytes C.pn_bytes_t) (bytes []byte) { ++ if cBytes.start != nil { ++ bytes = C.GoBytes(unsafe.Pointer(cBytes.start), C.int(cBytes.size)) ++ } ++ return ++} ++ ++func goString(cBytes C.pn_bytes_t) (str string) { ++ if cBytes.start != nil { ++ str = C.GoStringN(cBytes.start, C.int(cBytes.size)) ++ } ++ return ++} ++ ++func pnBytes(b []byte) C.pn_bytes_t { ++ if len(b) == 0 { ++ return C.pn_bytes_t{0, nil} ++ } else { ++ return C.pn_bytes_t{C.size_t(len(b)), (*C.char)(unsafe.Pointer(&b[0]))} ++ } ++} ++ ++func cPtr(b []byte) *C.char { ++ if len(b) == 0 { ++ return nil ++ } ++ return (*C.char)(unsafe.Pointer(&b[0])) ++} ++ ++func cLen(b []byte) C.size_t { ++ return C.size_t(len(b)) ++} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/8dc93cd0/amqp/unmarshal.go ---------------------------------------------------------------------- diff --cc amqp/unmarshal.go index 0000000,0000000..751921d new file mode 100644 --- /dev/null +++ b/amqp/unmarshal.go @@@ -1,0 -1,0 +1,558 @@@ ++/* ++Licensed to the Apache Software Foundation (ASF) under one ++oor more contributor license agreements. See the NOTICE file ++distributed with this work for additional information ++regarding copyright ownership. The ASF licenses this file ++to you under the Apache License, Version 2.0 (the ++"License"); you may not use this file except in compliance ++with the License. You may obtain a copy of the License at ++ ++ http://www.apache.org/licenses/LICENSE-2.0 ++ ++Unless required by applicable law or agreed to in writing, ++software distributed under the License is distributed on an ++"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY ++KIND, either express or implied. See the License for the ++specific language governing permissions and limitations ++under the License. 
++*/ ++ ++package amqp ++ ++// #include <proton/codec.h> ++import "C" ++ ++import ( ++ "bytes" ++ "fmt" ++ "io" ++ "qpid.apache.org/internal" ++ "reflect" ++ "unsafe" ++) ++ ++const minDecode = 1024 ++ ++// Error returned if AMQP data cannot be unmarshaled as the desired Go type. ++type UnmarshalError struct { ++ // The name of the AMQP type. ++ AMQPType string ++ // The Go type. ++ GoType reflect.Type ++} ++ ++func newUnmarshalError(pnType C.pn_type_t, v interface{}) *UnmarshalError { ++ return &UnmarshalError{Type(pnType).String(), reflect.TypeOf(v)} ++} ++ ++func (e UnmarshalError) Error() string { ++ if e.GoType.Kind() != reflect.Ptr { ++ return fmt.Sprintf("proton: cannot unmarshal to type %s, not a pointer", e.GoType) ++ } else { ++ return fmt.Sprintf("proton: cannot unmarshal AMQP %s to %s", e.AMQPType, e.GoType) ++ } ++} ++ ++func doRecover(err *error) { ++ r := recover() ++ switch r := r.(type) { ++ case nil: ++ case *UnmarshalError, internal.Error: ++ *err = r.(error) ++ default: ++ panic(r) ++ } ++} ++ ++// ++// Decoding from a pn_data_t ++// ++// NOTE: we use panic() to signal a decoding error, simplifies decoding logic. ++// We recover() at the highest possible level - i.e. in the exported Unmarshal or Decode. ++// ++ ++// Decoder decodes AMQP values from an io.Reader. ++// ++type Decoder struct { ++ reader io.Reader ++ buffer bytes.Buffer ++} ++ ++// NewDecoder returns a new decoder that reads from r. ++// ++// The decoder has it's own buffer and may read more data than required for the ++// AMQP values requested. Use Buffered to see if there is data left in the ++// buffer. ++// ++func NewDecoder(r io.Reader) *Decoder { ++ return &Decoder{r, bytes.Buffer{}} ++} ++ ++// Buffered returns a reader of the data remaining in the Decoder's buffer. The ++// reader is valid until the next call to Decode. 
++// ++func (d *Decoder) Buffered() io.Reader { ++ return bytes.NewReader(d.buffer.Bytes()) ++} ++ ++// Decode reads the next AMQP value from the Reader and stores it in the value pointed to by v. ++// ++// See the documentation for Unmarshal for details about the conversion of AMQP into a Go value. ++// ++func (d *Decoder) Decode(v interface{}) (err error) { ++ defer doRecover(&err) ++ data := C.pn_data(0) ++ defer C.pn_data_free(data) ++ var n int ++ for n == 0 && err == nil { ++ n = decode(data, d.buffer.Bytes()) ++ if n == 0 { // n == 0 means not enough data, read more ++ err = d.more() ++ } else { ++ unmarshal(v, data) ++ } ++ } ++ d.buffer.Next(n) ++ return ++} ++ ++/* ++Unmarshal decodes AMQP-encoded bytes and stores the result in the value pointed to by v. ++Types are converted as follows: ++ ++ +---------------------------+----------------------------------------------------------------------+ ++ |To Go types |From AMQP types | ++ +===========================+======================================================================+ ++ |bool |bool | ++ +---------------------------+----------------------------------------------------------------------+ ++ |int, int8, int16, |Equivalent or smaller signed integer type: byte, short, int, long. | ++ |int32, int64 | | ++ +---------------------------+----------------------------------------------------------------------+ ++ |uint, uint8, uint16, |Equivalent or smaller unsigned integer type: ubyte, ushort, uint, | ++ |uint32, uint64 types |ulong | ++ +---------------------------+----------------------------------------------------------------------+ ++ |float32, float64 |Equivalent or smaller float or double. | ++ +---------------------------+----------------------------------------------------------------------+ ++ |string, []byte |string, symbol or binary. 
| ++ +---------------------------+----------------------------------------------------------------------+ ++ |Symbol |symbol | ++ +---------------------------+----------------------------------------------------------------------+ ++ |map[K]T |map, provided all keys and values can unmarshal to types K, T | ++ +---------------------------+----------------------------------------------------------------------+ ++ |Map |map, any AMQP map | ++ +---------------------------+----------------------------------------------------------------------+ ++ |interface{} |Any AMQP value can be unmarshaled to an interface{} as follows: | ++ | +------------------------+---------------------------------------------+ ++ | |AMQP Type |Go Type in interface{} | ++ | +========================+=============================================+ ++ | |bool |bool | ++ | +------------------------+---------------------------------------------+ ++ | |byte,short,int,long |int8,int16,int32,int64 | ++ | +------------------------+---------------------------------------------+ ++ | |ubyte,ushort,uint,ulong |uint8,uint16,uint32,uint64 | ++ | +------------------------+---------------------------------------------+ ++ | |float, double |float32, float64 | ++ | +------------------------+---------------------------------------------+ ++ | |string |string | ++ | +------------------------+---------------------------------------------+ ++ | |symbol |Symbol | ++ | +------------------------+---------------------------------------------+ ++ | |binary |Binary | ++ | +------------------------+---------------------------------------------+ ++ | |nulll |nil | ++ | +------------------------+---------------------------------------------+ ++ | |map |Map | ++ | +------------------------+---------------------------------------------+ ++ | |list |List | ++ +---------------------------+------------------------+---------------------------------------------+ ++ ++The following Go types cannot be unmarshaled: uintptr, function, 
interface, channel. ++ ++TODO ++ ++Go types: array, struct. ++ ++AMQP types: decimal32/64/128, char (round trip), timestamp, uuid, array, multi-section message bodies. ++ ++AMQP maps with mixed/unhashable key types need an alternate representation. ++ ++Described types. ++*/ ++func Unmarshal(bytes []byte, v interface{}) (n int, err error) { ++ defer doRecover(&err) ++ ++ data := C.pn_data(0) ++ defer C.pn_data_free(data) ++ n = decode(data, bytes) ++ if n == 0 { ++ err = internal.Errorf("not enough data") ++ } else { ++ unmarshal(v, data) ++ } ++ return ++} ++ ++// more reads more data when we can't parse a complete AMQP type ++func (d *Decoder) more() error { ++ var readSize int64 = minDecode ++ if int64(d.buffer.Len()) > readSize { // Grow by doubling ++ readSize = int64(d.buffer.Len()) ++ } ++ var n int64 ++ n, err := d.buffer.ReadFrom(io.LimitReader(d.reader, readSize)) ++ if n == 0 && err == nil { // ReadFrom won't report io.EOF, just returns 0 ++ err = io.EOF ++ } ++ return err ++} ++ ++// Unmarshal from data into value pointed at by v. 
++func unmarshal(v interface{}, data *C.pn_data_t) { ++ pnType := C.pn_data_type(data) ++ switch v := v.(type) { ++ case *bool: ++ switch pnType { ++ case C.PN_BOOL: ++ *v = bool(C.pn_data_get_bool(data)) ++ default: ++ panic(newUnmarshalError(pnType, v)) ++ } ++ case *int8: ++ switch pnType { ++ case C.PN_CHAR: ++ *v = int8(C.pn_data_get_char(data)) ++ case C.PN_BYTE: ++ *v = int8(C.pn_data_get_byte(data)) ++ default: ++ panic(newUnmarshalError(pnType, v)) ++ } ++ case *uint8: ++ switch pnType { ++ case C.PN_CHAR: ++ *v = uint8(C.pn_data_get_char(data)) ++ case C.PN_UBYTE: ++ *v = uint8(C.pn_data_get_ubyte(data)) ++ default: ++ panic(newUnmarshalError(pnType, v)) ++ } ++ case *int16: ++ switch pnType { ++ case C.PN_CHAR: ++ *v = int16(C.pn_data_get_char(data)) ++ case C.PN_BYTE: ++ *v = int16(C.pn_data_get_byte(data)) ++ case C.PN_SHORT: ++ *v = int16(C.pn_data_get_short(data)) ++ default: ++ panic(newUnmarshalError(pnType, v)) ++ } ++ case *uint16: ++ switch pnType { ++ case C.PN_CHAR: ++ *v = uint16(C.pn_data_get_char(data)) ++ case C.PN_UBYTE: ++ *v = uint16(C.pn_data_get_ubyte(data)) ++ case C.PN_USHORT: ++ *v = uint16(C.pn_data_get_ushort(data)) ++ default: ++ panic(newUnmarshalError(pnType, v)) ++ } ++ case *int32: ++ switch pnType { ++ case C.PN_CHAR: ++ *v = int32(C.pn_data_get_char(data)) ++ case C.PN_BYTE: ++ *v = int32(C.pn_data_get_byte(data)) ++ case C.PN_SHORT: ++ *v = int32(C.pn_data_get_short(data)) ++ case C.PN_INT: ++ *v = int32(C.pn_data_get_int(data)) ++ default: ++ panic(newUnmarshalError(pnType, v)) ++ } ++ case *uint32: ++ switch pnType { ++ case C.PN_CHAR: ++ *v = uint32(C.pn_data_get_char(data)) ++ case C.PN_UBYTE: ++ *v = uint32(C.pn_data_get_ubyte(data)) ++ case C.PN_USHORT: ++ *v = uint32(C.pn_data_get_ushort(data)) ++ case C.PN_UINT: ++ *v = uint32(C.pn_data_get_uint(data)) ++ default: ++ panic(newUnmarshalError(pnType, v)) ++ } ++ ++ case *int64: ++ switch pnType { ++ case C.PN_CHAR: ++ *v = int64(C.pn_data_get_char(data)) ++ case 
C.PN_BYTE: ++ *v = int64(C.pn_data_get_byte(data)) ++ case C.PN_SHORT: ++ *v = int64(C.pn_data_get_short(data)) ++ case C.PN_INT: ++ *v = int64(C.pn_data_get_int(data)) ++ case C.PN_LONG: ++ *v = int64(C.pn_data_get_long(data)) ++ default: ++ panic(newUnmarshalError(pnType, v)) ++ } ++ ++ case *uint64: ++ switch pnType { ++ case C.PN_CHAR: ++ *v = uint64(C.pn_data_get_char(data)) ++ case C.PN_UBYTE: ++ *v = uint64(C.pn_data_get_ubyte(data)) ++ case C.PN_USHORT: ++ *v = uint64(C.pn_data_get_ushort(data)) ++ case C.PN_ULONG: ++ *v = uint64(C.pn_data_get_ulong(data)) ++ default: ++ panic(newUnmarshalError(pnType, v)) ++ } ++ ++ case *int: ++ switch pnType { ++ case C.PN_CHAR: ++ *v = int(C.pn_data_get_char(data)) ++ case C.PN_BYTE: ++ *v = int(C.pn_data_get_byte(data)) ++ case C.PN_SHORT: ++ *v = int(C.pn_data_get_short(data)) ++ case C.PN_INT: ++ *v = int(C.pn_data_get_int(data)) ++ case C.PN_LONG: ++ if unsafe.Sizeof(0) == 8 { ++ *v = int(C.pn_data_get_long(data)) ++ } else { ++ panic(newUnmarshalError(pnType, v)) ++ } ++ default: ++ panic(newUnmarshalError(pnType, v)) ++ } ++ ++ case *uint: ++ switch pnType { ++ case C.PN_CHAR: ++ *v = uint(C.pn_data_get_char(data)) ++ case C.PN_UBYTE: ++ *v = uint(C.pn_data_get_ubyte(data)) ++ case C.PN_USHORT: ++ *v = uint(C.pn_data_get_ushort(data)) ++ case C.PN_UINT: ++ *v = uint(C.pn_data_get_uint(data)) ++ case C.PN_ULONG: ++ if unsafe.Sizeof(0) == 8 { ++ *v = uint(C.pn_data_get_ulong(data)) ++ } else { ++ panic(newUnmarshalError(pnType, v)) ++ } ++ default: ++ panic(newUnmarshalError(pnType, v)) ++ } ++ ++ case *float32: ++ switch pnType { ++ case C.PN_FLOAT: ++ *v = float32(C.pn_data_get_float(data)) ++ default: ++ panic(newUnmarshalError(pnType, v)) ++ } ++ ++ case *float64: ++ switch pnType { ++ case C.PN_FLOAT: ++ *v = float64(C.pn_data_get_float(data)) ++ case C.PN_DOUBLE: ++ *v = float64(C.pn_data_get_double(data)) ++ default: ++ panic(newUnmarshalError(pnType, v)) ++ } ++ ++ case *string: ++ switch pnType { ++ case 
C.PN_STRING: ++ *v = goString(C.pn_data_get_string(data)) ++ case C.PN_SYMBOL: ++ *v = goString(C.pn_data_get_symbol(data)) ++ case C.PN_BINARY: ++ *v = goString(C.pn_data_get_binary(data)) ++ default: ++ panic(newUnmarshalError(pnType, v)) ++ } ++ ++ case *[]byte: ++ switch pnType { ++ case C.PN_STRING: ++ *v = goBytes(C.pn_data_get_string(data)) ++ case C.PN_SYMBOL: ++ *v = goBytes(C.pn_data_get_symbol(data)) ++ case C.PN_BINARY: ++ *v = goBytes(C.pn_data_get_binary(data)) ++ default: ++ panic(newUnmarshalError(pnType, v)) ++ } ++ ++ case *Binary: ++ switch pnType { ++ case C.PN_BINARY: ++ *v = Binary(goBytes(C.pn_data_get_binary(data))) ++ default: ++ panic(newUnmarshalError(pnType, v)) ++ } ++ ++ case *Symbol: ++ switch pnType { ++ case C.PN_SYMBOL: ++ *v = Symbol(goBytes(C.pn_data_get_symbol(data))) ++ default: ++ panic(newUnmarshalError(pnType, v)) ++ } ++ ++ case *interface{}: ++ getInterface(data, v) ++ ++ default: ++ if reflect.TypeOf(v).Kind() != reflect.Ptr { ++ panic(newUnmarshalError(pnType, v)) ++ } ++ switch reflect.TypeOf(v).Elem().Kind() { ++ case reflect.Map: ++ getMap(data, v) ++ case reflect.Slice: ++ getList(data, v) ++ default: ++ panic(newUnmarshalError(pnType, v)) ++ } ++ } ++ err := dataError("unmarshaling", data) ++ if err != nil { ++ panic(err) ++ } ++ return ++} ++ ++func rewindUnmarshal(v interface{}, data *C.pn_data_t) { ++ C.pn_data_rewind(data) ++ C.pn_data_next(data) ++ unmarshal(v, data) ++} ++ ++// Getting into an interface is driven completely by the AMQP type, since the interface{} ++// target is type-neutral. ++func getInterface(data *C.pn_data_t, v *interface{}) { ++ pnType := C.pn_data_type(data) ++ switch pnType { ++ // Note PN_INVALID is defined outside the enum, older Go versions don't consider it a C.pn_type_t ++ case C.PN_NULL, C.pn_type_t(C.PN_INVALID): // No data. 
++ *v = nil ++ case C.PN_BOOL: ++ *v = bool(C.pn_data_get_bool(data)) ++ case C.PN_UBYTE: ++ *v = uint8(C.pn_data_get_ubyte(data)) ++ case C.PN_BYTE: ++ *v = int8(C.pn_data_get_byte(data)) ++ case C.PN_USHORT: ++ *v = uint16(C.pn_data_get_ushort(data)) ++ case C.PN_SHORT: ++ *v = int16(C.pn_data_get_short(data)) ++ case C.PN_UINT: ++ *v = uint32(C.pn_data_get_uint(data)) ++ case C.PN_INT: ++ *v = int32(C.pn_data_get_int(data)) ++ case C.PN_CHAR: ++ *v = uint8(C.pn_data_get_char(data)) ++ case C.PN_ULONG: ++ *v = uint64(C.pn_data_get_ulong(data)) ++ case C.PN_LONG: ++ *v = int64(C.pn_data_get_long(data)) ++ case C.PN_FLOAT: ++ *v = float32(C.pn_data_get_float(data)) ++ case C.PN_DOUBLE: ++ *v = float64(C.pn_data_get_double(data)) ++ case C.PN_BINARY: ++ *v = Binary(goBytes(C.pn_data_get_binary(data))) ++ case C.PN_STRING: ++ *v = goString(C.pn_data_get_string(data)) ++ case C.PN_SYMBOL: ++ *v = Symbol(goString(C.pn_data_get_symbol(data))) ++ case C.PN_MAP: ++ m := make(Map) ++ unmarshal(&m, data) ++ *v = m ++ case C.PN_LIST: ++ l := make(List, 0) ++ unmarshal(&l, data) ++ *v = l ++ default: ++ panic(newUnmarshalError(pnType, v)) ++ } ++} ++ ++// get into map pointed at by v ++func getMap(data *C.pn_data_t, v interface{}) { ++ mapValue := reflect.ValueOf(v).Elem() ++ mapValue.Set(reflect.MakeMap(mapValue.Type())) // Clear the map ++ switch pnType := C.pn_data_type(data); pnType { ++ case C.PN_MAP: ++ count := int(C.pn_data_get_map(data)) ++ if bool(C.pn_data_enter(data)) { ++ defer C.pn_data_exit(data) ++ for i := 0; i < count/2; i++ { ++ if bool(C.pn_data_next(data)) { ++ key := reflect.New(mapValue.Type().Key()) ++ unmarshal(key.Interface(), data) ++ if bool(C.pn_data_next(data)) { ++ val := reflect.New(mapValue.Type().Elem()) ++ unmarshal(val.Interface(), data) ++ mapValue.SetMapIndex(key.Elem(), val.Elem()) ++ } ++ } ++ } ++ } ++ // Note PN_INVALID is defined outside the enum, older Go versions don't consider it a C.pn_type_t ++ case C.pn_type_t(C.PN_INVALID): // 
Leave the map empty ++ default: ++ panic(newUnmarshalError(pnType, v)) ++ } ++} ++ ++func getList(data *C.pn_data_t, v interface{}) { ++ pnType := C.pn_data_type(data) ++ if pnType != C.PN_LIST { ++ panic(newUnmarshalError(pnType, v)) ++ } ++ count := int(C.pn_data_get_list(data)) ++ listValue := reflect.MakeSlice(reflect.TypeOf(v).Elem(), count, count) ++ if bool(C.pn_data_enter(data)) { ++ for i := 0; i < count; i++ { ++ if bool(C.pn_data_next(data)) { ++ val := reflect.New(listValue.Type().Elem()) ++ unmarshal(val.Interface(), data) ++ listValue.Index(i).Set(val.Elem()) ++ } ++ } ++ C.pn_data_exit(data) ++ } ++ reflect.ValueOf(v).Elem().Set(listValue) ++} ++ ++// decode from bytes. ++// Return bytes decoded or 0 if we could not decode a complete object. ++// ++func decode(data *C.pn_data_t, bytes []byte) int { ++ if len(bytes) == 0 { ++ return 0 ++ } ++ n := int(C.pn_data_decode(data, cPtr(bytes), cLen(bytes))) ++ if n == int(C.PN_UNDERFLOW) { ++ C.pn_error_clear(C.pn_data_error(data)) ++ return 0 ++ } else if n <= 0 { ++ panic(internal.Errorf("unmarshal %s", internal.PnErrorCode(n))) ++ } ++ return n ++} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/8dc93cd0/amqp/url.go ---------------------------------------------------------------------- diff --cc amqp/url.go index 0000000,0000000..0d0c662 new file mode 100644 --- /dev/null +++ b/amqp/url.go @@@ -1,0 -1,0 +1,96 @@@ ++/* ++Licensed to the Apache Software Foundation (ASF) under one ++or more contributor license agreements. See the NOTICE file ++distributed with this work for additional information ++regarding copyright ownership. The ASF licenses this file ++to you under the Apache License, Version 2.0 (the ++"License"); you may not use this file except in compliance ++with the License. 
You may obtain a copy of the License at ++ ++ http://www.apache.org/licenses/LICENSE-2.0 ++ ++Unless required by applicable law or agreed to in writing, ++software distributed under the License is distributed on an ++"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY ++KIND, either express or implied. See the License for the ++specific language governing permissions and limitations ++under the License. ++*/ ++ ++package amqp ++ ++/* ++#include <stdlib.h> ++#include <string.h> ++#include <proton/url.h> ++ ++// Helper function for setting URL fields. ++typedef void (*setter_fn)(pn_url_t* url, const char* value); ++inline void set(pn_url_t *url, setter_fn s, const char* value) { ++ s(url, value); ++} ++*/ ++import "C" ++ ++import ( ++ "net" ++ "net/url" ++ "qpid.apache.org/internal" ++ "unsafe" ++) ++ ++const ( ++ amqp string = "amqp" ++ amqps = "amqps" ++) ++ ++// ParseURL parses an AMQP URL string and returns a net/url.URL. ++// ++// It is more forgiving than net/url.Parse and allows most of the parts of the ++// URL to be missing, assuming AMQP defaults. 
++// ++func ParseURL(s string) (u *url.URL, err error) { ++ cstr := C.CString(s) ++ defer C.free(unsafe.Pointer(cstr)) ++ pnUrl := C.pn_url_parse(cstr) ++ if pnUrl == nil { ++ return nil, internal.Errorf("bad URL %#v", s) ++ } ++ defer C.pn_url_free(pnUrl) ++ ++ scheme := C.GoString(C.pn_url_get_scheme(pnUrl)) ++ username := C.GoString(C.pn_url_get_username(pnUrl)) ++ password := C.GoString(C.pn_url_get_password(pnUrl)) ++ host := C.GoString(C.pn_url_get_host(pnUrl)) ++ port := C.GoString(C.pn_url_get_port(pnUrl)) ++ path := C.GoString(C.pn_url_get_path(pnUrl)) ++ ++ if err != nil { ++ return nil, internal.Errorf("bad URL %#v: %s", s, err) ++ } ++ if scheme == "" { ++ scheme = amqp ++ } ++ if port == "" { ++ if scheme == amqps { ++ port = amqps ++ } else { ++ port = amqp ++ } ++ } ++ var user *url.Userinfo ++ if password != "" { ++ user = url.UserPassword(username, password) ++ } else if username != "" { ++ user = url.User(username) ++ } ++ ++ u = &url.URL{ ++ Scheme: scheme, ++ User: user, ++ Host: net.JoinHostPort(host, port), ++ Path: path, ++ } ++ ++ return u, nil ++} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/8dc93cd0/amqp/url_test.go ---------------------------------------------------------------------- diff --cc amqp/url_test.go index 0000000,0000000..f80f1c4 new file mode 100644 --- /dev/null +++ b/amqp/url_test.go @@@ -1,0 -1,0 +1,51 @@@ ++/* ++Licensed to the Apache Software Foundation (ASF) under one ++or more contributor license agreements. See the NOTICE file ++distributed with this work for additional information ++regarding copyright ownership. The ASF licenses this file ++to you under the Apache License, Version 2.0 (the ++"License"); you may not use this file except in compliance ++with the License. 
You may obtain a copy of the License at ++ ++ http://www.apache.org/licenses/LICENSE-2.0 ++ ++Unless required by applicable law or agreed to in writing, ++software distributed under the License is distributed on an ++"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY ++KIND, either express or implied. See the License for the ++specific language governing permissions and limitations ++under the License. ++*/ ++ ++package amqp ++ ++import ( ++ "fmt" ++) ++ ++func ExampleParseURL() { ++ for _, s := range []string{ ++ "amqp://username:password@host:1234/path", ++ "host:1234", ++ "host", ++ ":1234", ++ "host/path", ++ "amqps://host", ++ "", ++ } { ++ u, err := ParseURL(s) ++ if err != nil { ++ fmt.Println(err) ++ } else { ++ fmt.Println(u) ++ } ++ } ++ // Output: ++ // amqp://username:password@host:1234/path ++ // amqp://host:1234 ++ // amqp://host:amqp ++ // amqp://:1234 ++ // amqp://host:amqp/path ++ // amqps://host:amqps ++ // proton: bad URL "" ++} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/8dc93cd0/electron/connection.go ---------------------------------------------------------------------- diff --cc electron/connection.go index 0000000,0000000..d6761d6 new file mode 100644 --- /dev/null +++ b/electron/connection.go @@@ -1,0 -1,0 +1,218 @@@ ++/* ++Licensed to the Apache Software Foundation (ASF) under one ++or more contributor license agreements. See the NOTICE file ++distributed with this work for additional information ++regarding copyright ownership. The ASF licenses this file ++to you under the Apache License, Version 2.0 (the ++"License"); you may not use this file except in compliance ++with the License. You may obtain a copy of the License at ++ ++ http://www.apache.org/licenses/LICENSE-2.0 ++ ++Unless required by applicable law or agreed to in writing, ++software distributed under the License is distributed on an ++"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY ++KIND, either express or implied. 
See the License for the ++specific language governing permissions and limitations ++under the License. ++*/ ++ ++package electron ++ ++// #include <proton/disposition.h> ++import "C" ++ ++import ( ++ "net" ++ "qpid.apache.org/amqp" ++ "qpid.apache.org/internal" ++ "qpid.apache.org/proton" ++ "sync" ++ "time" ++) ++ ++// Connection is an AMQP connection, created by a Container. ++type Connection interface { ++ Endpoint ++ ++ // Sender opens a new sender on the DefaultSession. ++ // ++ // v can be a string, which is used as the Target address, or a SenderSettings ++ // struct containing more detailed settings. ++ Sender(...LinkSetting) (Sender, error) ++ ++ // Receiver opens a new Receiver on the DefaultSession(). ++ // ++ // v can be a string, which is used as the ++ // Source address, or a ReceiverSettings struct containing more detailed ++ // settings. ++ Receiver(...LinkSetting) (Receiver, error) ++ ++ // DefaultSession() returns a default session for the connection. It is opened ++ // on the first call to DefaultSession and returned on subsequent calls. ++ DefaultSession() (Session, error) ++ ++ // Session opens a new session. ++ Session(...SessionSetting) (Session, error) ++ ++ // Container for the connection. ++ Container() Container ++ ++ // Disconnect the connection abruptly with an error. ++ Disconnect(error) ++ ++ // Wait waits for the connection to be disconnected. ++ Wait() error ++ ++ // WaitTimeout is like Wait but returns Timeout if the timeout expires. ++ WaitTimeout(time.Duration) error ++} ++ ++// ConnectionSetting can be passed when creating a connection. ++// See functions that return ConnectionSetting for details. ++type ConnectionSetting func(*connection) ++ ++// Server setting puts the connection in server mode. ++// ++// A server connection will do protocol negotiation to accept an incoming AMQP ++// connection. 
Normally you would call this for a connection created by ++// net.Listener.Accept() ++// ++func Server() ConnectionSetting { return func(c *connection) { c.engine.Server() } } ++ ++// Accepter provides a function to be called when a connection receives an incoming ++// request to open an endpoint, one of IncomingSession, IncomingSender or IncomingReceiver. ++// ++// The accept() function must not block or use the accepted endpoint. ++// It can pass the endpoint to another goroutine for processing. ++// ++// By default all incoming endpoints are rejected. ++func Accepter(accept func(Incoming)) ConnectionSetting { ++ return func(c *connection) { c.accept = accept } ++} ++ ++type connection struct { ++ endpoint ++ listenOnce, defaultSessionOnce, closeOnce sync.Once ++ ++ container *container ++ conn net.Conn ++ accept func(Incoming) ++ handler *handler ++ engine *proton.Engine ++ err internal.ErrorHolder ++ eConnection proton.Connection ++ ++ defaultSession Session ++ done chan struct{} ++} ++ ++func newConnection(conn net.Conn, cont *container, setting ...ConnectionSetting) (*connection, error) { ++ c := &connection{container: cont, conn: conn, accept: func(Incoming) {}, done: make(chan struct{})} ++ c.handler = newHandler(c) ++ var err error ++ c.engine, err = proton.NewEngine(c.conn, c.handler.delegator) ++ if err != nil { ++ return nil, err ++ } ++ for _, set := range setting { ++ set(c) ++ } ++ c.str = c.engine.String() ++ c.eConnection = c.engine.Connection() ++ go func() { c.engine.Run(); close(c.done) }() ++ return c, nil ++} ++ ++func (c *connection) Close(err error) { c.err.Set(err); c.engine.Close(err) } ++ ++func (c *connection) Disconnect(err error) { c.err.Set(err); c.engine.Disconnect(err) } ++ ++func (c *connection) Session(setting ...SessionSetting) (Session, error) { ++ var s Session ++ err := c.engine.InjectWait(func() error { ++ eSession, err := c.engine.Connection().Session() ++ if err == nil { ++ eSession.Open() ++ if err == nil { ++ s = 
newSession(c, eSession, setting...) ++ } ++ } ++ return err ++ }) ++ return s, err ++} ++ ++func (c *connection) Container() Container { return c.container } ++ ++func (c *connection) DefaultSession() (s Session, err error) { ++ c.defaultSessionOnce.Do(func() { ++ c.defaultSession, err = c.Session() ++ }) ++ if err == nil { ++ err = c.Error() ++ } ++ return c.defaultSession, err ++} ++ ++func (c *connection) Sender(setting ...LinkSetting) (Sender, error) { ++ if s, err := c.DefaultSession(); err == nil { ++ return s.Sender(setting...) ++ } else { ++ return nil, err ++ } ++} ++ ++func (c *connection) Receiver(setting ...LinkSetting) (Receiver, error) { ++ if s, err := c.DefaultSession(); err == nil { ++ return s.Receiver(setting...) ++ } else { ++ return nil, err ++ } ++} ++ ++func (c *connection) Connection() Connection { return c } ++ ++func (c *connection) Wait() error { return c.WaitTimeout(Forever) } ++func (c *connection) WaitTimeout(timeout time.Duration) error { ++ _, err := timedReceive(c.done, timeout) ++ if err == Timeout { ++ return Timeout ++ } ++ return c.Error() ++} ++ ++// Incoming is the interface for incoming requests to open an endpoint. ++// Implementing types are IncomingSession, IncomingSender and IncomingReceiver. ++type Incoming interface { ++ // Accept the endpoint with default settings. ++ // ++ // You must not use the returned endpoint in the accept() function that ++ // receives the Incoming value, but you can pass it to other goroutines. ++ // ++ // Implementing types provide type-specific Accept functions that take additional settings. 
++ Accept() Endpoint ++ ++ // Reject the endpoint with an error ++ Reject(error) ++ ++ error() error ++} ++ ++type incoming struct { ++ err error ++ accepted bool ++} ++ ++func (i *incoming) Reject(err error) { i.err = err } ++ ++func (i *incoming) error() error { ++ switch { ++ case i.err != nil: ++ return i.err ++ case !i.accepted: ++ return amqp.Errorf(amqp.NotAllowed, "remote open rejected") ++ default: ++ return nil ++ } ++} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org