PROTON-827: Moved go binding work to branch `go` Go binding removed from master, available on branch `go`.
master still has bindings/go/README.md with a pointer to the branch, so people following old discussions can find it. Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/132820cb Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/132820cb Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/132820cb Branch: refs/heads/cjansen-cpp-client Commit: 132820cb5910f0bbe3c9ac4f702b04b4eb0051d0 Parents: 460c787 Author: Alan Conway <[email protected]> Authored: Tue May 12 19:01:10 2015 -0400 Committer: Alan Conway <[email protected]> Committed: Tue May 12 19:01:10 2015 -0400 ---------------------------------------------------------------------- proton-c/bindings/go/README.md | 197 +---- proton-c/bindings/go/WARNING_EXPERIMENTAL | 1 - proton-c/bindings/go/src/Makefile | 16 - proton-c/bindings/go/src/genwrap.go | 423 ----------- .../go/src/qpid.apache.org/proton/doc.go | 40 - .../go/src/qpid.apache.org/proton/event/doc.go | 38 - .../qpid.apache.org/proton/event/handlers.go | 411 ----------- .../src/qpid.apache.org/proton/event/message.go | 75 -- .../go/src/qpid.apache.org/proton/event/pump.go | 357 --------- .../qpid.apache.org/proton/event/wrappers.go | 253 ------- .../proton/event/wrappers_gen.go | 732 ------------------- .../qpid.apache.org/proton/internal/error.go | 125 ---- .../src/qpid.apache.org/proton/interop_test.go | 308 -------- .../go/src/qpid.apache.org/proton/marshal.go | 238 ------ .../go/src/qpid.apache.org/proton/message.go | 342 --------- .../src/qpid.apache.org/proton/message_test.go | 90 --- .../src/qpid.apache.org/proton/messaging/doc.go | 28 - .../proton/messaging/example_test.go | 268 ------- .../qpid.apache.org/proton/messaging/handler.go | 70 -- .../proton/messaging/messaging.go | 250 ------- .../go/src/qpid.apache.org/proton/types.go | 193 ----- .../go/src/qpid.apache.org/proton/uid.go | 40 - .../go/src/qpid.apache.org/proton/unmarshal.go | 552 -------------- .../go/src/qpid.apache.org/proton/url.go | 96 --- .../go/src/qpid.apache.org/proton/url_test.go | 51 -- 25 files changed, 3 insertions(+), 5191 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/132820cb/proton-c/bindings/go/README.md ---------------------------------------------------------------------- diff --git a/proton-c/bindings/go/README.md b/proton-c/bindings/go/README.md index a70aa2c..9fd0a0f 100644 --- a/proton-c/bindings/go/README.md +++ b/proton-c/bindings/go/README.md @@ -1,196 +1,5 @@ # *EXPERIMENTAL* Go binding for proton -This is the beginning of a [Go](http://golang.org) binding for proton. - -This work is in early *experimental* stages, *everything* may change in future. -Comments and contributions are strongly encouraged, this experiment is public so -early feedback can guide development. - -- Email <[email protected]> -- Create issues <https://issues.apache.org/jira/browse/PROTON>, attach patches to an issue. - -## Goals - -The API should - -- be idiomatic, unsurprising, and easy to use for Go developers. -- support client and server development. -- make simple tasks simple. -- provide deep access to AMQP protocol when that is required. - -There are two types of developer we want to support - -1. Go developers using AMQP as a message transport: - - Straightforward conversions between Go built-in types and AMQP types. - - Easy message exchange via Go channels to support use in goroutines. - -2. AMQP-aware developers using Go as an implementation language: - - Go types to exactly represent all AMQP types and encoding details. - - Full access to detailed AMQP concepts: sessions, links, deliveries etc. - -## Status - -Package proton encodes and decodes AMQP messages and data as Go types. - -Sub-packages 'event' and 'messaging' provide two alternative ways to write -AMQP clients and servers. 'messaging' is easier for general purpose use. 'event' -gives complete low-level control of the underlying proton C engine. - -The event package is fairly complete, with the exception of the proton -reactor. It's unclear if the reactor is important for go. - -The messaging package is just starting. The examples work but anything else might not. - -There are working [examples](../../../examples/go) of a broker, sender and receiver. - -## The event driven API - -See the package documentation for details. - -## The Go API - -The goal: A procedural API that allows any user goroutine to send and receive -AMQP messages and other information (acknowledgments, flow control instructions -etc.) using channels. There will be no user-visible locks and no need to run -user code in special goroutines, e.g. as handlers in a proton event loop. - -See the package documentation for emerging details. - -Currently using a channel to receive messages, a function to send them (channels -internally) and a channel as a "future" for acknowledgements to senders. This -may change. - -## Design Questions - - -1. Error reporting and handling, esp. async. errors: - -What are common patterns for handling errors across channels? I.e. the thing at -one end of the channel blows up, how do you tell the other end? - -readers: you can close the channel, but there's no error info. You could pass a -struct { data, error } or use a second channel. Pros & cons? - -writers: you can't close without a panic so you need a second channel. Is this -a normal pattern: - - select { - data -> sendChan: sentit() - err := <- errChan: oops(err) - } - -2. Use of channels: - -I recently saw an interesting Go tip: "Make your API synchronous because in Go -it is simple to make a sync call async by putting it in a goroutine." - -What are the tradeoffs of exposing channels directly in the API vs. hiding them -behind methods? Exposing lets users select directly, less overhead than starting -a goroutine, creating MORE channels and selecting those. Hiding lets us wrap -patterns like the 'select {data, err}' pattern above, which is easier and less -error prone than asking users to do it themselves. - -The standard net.Conn uses blocking methods, not channels. I did as the tip says -and wrapped them in goroutines and channels. The library does expose *read* -channels e.g. time.After. Are there any *write* channels in the standard -library? I note that time.After has no errors, and suspect that may be a key -factor in the descison. - -3. The "future" pattern for acknowledgements: super easy in Go but depends on 1. and 2. above. - -## Why a separate API for Go? - -Go is a concurrent language and encourages applications to be divided into -concurrent *goroutines*. It provides traditional locking but it encourages the -use *channels* to communicate between goroutines without explicit locks: - - "Share memory by communicating, don't communicate by sharing memory" - -The idea is that a given value is only operated on by one goroutine at a time, -but values can easily be passed from one goroutine to another. This removes much -of the need for locking. - -Go literature distinguishes between: - -- *concurrency*: "keeping track of things that could be done in parallel" -- *parallelism*: "actually doing things in parallel" - -The application expresses concurrency by starting goroutines for potentially -concurrent tasks. The Go run-times schedule the activity of goroutines onto a -small number (possibly one) of actual parallel executions. - -Even with *no* parallelism, concurrency lets the Go run-times *order* work with -respect to events like file descriptors being readable/writable, channels having -data, timers firing etc. Go automatically takes care of switching out goroutines -that block or sleep so it is normal to write code in terms of blocking calls. - -Event-driven API (like poll, epoll, select or the proton event API) also -channel unpredictably ordered events to actions in one or a small pool of -execution threads. However this requires a different style of programming: -"event-driven" or "reactive" programming. Go developers call it "inside-out" -programming. In an event-driven architecture blocking is a big problem as it -consumes a scarce thread of execution, so actions that take time to complete -have to be re-structured in terms of future event delivery. - -The promise of Go is that you can express your application in concurrent, -procedural terms with simple blocking calls and the Go run-times will turn it -inside-out for you. Write as many goroutines as you want, and let Go interleave -and schedule them efficiently. - -For example: the Go equivalent of listening for connections is a goroutine with -a simple endless loop that calls a blocking Listen() function and starts a -goroutine for each new connection. Each connection has its own goroutine that -deals with just that connection till it closes. - -The benefit is that the variables and logic live closer together. Once you're in -a goroutine, you have everything you need in local variables, and they are -preserved across blocking calls. There's no need to store details in context -objects that you have to look up when handling a later event to figure out how -to continue where you left off. - -So a Go-like proton API does not force the users code to run in an event-loop -goroutine. Instead user goroutines communicate with the event loop(s) via -channels. There's no need to funnel connections into one event loop, in fact it -makes no sense. Connections can be processed concurrently so they should be -processed in separate goroutines and left to Go to schedule. User goroutines can -have simple loops that block channels till messages are available, the user can -start as many or as few such goroutines as they wish to implement concurrency as -simple or as complex as they wish. For example blocking request-response -vs. asynchronous flows of messages and acknowledgments. - - -## Layout - -This directory is a [Go work-space](http://golang.org/doc/code.html), it is not -yet connected to the rest of the proton build. - -To experiment, install proton in a standard place or set these environment -variables: `PATH`, `C_INCLUDE_PATH`, `LIBRARY_PATH` and `LD_LIBRARY_PATH`. - -Add this directory to `GOPATH` for the Go tools. - -To see the docs as text: - - godoc apache.org/proton - -To see them in your browser run this in the background and open -http://localhost:6060 in your browser: - - godoc -http=:6060 -index=true& - -Click "Packages" and "proton" to see the proton docs. It takes a minute or two -to generate the index so search may not work immediately. - -To run the unit tests: - - go test -a apache.org/proton - -## New to Go? - -If you are new to Go then these are a good place to start: - -- [A Tour of Go](http://tour.golang.org) -- [Effective Go](http://golang.org/doc/effective_go.html) - -Then look at the tools and library docs at <http://golang.org> as you need them. - +Experimental work on the Go language binding has been moved to the `go` branch +until it is ready for use. You can `git checkout go` on your git clone, or +browse at https://github.com/apache/qpid-proton/blob/go/proton-c/bindings/go/README.md http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/132820cb/proton-c/bindings/go/WARNING_EXPERIMENTAL ---------------------------------------------------------------------- diff --git a/proton-c/bindings/go/WARNING_EXPERIMENTAL b/proton-c/bindings/go/WARNING_EXPERIMENTAL deleted file mode 100644 index 96dc92f..0000000 --- a/proton-c/bindings/go/WARNING_EXPERIMENTAL +++ /dev/null @@ -1 +0,0 @@ -See README.md http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/132820cb/proton-c/bindings/go/src/Makefile ---------------------------------------------------------------------- diff --git a/proton-c/bindings/go/src/Makefile b/proton-c/bindings/go/src/Makefile deleted file mode 100644 index 98baa4c..0000000 --- a/proton-c/bindings/go/src/Makefile +++ /dev/null @@ -1,16 +0,0 @@ -# FIXME aconway 2015-04-09: integrate with cmake - -#GOFLAGS=-compiler gccgo -gccgoflags "-g -O0" -#GOFLAGS=-gcflags "-N -l" - -GENERATED=qpid.apache.org/proton/event/wrappers_gen.go - -test: $(GENERATED) - go test -v $(TESTFLAGS) $(GOFLAGS) qpid.apache.org/proton - go test -v $(TESTFLAGS) $(GOFLAGS) qpid.apache.org/proton/event - go test -v $(TESTFLAGS) $(GOFLAGS) qpid.apache.org/proton/messaging - -$(GENERATED): genwrap.go ../../../include/proton/*.h - go run genwrap.go - - http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/132820cb/proton-c/bindings/go/src/genwrap.go ---------------------------------------------------------------------- diff --git a/proton-c/bindings/go/src/genwrap.go b/proton-c/bindings/go/src/genwrap.go deleted file mode 100644 index e34c045..0000000 --- a/proton-c/bindings/go/src/genwrap.go +++ /dev/null @@ -1,423 +0,0 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. -*/ - -// Code generator to generate a think Go wrapper API around the C proton API. -// - -package main - -import ( - "fmt" - "io" - "io/ioutil" - "os" - "os/exec" - "path" - "regexp" - "strings" - "text/template" -) - -func mixedCase(s string) string { - result := "" - for _, w := range strings.Split(s, "_") { - if w != "" { - result = result + strings.ToUpper(w[0:1]) + strings.ToLower(w[1:]) - } - } - return result -} - -func mixedCaseTrim(s, prefix string) string { - return mixedCase(strings.TrimPrefix(s, prefix)) -} - -var templateFuncs = template.FuncMap{"mixedCase": mixedCase, "mixedCaseTrim": mixedCaseTrim} - -func doTemplate(out io.Writer, data interface{}, tmpl string) { - panicIf(template.Must(template.New("").Funcs(templateFuncs).Parse(tmpl)).Execute(out, data)) -} - -type enumType struct { - Name string - Values []string -} - -// Find enums in a header file return map of enum name to values. -func findEnums(header string) (enums []enumType) { - for _, enum := range enumDefRe.FindAllStringSubmatch(header, -1) { - enums = append(enums, enumType{enum[2], enumValRe.FindAllString(enum[1], -1)}) - } - return enums -} - -func genEnum(out io.Writer, name string, values []string) { - doTemplate(out, []interface{}{name, values}, `{{$enumName := index . 0}}{{$values := index . 1}} -type {{mixedCase $enumName}} C.pn_{{$enumName}}_t -const ({{range $values}} - {{mixedCase .}} {{mixedCase $enumName}} = C.{{.}} {{end}} -) - -func (e {{mixedCase $enumName}}) String() string { - switch e { -{{range $values}} - case C.{{.}}: return "{{mixedCaseTrim . "PN_"}}" {{end}} - } - return "unknown" -} -`) -} - -var ( - reSpace = regexp.MustCompile("\\s+") -) - -func panicIf(err error) { - if err != nil { - panic(err) - } -} - -func readHeader(name string) string { - file, err := os.Open(path.Join("..", "..", "..", "include", "proton", name+".h")) - panicIf(err) - defer file.Close() - s, err := ioutil.ReadAll(file) - panicIf(err) - return string(s) -} - -var copyright string = `/* -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. -*/ - -// -// NOTE: This file was generated by genwrap.go, do not edit it by hand. -// -` - -type eventType struct { - // C, function and interface names for the event - Name, Cname, Fname, Iname string -} - -func newEventType(cName string) eventType { - var etype eventType - etype.Cname = cName - etype.Name = mixedCaseTrim(cName, "PN_") - etype.Fname = "On" + etype.Name - etype.Iname = etype.Fname + "Interface" - return etype -} - -var ( - enumDefRe = regexp.MustCompile("typedef enum {([^}]*)} pn_([a-z_]+)_t;") - enumValRe = regexp.MustCompile("PN_[A-Z_]+") - skipEventRe = regexp.MustCompile("EVENT_NONE|REACTOR|SELECTABLE|TIMER") - skipFnRe = regexp.MustCompile("attach|context|class|collect|^recv$|^send$|transport") -) - -// Generate event wrappers. -func event(out io.Writer) { - event_h := readHeader("event") - - // Event is implented by hand in wrappers.go - - // Get all the pn_event_type_t enum values - var etypes []eventType - enums := findEnums(event_h) - for _, e := range enums[0].Values { - if skipEventRe.FindStringSubmatch(e) == nil { - etypes = append(etypes, newEventType(e)) - } - } - - doTemplate(out, etypes, ` -type EventType int -const ({{range .}} - E{{.Name}} EventType = C.{{.Cname}}{{end}} -) -`) - - doTemplate(out, etypes, ` -func (e EventType) String() string { - switch e { -{{range .}} - case C.{{.Cname}}: return "{{.Name}}"{{end}} - } - return "Unknown" -} -`) -} - -type genType struct { - Ctype, Gotype string - ToGo func(value string) string - ToC func(value string) string - Assign func(value string) string -} - -func (g genType) printBody(out io.Writer, value string) { - if g.Gotype != "" { - fmt.Fprintf(out, "return %s", g.ToGo(value)) - } else { - fmt.Fprintf(out, "%s", value) - } -} - -func (g genType) goLiteral(value string) string { - return fmt.Sprintf("%s{%s}", g.Gotype, value) -} - -func (g genType) goConvert(value string) string { - switch g.Gotype { - case "string": - return fmt.Sprintf("C.GoString(%s)", value) - case "Event": - return fmt.Sprintf("makeEvent(%s)", value) - default: - return fmt.Sprintf("%s(%s)", g.Gotype, value) - } -} - -var notStruct = map[string]bool{ - "EventType": true, - "SndSettleMode": true, - "RcvSettleMode": true, - "TerminusType": true, - "State": true, - "Durability": true, - "ExpiryPolicy": true, - "DistributionMode": true, -} - -func mapType(ctype string) (g genType) { - g.Ctype = "C." + strings.Trim(ctype, " \n") - - switch g.Ctype { - case "C.void": - g.Gotype = "" - case "C.size_t": - g.Gotype = "uint" - case "C.int": - g.Gotype = "int" - case "C.void *": - g.Gotype = "unsafe.Pointer" - g.Ctype = "unsafe.Pointer" - case "C.bool": - g.Gotype = "bool" - case "C.ssize_t": - g.Gotype = "int" - case "C.uint64_t": - g.Gotype = "uint64" - case "C.uint32_t": - g.Gotype = "uint16" - case "C.uint16_t": - g.Gotype = "uint32" - case "C.const char *": - fallthrough - case "C.char *": - g.Gotype = "string" - g.Ctype = "C.CString" - g.ToC = func(v string) string { return fmt.Sprintf("%sC", v) } - g.Assign = func(v string) string { - return fmt.Sprintf("%sC := C.CString(%s)\n defer C.free(unsafe.Pointer(%sC))\n", v, v, v) - } - case "C.pn_seconds_t": - g.Gotype = "time.Duration" - g.ToGo = func(v string) string { return fmt.Sprintf("(time.Duration(%s) * time.Second)", v) } - case "C.pn_error_t *": - g.Gotype = "error" - g.ToGo = func(v string) string { return fmt.Sprintf("internal.PnError(unsafe.Pointer(%s))", v) } - default: - pnId := regexp.MustCompile(" *pn_([a-z_]+)_t *\\*? *") - match := pnId.FindStringSubmatch(g.Ctype) - if match == nil { - panic(fmt.Errorf("unknown C type %#v", g.Ctype)) - } - g.Gotype = mixedCase(match[1]) - if !notStruct[g.Gotype] { - g.ToGo = g.goLiteral - g.ToC = func(v string) string { return v + ".pn" } - } - } - if g.ToGo == nil { - g.ToGo = g.goConvert // Use conversion by default. - } - if g.ToC == nil { - g.ToC = func(v string) string { return fmt.Sprintf("%s(%s)", g.Ctype, v) } - } - return -} - -type genArg struct { - Name string - genType -} - -var typeNameRe = regexp.MustCompile("^(.*( |\\*))([^ *]+)$") - -func splitArgs(argstr string) []genArg { - argstr = strings.Trim(argstr, " \n") - if argstr == "" { - return []genArg{} - } - args := make([]genArg, 0) - for _, item := range strings.Split(argstr, ",") { - item = strings.Trim(item, " \n") - typeName := typeNameRe.FindStringSubmatch(item) - if typeName == nil { - panic(fmt.Errorf("Can't split argument type/name %#v", item)) - } - cType := strings.Trim(typeName[1], " \n") - name := strings.Trim(typeName[3], " \n") - if name == "type" { - name = "type_" - } - args = append(args, genArg{name, mapType(cType)}) - } - return args -} - -func goArgs(args []genArg) string { - l := "" - for i, arg := range args { - if i != 0 { - l += ", " - } - l += arg.Name + " " + arg.Gotype - } - return l -} - -func cArgs(args []genArg) string { - l := "" - for _, arg := range args { - l += fmt.Sprintf(", %s", arg.ToC(arg.Name)) - } - return l -} - -func cAssigns(args []genArg) string { - l := "\n" - for _, arg := range args { - if arg.Assign != nil { - l += fmt.Sprintf("%s\n", arg.Assign(arg.Name)) - } - } - return l -} - -// Return the go name of the function or "" to skip the function. -func goFnName(api, fname string) string { - // Skip class, context and attachment functions. - if skipFnRe.FindStringSubmatch(fname) != nil { - return "" - } - switch api + "." + fname { - case "link.get_drain": - return "IsDrain" - default: - return mixedCaseTrim(fname, "get_") - } -} - -func apiWrapFns(api, header string, out io.Writer) { - fmt.Fprintf(out, "type %s struct{pn *C.pn_%s_t}\n", mixedCase(api), api) - fmt.Fprintf(out, "func (%c %s) IsNil() bool { return %c.pn == nil }\n", api[0], mixedCase(api), api[0]) - fn := regexp.MustCompile(fmt.Sprintf(`PN_EXTERN ([a-z0-9_ ]+ *\*?) *pn_%s_([a-z_]+)\(pn_%s_t *\*[a-z_]+ *,? *([^)]*)\)`, api, api)) - for _, m := range fn.FindAllStringSubmatch(header, -1) { - rtype, fname, argstr := mapType(m[1]), m[2], m[3] - gname := goFnName(api, fname) - if gname == "" { // Skip - continue - } - args := splitArgs(argstr) - fmt.Fprintf(out, "func (%c %s) %s", api[0], mixedCase(api), gname) - fmt.Fprintf(out, "(%s) %s { ", goArgs(args), rtype.Gotype) - fmt.Fprint(out, cAssigns(args)) - rtype.printBody(out, fmt.Sprintf("C.pn_%s_%s(%c.pn%s)", api, fname, api[0], cArgs(args))) - fmt.Fprintf(out, "}\n") - } -} - -func main() { - outpath := path.Join("qpid.apache.org", "proton", "event", "wrappers_gen.go") - out, err := os.Create(outpath) - panicIf(err) - defer out.Close() - - apis := []string{"session", "link", "delivery", "disposition", "condition", "terminus", "connection"} - fmt.Fprintln(out, copyright) - fmt.Fprint(out, ` -package event - -import ( - "time" - "unsafe" - "qpid.apache.org/proton/internal" -) - -// #include <proton/types.h> -// #include <proton/event.h> -// #include <stdlib.h> -`) - for _, api := range apis { - fmt.Fprintf(out, "// #include <proton/%s.h>\n", api) - } - fmt.Fprintln(out, `import "C"`) - - event(out) - - for _, api := range apis { - fmt.Fprintf(out, "// Wrappers for declarations in %s.h\n\n", api) - header := readHeader(api) - enums := findEnums(header) - for _, e := range enums { - genEnum(out, e.Name, e.Values) - } - apiWrapFns(api, header, out) - } - out.Close() - - // Run gofmt. - cmd := exec.Command("gofmt", "-w", outpath) - cmd.Stdout = os.Stdout - cmd.Stderr = os.Stderr - err = cmd.Run() - if err != nil { - fmt.Fprintf(os.Stderr, "gofmt: %s", err) - os.Exit(1) - } -} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/132820cb/proton-c/bindings/go/src/qpid.apache.org/proton/doc.go ---------------------------------------------------------------------- diff --git a/proton-c/bindings/go/src/qpid.apache.org/proton/doc.go b/proton-c/bindings/go/src/qpid.apache.org/proton/doc.go deleted file mode 100644 index 38c2d00..0000000 --- a/proton-c/bindings/go/src/qpid.apache.org/proton/doc.go +++ /dev/null @@ -1,40 +0,0 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. -*/ - -/* -Package proton encodes and decodes AMQP messages and data as Go types. - -It follows the standard 'encoding' libraries pattern. The mapping between AMQP -and Go types is described in the documentation of the Marshal and Unmarshal -functions. - -The sub-packages 'event' and 'messaging' provide two alternative ways to write -AMQP clients and servers. 'messaging' is easier for general purpose use. 'event' -gives complete low-level control of the underlying proton C engine. - -AMQP is an open standard for inter-operable message exchange, see <http://www.amqp.org/> -*/ -package proton - -// #cgo LDFLAGS: -lqpid-proton -import "C" - -// This file is just for the package comment. - -// FIXME aconway 2015-04-28: need to re-organize the package, it's not very intuitive. http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/132820cb/proton-c/bindings/go/src/qpid.apache.org/proton/event/doc.go ---------------------------------------------------------------------- diff --git a/proton-c/bindings/go/src/qpid.apache.org/proton/event/doc.go b/proton-c/bindings/go/src/qpid.apache.org/proton/event/doc.go deleted file mode 100644 index a0d45d7..0000000 --- a/proton-c/bindings/go/src/qpid.apache.org/proton/event/doc.go +++ /dev/null @@ -1,38 +0,0 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. -*/ - -/* -Package event provides a low-level API to the proton AMQP engine. - -For most tasks, consider instead package qpid.apache.org/proton/messaging. -It provides a higher-level, concurrent API that is easier to use. - -The API is event based. There are two alternative styles of handler. EventHandler -provides the core proton events. MessagingHandler provides a slighly simplified -view of the event stream and automates some common tasks. - -See type Pump documentation for more details of the interaction between proton -events and goroutines. -*/ -package event - -// #cgo LDFLAGS: -lqpid-proton -import "C" - -// This file is just for the package comment. http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/132820cb/proton-c/bindings/go/src/qpid.apache.org/proton/event/handlers.go ---------------------------------------------------------------------- diff --git a/proton-c/bindings/go/src/qpid.apache.org/proton/event/handlers.go b/proton-c/bindings/go/src/qpid.apache.org/proton/event/handlers.go deleted file mode 100644 index 5fc679a..0000000 --- a/proton-c/bindings/go/src/qpid.apache.org/proton/event/handlers.go +++ /dev/null @@ -1,411 +0,0 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. -*/ - -package event - -// #include <proton/reactor.h> -// #include <proton/handlers.h> -import "C" - -import ( - "qpid.apache.org/proton/internal" -) - -// EventHandler handles core proton events. -type EventHandler interface { - // HandleEvent is called with an event. - // Typically HandleEvent() is implemented as a switch on e.Type() - HandleEvent(e Event) error -} - -// cHandler wraps a C pn_handler_t -type cHandler struct { - pn *C.pn_handler_t -} - -func (h cHandler) HandleEvent(e Event) error { - C.pn_handler_dispatch(h.pn, e.pn, C.pn_event_type(e.pn)) - return nil // FIXME aconway 2015-03-31: error handling -} - -// MessagingHandler provides an alternative interface to EventHandler. -// it is easier to use for most applications that send and receive messages. -// -// Implement this interface and then wrap your value with a MessagingHandlerDelegator. -// MessagingHandlerDelegator implements EventHandler and can be registered with a Pump. -// -type MessagingHandler interface { - HandleMessagingEvent(MessagingEventType, Event) error -} - -// MessagingEventType provides a set of events that are easier to work with than the -// core events defined by EventType -// -// There are 3 types of "endpoint": Connection, Session and Link. -// For each endpoint there are 5 event types: Opening, Opened, Closing, Closed and Error. -// The meaning of these events is as follows: -// -// Opening: The remote end opened, the local end will open automatically. -// -// Opened: Both ends are open, regardless of which end opened first. -// -// Closing: The remote end closed without error, the local end will close automatically. -// -// Error: The remote end closed with an error, the local end will close automatically. -// -// Closed: Both ends are closed, regardless of which end closed first or if there was an error. -// -type MessagingEventType int - -const ( - // The event loop starts. - MStart MessagingEventType = iota - // The peer closes the connection with an error condition. - MConnectionError - // The peer closes the session with an error condition. - MSessionError - // The peer closes the link with an error condition. - MLinkError - // The peer Initiates the opening of the connection. - MConnectionOpening - // The peer initiates the opening of the session. - MSessionOpening - // The peer initiates the opening of the link. - MLinkOpening - // The connection is opened. - MConnectionOpened - // The session is opened. - MSessionOpened - // The link is opened. - MLinkOpened - // The peer initiates the closing of the connection. - MConnectionClosing - // The peer initiates the closing of the session. - MSessionClosing - // The peer initiates the closing of the link. - MLinkClosing - // Both ends of the connection are closed. - MConnectionClosed - // Both ends of the session are closed. - MSessionClosed - // Both ends of the link are closed. - MLinkClosed - // The connection is disconnected. - MConnectionDisconnected - // The session's connection was disconnected - MSessionDisconnected - // The session's connection was disconnected - MLinkDisconnected - // The sender link has credit and messages can - // therefore be transferred. - MSendable - // The remote peer accepts an outgoing message. - MAccepted - // The remote peer rejects an outgoing message. - MRejected - // The peer releases an outgoing message. Note that this may be in response to - // either the RELEASE or MODIFIED state as defined by the AMQP specification. - MReleased - // The peer has settled the outgoing message. This is the point at which it - // shouod never be retransmitted. - MSettled - // A message is received. Call proton.EventMessage(Event) to get the message. - // To manage the outcome of this messages (e.g. to accept or reject the message) - // use Event.Delivery(). - MMessage - // The event loop terminates, there are no more events to process. - MFinal -) - -func (t MessagingEventType) String() string { - switch t { - case MStart: - return "Start" - case MConnectionError: - return "ConnectionError" - case MSessionError: - return "SessionError" - case MLinkError: - return "LinkError" - case MConnectionOpening: - return "ConnectionOpening" - case MSessionOpening: - return "SessionOpening" - case MLinkOpening: - return "LinkOpening" - case MConnectionOpened: - return "ConnectionOpened" - case MSessionOpened: - return "SessionOpened" - case MLinkOpened: - return "LinkOpened" - case MConnectionClosing: - return "ConnectionClosing" - case MSessionClosing: - return "SessionClosing" - case MLinkClosing: - return "LinkClosing" - case MConnectionClosed: - return "ConnectionClosed" - case MSessionClosed: - return "SessionClosed" - case MLinkClosed: - return "LinkClosed" - case MConnectionDisconnected: - return "ConnectionDisconnected" - case MSessionDisconnected: - return "MSessionDisconnected" - case MLinkDisconnected: - return "MLinkDisconnected" - case MSendable: - return "Sendable" - case MAccepted: - return "Accepted" - case MRejected: - return "Rejected" - case MReleased: - return "Released" - case MSettled: - return "Settled" - case MMessage: - return "Message" - default: - return "Unknown" - } -} - -// ResourceHandler provides a simple way to track the creation and deletion of -// various proton objects. -// endpointDelegator captures common patterns for endpoints opening/closing -type endpointDelegator struct { - remoteOpen, remoteClose, localOpen, localClose EventType - opening, opened, closing, closed, error MessagingEventType - endpoint func(Event) Endpoint - delegate MessagingHandler -} - -// HandleEvent handles an open/close event for an endpoint in a generic way. -func (d endpointDelegator) HandleEvent(e Event) (err error) { - endpoint := d.endpoint(e) - state := endpoint.State() - - switch e.Type() { - - case d.localOpen: - if state.Is(SRemoteActive) { - err = d.delegate.HandleMessagingEvent(d.opened, e) - } - - case d.remoteOpen: - switch { - case state.Is(SLocalActive): - err = d.delegate.HandleMessagingEvent(d.opened, e) - case state.Is(SLocalUninit): - err = d.delegate.HandleMessagingEvent(d.opening, e) - if err == nil { - endpoint.Open() - } - } - - case d.remoteClose: - var err1 error - if endpoint.RemoteCondition().IsSet() { // Closed with error - err1 = d.delegate.HandleMessagingEvent(d.error, e) - if err1 == nil { // Don't overwrite an application error. - err1 = endpoint.RemoteCondition().Error() - } - } else { - err1 = d.delegate.HandleMessagingEvent(d.closing, e) - } - if state.Is(SLocalClosed) { - err = d.delegate.HandleMessagingEvent(d.closed, e) - } else if state.Is(SLocalActive) { - endpoint.Close() - } - if err1 != nil { // Keep the first error. - err = err1 - } - - case d.localClose: - if state.Is(SRemoteClosed) { - err = d.delegate.HandleMessagingEvent(d.closed, e) - } - - default: - // We shouldn't be called with any other event type. - panic(internal.Errorf("internal error, not an open/close event: %s", e)) - } - - return err -} - -// MessagingDelegator implments a EventHandler and delegates to a MessagingHandler. -// You can modify the exported fields before you pass the MessagingDelegator to -// a Pump. -type MessagingDelegator struct { - delegate MessagingHandler - connection, session, link endpointDelegator - handshaker, flowcontroller EventHandler - - // AutoSettle (default true) automatically pre-settle outgoing messages. - AutoSettle bool - // AutoAccept (default true) automatically accept and settle incoming messages - // if they are not settled by the delegate. - AutoAccept bool - // Prefetch (default 10) initial credit to issue for incoming links. - Prefetch int - // PeerCloseIsError (default false) if true a close by the peer will be treated as an error. - PeerCloseError bool -} - -func NewMessagingDelegator(h MessagingHandler) EventHandler { - return &MessagingDelegator{ - delegate: h, - connection: endpointDelegator{ - EConnectionRemoteOpen, EConnectionRemoteClose, EConnectionLocalOpen, EConnectionLocalClose, - MConnectionOpening, MConnectionOpened, MConnectionClosing, MConnectionClosed, - MConnectionError, - func(e Event) Endpoint { return e.Connection() }, - h, - }, - session: endpointDelegator{ - ESessionRemoteOpen, ESessionRemoteClose, ESessionLocalOpen, ESessionLocalClose, - MSessionOpening, MSessionOpened, MSessionClosing, MSessionClosed, - MSessionError, - func(e Event) Endpoint { return e.Session() }, - h, - }, - link: endpointDelegator{ - ELinkRemoteOpen, ELinkRemoteClose, ELinkLocalOpen, ELinkLocalClose, - MLinkOpening, MLinkOpened, MLinkClosing, MLinkClosed, - MLinkError, - func(e Event) Endpoint { return e.Link() }, - h, - }, - flowcontroller: nil, - AutoSettle: true, - AutoAccept: true, - Prefetch: 10, - PeerCloseError: false, - } -} - -func handleIf(h EventHandler, e Event) error { - if h != nil { - return h.HandleEvent(e) - } - return nil -} - -// Handle a proton event by passing the corresponding MessagingEvent(s) to -// the MessagingHandler. -func (d *MessagingDelegator) HandleEvent(e Event) error { - handleIf(d.flowcontroller, e) // FIXME aconway 2015-03-31: error handling. - - switch e.Type() { - - case EConnectionInit: - d.flowcontroller = cHandler{C.pn_flowcontroller(C.int(d.Prefetch))} - d.delegate.HandleMessagingEvent(MStart, e) - - case EConnectionRemoteOpen, EConnectionRemoteClose, EConnectionLocalOpen, EConnectionLocalClose: - return d.connection.HandleEvent(e) - - case ESessionRemoteOpen, ESessionRemoteClose, ESessionLocalOpen, ESessionLocalClose: - return d.session.HandleEvent(e) - - case ELinkRemoteOpen, ELinkRemoteClose, ELinkLocalOpen, ELinkLocalClose: - return d.link.HandleEvent(e) - - case ELinkFlow: - if e.Link().IsSender() && e.Link().Credit() > 0 { - return d.delegate.HandleMessagingEvent(MSendable, e) - } - - case EDelivery: - if e.Delivery().Link().IsReceiver() { - d.incoming(e) - } else { - d.outgoing(e) - } - - case ETransportTailClosed: - c := e.Connection() - for l := c.LinkHead(SRemoteActive); !l.IsNil(); l = l.Next(SRemoteActive) { - e2 := e - e2.link = l - e2.session = l.Session() - d.delegate.HandleMessagingEvent(MLinkDisconnected, e2) - } - for s := c.SessionHead(SRemoteActive); !s.IsNil(); s = s.Next(SRemoteActive) { - e2 := e - e2.session = s - d.delegate.HandleMessagingEvent(MSessionDisconnected, e2) - } - d.delegate.HandleMessagingEvent(MConnectionDisconnected, e) - d.delegate.HandleMessagingEvent(MFinal, e) - } - return nil -} - -func (d *MessagingDelegator) incoming(e Event) (err error) { - delivery := e.Delivery() - if delivery.Readable() && !delivery.Partial() { - if e.Link().State().Is(SLocalClosed) { - e.Link().Advance() - if d.AutoAccept { - delivery.Release(false) - } - } else { - err = d.delegate.HandleMessagingEvent(MMessage, e) - e.Link().Advance() - if d.AutoAccept && !delivery.Settled() { - if err == nil { - delivery.Accept() - } else { - delivery.Reject() - } - } - } - } else if delivery.Updated() && delivery.Settled() { - err = d.delegate.HandleMessagingEvent(MSettled, e) - } - return -} - -func (d *MessagingDelegator) outgoing(e Event) (err error) { - delivery := e.Delivery() - if delivery.Updated() { - switch delivery.Remote().Type() { - case Accepted: - err = d.delegate.HandleMessagingEvent(MAccepted, e) - case Rejected: - err = d.delegate.HandleMessagingEvent(MRejected, e) - case Released, Modified: - err = d.delegate.HandleMessagingEvent(MReleased, e) - } - if err == nil && delivery.Settled() { - err = d.delegate.HandleMessagingEvent(MSettled, e) - } - if err == nil && d.AutoSettle { - delivery.Settle() - } - } - return -} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/132820cb/proton-c/bindings/go/src/qpid.apache.org/proton/event/message.go ---------------------------------------------------------------------- diff --git a/proton-c/bindings/go/src/qpid.apache.org/proton/event/message.go b/proton-c/bindings/go/src/qpid.apache.org/proton/event/message.go deleted file mode 100644 index bd7dddd..0000000 --- a/proton-c/bindings/go/src/qpid.apache.org/proton/event/message.go +++ /dev/null @@ -1,75 +0,0 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. -*/ - -package event - -// #include <proton/types.h> -// #include <proton/message.h> -// #include <proton/codec.h> -import "C" - -import ( - "qpid.apache.org/proton" - "qpid.apache.org/proton/internal" -) - -// DecodeMessage decodes the message containined in a delivery event. -func DecodeMessage(e Event) (m proton.Message, err error) { - defer internal.DoRecover(&err) - delivery := e.Delivery() - if !delivery.Readable() || delivery.Partial() { - return nil, internal.Errorf("attempting to get incomplete message") - } - data := make([]byte, delivery.Pending()) - result := delivery.Link().Recv(data) - if result != len(data) { - return nil, internal.Errorf("cannot receive message: %s", internal.PnErrorCode(result)) - } - return proton.DecodeMessage(data) -} - -// FIXME aconway 2015-04-08: proper handling of delivery tags. Tag counter per link. -var tags proton.UidCounter - -// Send sends a proton.Message over a Link. -// Returns a Delivery that can be use to determine the outcome of the message. -func (link Link) Send(m proton.Message) (Delivery, error) { - if !link.IsSender() { - return Delivery{}, internal.Errorf("attempt to send message on receiving link") - } - // FIXME aconway 2015-04-08: buffering, error handling - delivery := link.Delivery(tags.Next()) - bytes, err := m.Encode(nil) - if err != nil { - return Delivery{}, internal.Errorf("cannot send mesage %s", err) - } - result := link.SendBytes(bytes) - link.Advance() - if result != len(bytes) { - if result < 0 { - return delivery, internal.Errorf("send failed %v", internal.PnErrorCode(result)) - } else { - return delivery, internal.Errorf("send incomplete %v of %v", result, len(bytes)) - } - } - if link.RemoteSndSettleMode() == PnSndSettled { // FIXME aconway 2015-04-08: enum names - delivery.Settle() - } - return delivery, nil -} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/132820cb/proton-c/bindings/go/src/qpid.apache.org/proton/event/pump.go ---------------------------------------------------------------------- diff --git a/proton-c/bindings/go/src/qpid.apache.org/proton/event/pump.go b/proton-c/bindings/go/src/qpid.apache.org/proton/event/pump.go deleted file mode 100644 index c9c5ca3..0000000 --- a/proton-c/bindings/go/src/qpid.apache.org/proton/event/pump.go +++ /dev/null @@ -1,357 +0,0 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. -*/ - -package event - -// #include <proton/connection.h> -// #include <proton/transport.h> -// #include <proton/event.h> -// #include <proton/reactor.h> -// #include <proton/handlers.h> -// #include <proton/transport.h> -// #include <proton/session.h> -// #include <memory.h> -// #include <stdlib.h> -// -// PN_HANDLE(REMOTE_ADDR) -import "C" - -import ( - "fmt" - "io" - "net" - "qpid.apache.org/proton/internal" - "sync" - "unsafe" -) - -// bufferChan manages a pair of ping-pong buffers to pass bytes through a channel. -type bufferChan struct { - buffers chan []byte - buf1, buf2 []byte -} - -func newBufferChan(size int) *bufferChan { - return &bufferChan{make(chan []byte), make([]byte, size), make([]byte, size)} -} - -func (b *bufferChan) buffer() []byte { - b.buf1, b.buf2 = b.buf2, b.buf1 // Alternate buffers. - return b.buf1[:cap(b.buf1)] -} - -// FIXME aconway 2015-05-04: direct sending to Inject may block user goroutines if -// the pum stops. Make this a function that selects on running. - -// FIXME aconway 2015-05-05: for consistency should Pump be called Driver? - -/* -Pump reads from a net.Conn, decodes AMQP events and calls the appropriate -Handler functions. Actions taken by Handler functions (such as sending messages) -are encoded and written to the net.Conn. - -The proton protocol engine is single threaded (per connection). The Pump runs -proton in the goroutine that calls Pump.Run() and creates goroutines to feed -data to/from a net.Conn. You can create multiple Pumps to handle multiple -connections concurrently. - -Methods in this package can only be called in the goroutine that executes the -corresponding Pump.Run(). You implement the EventHandler or MessagingHandler -interfaces and provide those values to NewPump(). Their HandleEvent method will be -called in the Pump goroutine, in typical event-driven style. - -Handlers can pass values from an event (Connections, Links, Deliveries etc.) to -other goroutines, store them, or use them as map indexes. Effectively they are -just C pointers. Other goroutines cannot call their methods directly but they -can can create function closures that call their methods and send those closures -to the Pump.Inject channel. They will execute safely in the pump -goroutine. Injected functions, or your handlers, can set up channels to get -results back to other goroutines. - -You are responsible for ensuring you don't use an event value after the C object -has been deleted. The handler methods will tell you when a value is no longer -valid. For example after a MethodHandler handles a LinkClosed event, that link -is no longer valid. If you do Link.Close() yourself (in a handler or injected -function) the link remains valid until the corresponing LinkClosed event is -received by the handler. - -Pump.Close() will take care of cleaning up any remaining values and types when -you are done with the Pump. All values associated with a pump become invalid -when you call Pump.Close() - -The qpid.apache.org/proton/messaging package will do all this for you, so unless -you are doing something fairly low-level it is probably a better choice. - -*/ -type Pump struct { - // Error is set on exit from Run() if there was an error. - Error error - // Channel to inject functions to be executed in the Pump's proton event loop. - Inject chan func() - - conn net.Conn - transport *C.pn_transport_t - connection *C.pn_connection_t - collector *C.pn_collector_t - read *bufferChan // Read buffers channel. - write *bufferChan // Write buffers channel. - handlers []EventHandler // Handlers for proton events. - running chan struct{} // This channel will be closed when the goroutines are done. -} - -const bufferSize = 4096 - -var pumps map[*C.pn_connection_t]*Pump - -func init() { - pumps = make(map[*C.pn_connection_t]*Pump) -} - -// NewPump initializes a pump with a connection and handlers. To start it running: -// p := NewPump(...) -// go run p.Run() -// The goroutine will exit when the pump is closed or disconnected. -// You can check for errors on Pump.Error. -// -func NewPump(conn net.Conn, handlers ...EventHandler) (*Pump, error) { - // Save the connection ID for Connection.String() - p := &Pump{ - Inject: make(chan func(), 100), // FIXME aconway 2015-05-04: blocking hack - conn: conn, - transport: C.pn_transport(), - connection: C.pn_connection(), - collector: C.pn_collector(), - handlers: handlers, - read: newBufferChan(bufferSize), - write: newBufferChan(bufferSize), - running: make(chan struct{}), - } - if p.transport == nil || p.connection == nil || p.collector == nil { - return nil, internal.Errorf("failed to allocate pump") - } - pnErr := int(C.pn_transport_bind(p.transport, p.connection)) - if pnErr != 0 { - return nil, internal.Errorf("cannot setup pump: %s", internal.PnErrorCode(pnErr)) - } - C.pn_connection_collect(p.connection, p.collector) - C.pn_connection_open(p.connection) - pumps[p.connection] = p - return p, nil -} - -func (p *Pump) String() string { - return fmt.Sprintf("(%s-%s)", p.conn.LocalAddr(), p.conn.RemoteAddr()) -} - -func (p *Pump) Id() string { - return fmt.Sprintf("%p", &p) -} - -// setError sets error only if not already set -func (p *Pump) setError(e error) { - if p.Error == nil { - p.Error = e - } -} - -// Server puts the Pump in server mode, meaning it will auto-detect security settings on -// the incoming connnection such as use of SASL and SSL. -// Must be called before Run() -// -func (p *Pump) Server() { - C.pn_transport_set_server(p.transport) -} - -func (p *Pump) free() { - if p.connection != nil { - C.pn_connection_free(p.connection) - } - if p.transport != nil { - C.pn_transport_free(p.transport) - } - if p.collector != nil { - C.pn_collector_free(p.collector) - } - for _, h := range p.handlers { - switch h := h.(type) { - case cHandler: - C.pn_handler_free(h.pn) - } - } -} - -// Close closes the AMQP connection, the net.Conn, and stops associated goroutines. -// It will cause Run() to return. Run() may return earlier if the network disconnects -// but you must still call Close() to clean everything up. -// -// Methods on values associated with the pump (Connections, Sessions, Links) will panic -// if called after Close() -// -func (p *Pump) Close() error { - // If the pump is still running, inject a close. Either way wait for it to finish. - select { - case p.Inject <- func() { C.pn_connection_close(p.connection) }: - <-p.running // Wait to finish - case <-p.running: // Wait for goroutines to finish - } - delete(pumps, p.connection) - p.free() - return p.Error -} - -// Run the pump. Normally called in a goroutine as: go pump.Run() -// An error dunring Run is stored on p.Error. -// -func (p *Pump) Run() { - // Signal errors from the read/write goroutines. Don't block if we don't - // read all the errors, we only care about the first. - error := make(chan error, 2) - // FIXME aconway 2015-05-04: stop := make(chan struct{}) // Closed to signal that read/write should stop. - - wait := sync.WaitGroup{} - wait.Add(2) - - go func() { // Read goroutine - defer wait.Done() - for { - rbuf := p.read.buffer() - n, err := p.conn.Read(rbuf) - if n > 0 { - p.read.buffers <- rbuf[:n] - } else if err != nil { - close(p.read.buffers) - error <- err - return - } - } - }() - - go func() { // Write goroutine - defer wait.Done() - for { - wbuf, ok := <-p.write.buffers - if !ok { - return - } - _, err := p.conn.Write(wbuf) - if err != nil { - error <- err - return - } - } - }() - - wbuf := p.write.buffer()[:0] -loop: - for { - if len(wbuf) == 0 { - p.pop(&wbuf) - } - // Don't set wchan unless there is something to write. - var wchan chan []byte - if len(wbuf) > 0 { - wchan = p.write.buffers - } - - select { - case buf := <-p.read.buffers: // Read a buffer - p.push(buf) - case wchan <- wbuf: // Write a buffer - wbuf = p.write.buffer()[:0] - case f := <-p.Inject: // Function injected from another goroutine - f() - case err := <-error: // Read or write error - p.setError(err) - C.pn_transport_close_tail(p.transport) - C.pn_transport_close_head(p.transport) - } - if err := p.process(); err != nil { - p.setError(err) - break loop - } - } - close(p.write.buffers) - p.conn.Close() - wait.Wait() - close(p.running) // Signal goroutines have exited and Error is set. -} - -func minInt(a, b int) int { - if a < b { - return a - } else { - return b - } -} - -func (p *Pump) pop(buf *[]byte) { - pending := int(C.pn_transport_pending(p.transport)) - switch { - case pending == int(C.PN_EOS): - *buf = (*buf)[:] - return - case pending < 0: - panic(internal.Errorf("%s", internal.PnErrorCode(pending))) - } - size := minInt(pending, cap(*buf)) - *buf = (*buf)[:size] - if size == 0 { - return - } - C.memcpy(unsafe.Pointer(&(*buf)[0]), unsafe.Pointer(C.pn_transport_head(p.transport)), C.size_t(size)) - C.pn_transport_pop(p.transport, C.size_t(size)) -} - -func (p *Pump) push(buf []byte) { - buf2 := buf - for len(buf2) > 0 { - n := int(C.pn_transport_push(p.transport, (*C.char)(unsafe.Pointer((&buf2[0]))), C.size_t(len(buf2)))) - if n <= 0 { - panic(internal.Errorf("error in transport: %s", internal.PnErrorCode(n))) - } - buf2 = buf2[n:] - } -} - -func (p *Pump) handle(e Event) error { - for _, h := range p.handlers { - if err := h.HandleEvent(e); err != nil { - return err - } - } - if e.Type() == ETransportClosed { - return io.EOF - } - return nil -} - -func (p *Pump) process() error { - // FIXME aconway 2015-05-04: if a Handler returns error we should stop the pump - for ce := C.pn_collector_peek(p.collector); ce != nil; ce = C.pn_collector_peek(p.collector) { - e := makeEvent(ce) - if err := p.handle(e); err != nil { - return err - } - C.pn_collector_pop(p.collector) - } - return nil -} - -// Connectoin gets the Pump's connection value. -func (p *Pump) Connection() Connection { return Connection{p.connection} } http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/132820cb/proton-c/bindings/go/src/qpid.apache.org/proton/event/wrappers.go ---------------------------------------------------------------------- diff --git a/proton-c/bindings/go/src/qpid.apache.org/proton/event/wrappers.go b/proton-c/bindings/go/src/qpid.apache.org/proton/event/wrappers.go deleted file mode 100644 index d2c4e43..0000000 --- a/proton-c/bindings/go/src/qpid.apache.org/proton/event/wrappers.go +++ /dev/null @@ -1,253 +0,0 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. -*/ - -package event - -//#include <proton/codec.h> -//#include <proton/connection.h> -//#include <proton/session.h> -//#include <proton/session.h> -//#include <proton/delivery.h> -//#include <proton/link.h> -//#include <proton/event.h> -//#include <proton/transport.h> -//#include <proton/link.h> -//#include <stdlib.h> -import "C" - -import ( - "fmt" - "qpid.apache.org/proton/internal" - "unsafe" -) - -// FIXME aconway 2015-05-05: Documentation for generated types. - -// Event is an AMQP protocol event. -type Event struct { - pn *C.pn_event_t - eventType EventType - connection Connection - session Session - link Link - delivery Delivery -} - -func makeEvent(pn *C.pn_event_t) Event { - return Event{ - pn: pn, - eventType: EventType(C.pn_event_type(pn)), - connection: Connection{C.pn_event_connection(pn)}, - session: Session{C.pn_event_session(pn)}, - link: Link{C.pn_event_link(pn)}, - delivery: Delivery{C.pn_event_delivery(pn)}, - } -} -func (e Event) IsNil() bool { return e.eventType == EventType(0) } -func (e Event) Type() EventType { return e.eventType } -func (e Event) Connection() Connection { return e.connection } -func (e Event) Session() Session { return e.session } -func (e Event) Link() Link { return e.link } -func (e Event) Delivery() Delivery { return e.delivery } -func (e Event) String() string { return e.Type().String() } - -// Data holds a pointer to decoded AMQP data. -// Use proton.marshal/unmarshal to access it as Go data types. -// -type Data struct{ pn *C.pn_data_t } - -func NewData(p unsafe.Pointer) Data { return Data{(*C.pn_data_t)(p)} } - -func (d Data) Free() { C.pn_data_free(d.pn) } -func (d Data) Pointer() unsafe.Pointer { return unsafe.Pointer(d.pn) } -func (d Data) Clear() { C.pn_data_clear(d.pn) } -func (d Data) Rewind() { C.pn_data_rewind(d.pn) } -func (d Data) Error() error { - return internal.PnError(unsafe.Pointer(C.pn_data_error(d.pn))) -} - -// State holds the state flags for an AMQP endpoint. -type State byte - -const ( - SLocalUninit State = C.PN_LOCAL_UNINIT - SLocalActive = C.PN_LOCAL_ACTIVE - SLocalClosed = C.PN_LOCAL_CLOSED - SRemoteUninit = C.PN_REMOTE_UNINIT - SRemoteActive = C.PN_REMOTE_ACTIVE - SRemoteClosed = C.PN_REMOTE_CLOSED -) - -// Is is True if bits & state is non 0. -func (s State) Is(bits State) bool { return s&bits != 0 } - -// Return a State containig just the local flags -func (s State) Local() State { return State(s & C.PN_LOCAL_MASK) } - -// Return a State containig just the remote flags -func (s State) Remote() State { return State(s & C.PN_REMOTE_MASK) } - -// Endpoint is the common interface for Connection, Link and Session. -type Endpoint interface { - // State is the open/closed state. - State() State - // Open an endpoint. - Open() - // Close an endpoint. - Close() - // Condition holds a local error condition. - Condition() Condition - // RemoteCondition holds a remote error condition. - RemoteCondition() Condition -} - -const ( - Received uint64 = C.PN_RECEIVED - Accepted = C.PN_ACCEPTED - Rejected = C.PN_REJECTED - Released = C.PN_RELEASED - Modified = C.PN_MODIFIED -) - -// SettleAs is equivalent to d.Update(disposition); d.Settle() -// It is a no-op if e does not have a delivery. -func (d Delivery) SettleAs(disposition uint64) { - d.Update(disposition) - d.Settle() -} - -// Accept accepts and settles a delivery. -func (d Delivery) Accept() { d.SettleAs(Accepted) } - -// Reject rejects and settles a delivery -func (d Delivery) Reject() { d.SettleAs(Rejected) } - -// Release releases and settles a delivery -// If delivered is true the delivery count for the message will be increased. -func (d Delivery) Release(delivered bool) { - if delivered { - d.SettleAs(Modified) - } else { - d.SettleAs(Released) - } -} - -// FIXME aconway 2015-05-05: don't expose DeliveryTag as a C pointer, just as a String? - -type DeliveryTag struct{ pn C.pn_delivery_tag_t } - -func (t DeliveryTag) String() string { return C.GoStringN(t.pn.start, C.int(t.pn.size)) } - -func (l Link) Recv(buf []byte) int { - if len(buf) == 0 { - return 0 - } - return int(C.pn_link_recv(l.pn, (*C.char)(unsafe.Pointer(&buf[0])), C.size_t(len(buf)))) -} - -func (l Link) SendBytes(bytes []byte) int { - return int(C.pn_link_send(l.pn, cPtr(bytes), cLen(bytes))) -} - -func pnTag(tag string) C.pn_delivery_tag_t { - bytes := []byte(tag) - return C.pn_dtag(cPtr(bytes), cLen(bytes)) -} - -func (l Link) Delivery(tag string) Delivery { - return Delivery{C.pn_delivery(l.pn, pnTag(tag))} -} - -func cPtr(b []byte) *C.char { - if len(b) == 0 { - return nil - } - return (*C.char)(unsafe.Pointer(&b[0])) -} - -func cLen(b []byte) C.size_t { - return C.size_t(len(b)) -} - -func (s Session) Sender(name string) Link { - cname := C.CString(name) - defer C.free(unsafe.Pointer(cname)) - return Link{C.pn_sender(s.pn, cname)} -} - -func (s Session) Receiver(name string) Link { - cname := C.CString(name) - defer C.free(unsafe.Pointer(cname)) - return Link{C.pn_receiver(s.pn, cname)} -} - -func joinId(a, b interface{}) string { - return fmt.Sprintf("%s/%s", a, b) -} - -// Pump associated with this connection. -func (c Connection) Pump() *Pump { return pumps[c.pn] } - -// Unique (per process) string identifier for a connection, useful for debugging. -func (c Connection) String() string { return pumps[c.pn].String() } - -// Head functions don't follow the normal naming conventions so missed by the generator. - -func (c Connection) LinkHead(s State) Link { - return Link{C.pn_link_head(c.pn, C.pn_state_t(s))} -} - -func (c Connection) SessionHead(s State) Session { - return Session{C.pn_session_head(c.pn, C.pn_state_t(s))} -} - -// Unique (per process) string identifier for a session, including connection identifier. -func (s Session) String() string { - return joinId(s.Connection(), fmt.Sprintf("%p", s.pn)) -} - -// Unique (per process) string identifier for a link, inlcuding session identifier. -func (l Link) String() string { - return joinId(l.Session(), l.Name()) -} - -// Error returns an error interface corresponding to Condition. -func (c Condition) Error() error { - if c.IsNil() { - return nil - } else { - return fmt.Errorf("%s: %s", c.Name(), c.Description()) - } -} - -// SetIfUnset sets name and description on a condition if it is not already set. -func (c Condition) SetIfUnset(name, description string) { - if !c.IsSet() { - c.SetName(name) - c.SetDescription(description) - } -} - -func (c Connection) Session() (Session, error) { - s := Session{C.pn_session(c.pn)} - if s.IsNil() { - return s, Connection(c).Error() - } - return s, nil -} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/132820cb/proton-c/bindings/go/src/qpid.apache.org/proton/event/wrappers_gen.go ---------------------------------------------------------------------- diff --git a/proton-c/bindings/go/src/qpid.apache.org/proton/event/wrappers_gen.go b/proton-c/bindings/go/src/qpid.apache.org/proton/event/wrappers_gen.go deleted file mode 100644 index f53e8bb..0000000 --- a/proton-c/bindings/go/src/qpid.apache.org/proton/event/wrappers_gen.go +++ /dev/null @@ -1,732 +0,0 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. -*/ - -// -// NOTE: This file was generated by genwrap.go, do not edit it by hand. -// - -package event - -import ( - "qpid.apache.org/proton/internal" - "time" - "unsafe" -) - -// #include <proton/types.h> -// #include <proton/event.h> -// #include <stdlib.h> -// #include <proton/session.h> -// #include <proton/link.h> -// #include <proton/delivery.h> -// #include <proton/disposition.h> -// #include <proton/condition.h> -// #include <proton/terminus.h> -// #include <proton/connection.h> -import "C" - -type EventType int - -const ( - EConnectionInit EventType = C.PN_CONNECTION_INIT - EConnectionBound EventType = C.PN_CONNECTION_BOUND - EConnectionUnbound EventType = C.PN_CONNECTION_UNBOUND - EConnectionLocalOpen EventType = C.PN_CONNECTION_LOCAL_OPEN - EConnectionRemoteOpen EventType = C.PN_CONNECTION_REMOTE_OPEN - EConnectionLocalClose EventType = C.PN_CONNECTION_LOCAL_CLOSE - EConnectionRemoteClose EventType = C.PN_CONNECTION_REMOTE_CLOSE - EConnectionFinal EventType = C.PN_CONNECTION_FINAL - ESessionInit EventType = C.PN_SESSION_INIT - ESessionLocalOpen EventType = C.PN_SESSION_LOCAL_OPEN - ESessionRemoteOpen EventType = C.PN_SESSION_REMOTE_OPEN - ESessionLocalClose EventType = C.PN_SESSION_LOCAL_CLOSE - ESessionRemoteClose EventType = C.PN_SESSION_REMOTE_CLOSE - ESessionFinal EventType = C.PN_SESSION_FINAL - ELinkInit EventType = C.PN_LINK_INIT - ELinkLocalOpen EventType = C.PN_LINK_LOCAL_OPEN - ELinkRemoteOpen EventType = C.PN_LINK_REMOTE_OPEN - ELinkLocalClose EventType = C.PN_LINK_LOCAL_CLOSE - ELinkRemoteClose EventType = C.PN_LINK_REMOTE_CLOSE - ELinkLocalDetach EventType = C.PN_LINK_LOCAL_DETACH - ELinkRemoteDetach EventType = C.PN_LINK_REMOTE_DETACH - ELinkFlow EventType = C.PN_LINK_FLOW - ELinkFinal EventType = C.PN_LINK_FINAL - EDelivery EventType = C.PN_DELIVERY - ETransport EventType = C.PN_TRANSPORT - ETransportAuthenticated EventType = C.PN_TRANSPORT_AUTHENTICATED - ETransportError EventType = C.PN_TRANSPORT_ERROR - ETransportHeadClosed EventType = C.PN_TRANSPORT_HEAD_CLOSED - ETransportTailClosed EventType = C.PN_TRANSPORT_TAIL_CLOSED - ETransportClosed EventType = C.PN_TRANSPORT_CLOSED -) - -func (e EventType) String() string { - switch e { - - case C.PN_CONNECTION_INIT: - return "ConnectionInit" - case C.PN_CONNECTION_BOUND: - return "ConnectionBound" - case C.PN_CONNECTION_UNBOUND: - return "ConnectionUnbound" - case C.PN_CONNECTION_LOCAL_OPEN: - return "ConnectionLocalOpen" - case C.PN_CONNECTION_REMOTE_OPEN: - return "ConnectionRemoteOpen" - case C.PN_CONNECTION_LOCAL_CLOSE: - return "ConnectionLocalClose" - case C.PN_CONNECTION_REMOTE_CLOSE: - return "ConnectionRemoteClose" - case C.PN_CONNECTION_FINAL: - return "ConnectionFinal" - case C.PN_SESSION_INIT: - return "SessionInit" - case C.PN_SESSION_LOCAL_OPEN: - return "SessionLocalOpen" - case C.PN_SESSION_REMOTE_OPEN: - return "SessionRemoteOpen" - case C.PN_SESSION_LOCAL_CLOSE: - return "SessionLocalClose" - case C.PN_SESSION_REMOTE_CLOSE: - return "SessionRemoteClose" - case C.PN_SESSION_FINAL: - return "SessionFinal" - case C.PN_LINK_INIT: - return "LinkInit" - case C.PN_LINK_LOCAL_OPEN: - return "LinkLocalOpen" - case C.PN_LINK_REMOTE_OPEN: - return "LinkRemoteOpen" - case C.PN_LINK_LOCAL_CLOSE: - return "LinkLocalClose" - case C.PN_LINK_REMOTE_CLOSE: - return "LinkRemoteClose" - case C.PN_LINK_LOCAL_DETACH: - return "LinkLocalDetach" - case C.PN_LINK_REMOTE_DETACH: - return "LinkRemoteDetach" - case C.PN_LINK_FLOW: - return "LinkFlow" - case C.PN_LINK_FINAL: - return "LinkFinal" - case C.PN_DELIVERY: - return "Delivery" - case C.PN_TRANSPORT: - return "Transport" - case C.PN_TRANSPORT_AUTHENTICATED: - return "TransportAuthenticated" - case C.PN_TRANSPORT_ERROR: - return "TransportError" - case C.PN_TRANSPORT_HEAD_CLOSED: - return "TransportHeadClosed" - case C.PN_TRANSPORT_TAIL_CLOSED: - return "TransportTailClosed" - case C.PN_TRANSPORT_CLOSED: - return "TransportClosed" - } - return "Unknown" -} - -// Wrappers for declarations in session.h - -type Session struct{ pn *C.pn_session_t } - -func (s Session) IsNil() bool { return s.pn == nil } -func (s Session) Free() { - C.pn_session_free(s.pn) -} -func (s Session) State() State { - return State(C.pn_session_state(s.pn)) -} -func (s Session) Error() error { - return internal.PnError(unsafe.Pointer(C.pn_session_error(s.pn))) -} -func (s Session) Condition() Condition { - return Condition{C.pn_session_condition(s.pn)} -} -func (s Session) RemoteCondition() Condition { - return Condition{C.pn_session_remote_condition(s.pn)} -} -func (s Session) Connection() Connection { - return Connection{C.pn_session_connection(s.pn)} -} -func (s Session) Open() { - C.pn_session_open(s.pn) -} -func (s Session) Close() { - C.pn_session_close(s.pn) -} -func (s Session) IncomingCapacity() uint { - return uint(C.pn_session_get_incoming_capacity(s.pn)) -} -func (s Session) SetIncomingCapacity(capacity uint) { - C.pn_session_set_incoming_capacity(s.pn, C.size_t(capacity)) -} -func (s Session) OutgoingBytes() uint { - return uint(C.pn_session_outgoing_bytes(s.pn)) -} -func (s Session) IncomingBytes() uint { - return uint(C.pn_session_incoming_bytes(s.pn)) -} -func (s Session) Next(state State) Session { - return Session{C.pn_session_next(s.pn, C.pn_state_t(state))} -} - -// Wrappers for declarations in link.h - -type SndSettleMode C.pn_snd_settle_mode_t - -const ( - PnSndUnsettled SndSettleMode = C.PN_SND_UNSETTLED - PnSndSettled SndSettleMode = C.PN_SND_SETTLED - PnSndMixed SndSettleMode = C.PN_SND_MIXED -) - -func (e SndSettleMode) String() string { - switch e { - - case C.PN_SND_UNSETTLED: - return "SndUnsettled" - case C.PN_SND_SETTLED: - return "SndSettled" - case C.PN_SND_MIXED: - return "SndMixed" - } - return "unknown" -} - -type RcvSettleMode C.pn_rcv_settle_mode_t - -const ( - PnRcvFirst RcvSettleMode = C.PN_RCV_FIRST - PnRcvSecond RcvSettleMode = C.PN_RCV_SECOND -) - -func (e RcvSettleMode) String() string { - switch e { - - case C.PN_RCV_FIRST: - return "RcvFirst" - case C.PN_RCV_SECOND: - return "RcvSecond" - } - return "unknown" -} - -type Link struct{ pn *C.pn_link_t } - -func (l Link) IsNil() bool { return l.pn == nil } -func (l Link) Free() { - C.pn_link_free(l.pn) -} -func (l Link) Name() string { - return C.GoString(C.pn_link_name(l.pn)) -} -func (l Link) IsSender() bool { - return bool(C.pn_link_is_sender(l.pn)) -} -func (l Link) IsReceiver() bool { - return bool(C.pn_link_is_receiver(l.pn)) -} -func (l Link) State() State { - return State(C.pn_link_state(l.pn)) -} -func (l Link) Error() error { - return internal.PnError(unsafe.Pointer(C.pn_link_error(l.pn))) -} -func (l Link) Condition() Condition { - return Condition{C.pn_link_condition(l.pn)} -} -func (l Link) RemoteCondition() Condition { - return Condition{C.pn_link_remote_condition(l.pn)} -} -func (l Link) Session() Session { - return Session{C.pn_link_session(l.pn)} -} -func (l Link) Next(state State) Link { - return Link{C.pn_link_next(l.pn, C.pn_state_t(state))} -} -func (l Link) Open() { - C.pn_link_open(l.pn) -} -func (l Link) Close() { - C.pn_link_close(l.pn) -} -func (l Link) Detach() { - C.pn_link_detach(l.pn) -} -func (l Link) Source() Terminus { - return Terminus{C.pn_link_source(l.pn)} -} -func (l Link) Target() Terminus { - return Terminus{C.pn_link_target(l.pn)} -} -func (l Link) RemoteSource() Terminus { - return Terminus{C.pn_link_remote_source(l.pn)} -} -func (l Link) RemoteTarget() Terminus { - return Terminus{C.pn_link_remote_target(l.pn)} -} -func (l Link) Current() Delivery { - return Delivery{C.pn_link_current(l.pn)} -} -func (l Link) Advance() bool { - return bool(C.pn_link_advance(l.pn)) -} -func (l Link) Credit() int { - return int(C.pn_link_credit(l.pn)) -} -func (l Link) Queued() int { - return int(C.pn_link_queued(l.pn)) -} -func (l Link) RemoteCredit() int { - return int(C.pn_link_remote_credit(l.pn)) -} -func (l Link) IsDrain() bool { - return bool(C.pn_link_get_drain(l.pn)) -} -func (l Link) Drained() int { - return int(C.pn_link_drained(l.pn)) -} -func (l Link) Available() int { - return int(C.pn_link_available(l.pn)) -} -func (l Link) SndSettleMode() SndSettleMode { - return SndSettleMode(C.pn_link_snd_settle_mode(l.pn)) -} -func (l Link) RcvSettleMode() RcvSettleMode { - return RcvSettleMode(C.pn_link_rcv_settle_mode(l.pn)) -} -func (l Link) SetSndSettleMode(mode SndSettleMode) { - C.pn_link_set_snd_settle_mode(l.pn, C.pn_snd_settle_mode_t(mode)) -} -func (l Link) SetRcvSettleMode(mode RcvSettleMode) { - C.pn_link_set_rcv_settle_mode(l.pn, C.pn_rcv_settle_mode_t(mode)) -} -func (l Link) RemoteSndSettleMode() SndSettleMode { - return SndSettleMode(C.pn_link_remote_snd_settle_mode(l.pn)) -} -func (l Link) RemoteRcvSettleMode() RcvSettleMode { - return RcvSettleMode(C.pn_link_remote_rcv_settle_mode(l.pn)) -} -func (l Link) Unsettled() int { - return int(C.pn_link_unsettled(l.pn)) -} -func (l Link) Offered(credit int) { - C.pn_link_offered(l.pn, C.int(credit)) -} -func (l Link) Flow(credit int) { - C.pn_link_flow(l.pn, C.int(credit)) -} -func (l Link) Drain(credit int) { - C.pn_link_drain(l.pn, C.int(credit)) -} -func (l Link) SetDrain(drain bool) { - C.pn_link_set_drain(l.pn, C.bool(drain)) -} -func (l Link) Draining() bool { - return bool(C.pn_link_draining(l.pn)) -} - -// Wrappers for declarations in delivery.h - -type Delivery struct{ pn *C.pn_delivery_t } - -func (d Delivery) IsNil() bool { return d.pn == nil } -func (d Delivery) Tag() DeliveryTag { - return DeliveryTag{C.pn_delivery_tag(d.pn)} -} -func (d Delivery) Link() Link { - return Link{C.pn_delivery_link(d.pn)} -} -func (d Delivery) Local() Disposition { - return Disposition{C.pn_delivery_local(d.pn)} -} -func (d Delivery) LocalState() uint64 { - return uint64(C.pn_delivery_local_state(d.pn)) -} -func (d Delivery) Remote() Disposition { - return Disposition{C.pn_delivery_remote(d.pn)} -} -func (d Delivery) RemoteState() uint64 { - return uint64(C.pn_delivery_remote_state(d.pn)) -} -func (d Delivery) Settled() bool { - return bool(C.pn_delivery_settled(d.pn)) -} -func (d Delivery) Pending() uint { - return uint(C.pn_delivery_pending(d.pn)) -} -func (d Delivery) Partial() bool { - return bool(C.pn_delivery_partial(d.pn)) -} -func (d Delivery) Writable() bool { - return bool(C.pn_delivery_writable(d.pn)) -} -func (d Delivery) Readable() bool { - return bool(C.pn_delivery_readable(d.pn)) -} -func (d Delivery) Updated() bool { - return bool(C.pn_delivery_updated(d.pn)) -} -func (d Delivery) Update(state uint64) { - C.pn_delivery_update(d.pn, C.uint64_t(state)) -} -func (d Delivery) Clear() { - C.pn_delivery_clear(d.pn) -} -func (d Delivery) Current() bool { - return bool(C.pn_delivery_current(d.pn)) -} -func (d Delivery) Settle() { - C.pn_delivery_settle(d.pn) -} -func (d Delivery) Dump() { - C.pn_delivery_dump(d.pn) -} -func (d Delivery) Buffered() bool { - return bool(C.pn_delivery_buffered(d.pn)) -} - -// Wrappers for declarations in disposition.h - -type Disposition struct{ pn *C.pn_disposition_t } - -func (d Disposition) IsNil() bool { return d.pn == nil } -func (d Disposition) Type() uint64 { - return uint64(C.pn_disposition_type(d.pn)) -} -func (d Disposition) Condition() Condition { - return Condition{C.pn_disposition_condition(d.pn)} -} -func (d Disposition) Data() Data { - return Data{C.pn_disposition_data(d.pn)} -} -func (d Disposition) SectionNumber() uint16 { - return uint16(C.pn_disposition_get_section_number(d.pn)) -} -func (d Disposition) SetSectionNumber(section_number uint16) { - C.pn_disposition_set_section_number(d.pn, C.uint32_t(section_number)) -} -func (d Disposition) SectionOffset() uint64 { - return uint64(C.pn_disposition_get_section_offset(d.pn)) -} -func (d Disposition) SetSectionOffset(section_offset uint64) { - C.pn_disposition_set_section_offset(d.pn, C.uint64_t(section_offset)) -} -func (d Disposition) IsFailed() bool { - return bool(C.pn_disposition_is_failed(d.pn)) -} -func (d Disposition) SetFailed(failed bool) { - C.pn_disposition_set_failed(d.pn, C.bool(failed)) -} -func (d Disposition) IsUndeliverable() bool { - return bool(C.pn_disposition_is_undeliverable(d.pn)) -} -func (d Disposition) SetUndeliverable(undeliverable bool) { - C.pn_disposition_set_undeliverable(d.pn, C.bool(undeliverable)) -} -func (d Disposition) Annotations() Data { - return Data{C.pn_disposition_annotations(d.pn)} -} - -// Wrappers for declarations in condition.h - -type Condition struct{ pn *C.pn_condition_t } - -func (c Condition) IsNil() bool { return c.pn == nil } -func (c Condition) IsSet() bool { - return bool(C.pn_condition_is_set(c.pn)) -} -func (c Condition) Clear() { - C.pn_condition_clear(c.pn) -} -func (c Condition) Name() string { - return C.GoString(C.pn_condition_get_name(c.pn)) -} -func (c Condition) SetName(name string) int { - nameC := C.CString(name) - defer C.free(unsafe.Pointer(nameC)) - - return int(C.pn_condition_set_name(c.pn, nameC)) -} -func (c Condition) Description() string { - return C.GoString(C.pn_condition_get_description(c.pn)) -} -func (c Condition) SetDescription(description string) int { - descriptionC := C.CString(description) - defer C.free(unsafe.Pointer(descriptionC)) - - return int(C.pn_condition_set_description(c.pn, descriptionC)) -} -func (c Condition) Info() Data { - return Data{C.pn_condition_info(c.pn)} -} -func (c Condition) IsRedirect() bool { - return bool(C.pn_condition_is_redirect(c.pn)) -} -func (c Condition) RedirectHost() string { - return C.GoString(C.pn_condition_redirect_host(c.pn)) -} -func (c Condition) RedirectPort() int { - return int(C.pn_condition_redirect_port(c.pn)) -} - -// Wrappers for declarations in terminus.h - -type TerminusType C.pn_terminus_type_t - -const ( - PnUnspecified TerminusType = C.PN_UNSPECIFIED - PnSource TerminusType = C.PN_SOURCE - PnTarget TerminusType = C.PN_TARGET - PnCoordinator TerminusType = C.PN_COORDINATOR -) - -func (e TerminusType) String() string { - switch e { - - case C.PN_UNSPECIFIED: - return "Unspecified" - case C.PN_SOURCE: - return "Source" - case C.PN_TARGET: - return "Target" - case C.PN_COORDINATOR: - return "Coordinator" - } - return "unknown" -} - -type Durability C.pn_durability_t - -const ( - PnNondurable Durability = C.PN_NONDURABLE - PnConfiguration Durability = C.PN_CONFIGURATION - PnDeliveries Durability = C.PN_DELIVERIES -) - -func (e Durability) String() string { - switch e { - - case C.PN_NONDURABLE: - return "Nondurable" - case C.PN_CONFIGURATION: - return "Configuration" - case C.PN_DELIVERIES: - return "Deliveries" - } - return "unknown" -} - -type ExpiryPolicy C.pn_expiry_policy_t - -const ( - PnExpireWithLink ExpiryPolicy = C.PN_EXPIRE_WITH_LINK - PnExpireWithSession ExpiryPolicy = C.PN_EXPIRE_WITH_SESSION - PnExpireWithConnection ExpiryPolicy = C.PN_EXPIRE_WITH_CONNECTION - PnExpireNever ExpiryPolicy = C.PN_EXPIRE_NEVER -) - -func (e ExpiryPolicy) String() string { - switch e { - - case C.PN_EXPIRE_WITH_LINK: - return "ExpireWithLink" - case C.PN_EXPIRE_WITH_SESSION: - return "ExpireWithSession" - case C.PN_EXPIRE_WITH_CONNECTION: - return "ExpireWithConnection" - case C.PN_EXPIRE_NEVER: - return "ExpireNever" - } - return "unknown" -} - -type DistributionMode C.pn_distribution_mode_t - -const ( - PnDistModeUnspecified DistributionMode = C.PN_DIST_MODE_UNSPECIFIED - PnDistModeCopy DistributionMode = C.PN_DIST_MODE_COPY - PnDistModeMove DistributionMode = C.PN_DIST_MODE_MOVE -) - -func (e DistributionMode) String() string { - switch e { - - case C.PN_DIST_MODE_UNSPECIFIED: - return "DistModeUnspecified" - case C.PN_DIST_MODE_COPY: - return "DistModeCopy" - case C.PN_DIST_MODE_MOVE: - return "DistModeMove" - } - return "unknown" -} - -type Terminus struct{ pn *C.pn_terminus_t } - -func (t Terminus) IsNil() bool { return t.pn == nil } -func (t Terminus) Type() TerminusType { - return TerminusType(C.pn_terminus_get_type(t.pn)) -} -func (t Terminus) SetType(type_ TerminusType) int { - return int(C.pn_terminus_set_type(t.pn, C.pn_terminus_type_t(type_))) -} -func (t Terminus) Address() string { - return C.GoString(C.pn_terminus_get_address(t.pn)) -} -func (t Terminus) SetAddress(address string) int { - addressC := C.CString(address) - defer C.free(unsafe.Pointer(addressC)) - - return int(C.pn_terminus_set_address(t.pn, addressC)) -} -func (t Terminus) SetDistributionMode(mode DistributionMode) int { - return int(C.pn_terminus_set_distribution_mode(t.pn, C.pn_distribution_mode_t(mode))) -} -func (t Terminus) Durability() Durability { - return Durability(C.pn_terminus_get_durability(t.pn)) -} -func (t Terminus) SetDurability(durability Durability) int { - return int(C.pn_terminus_set_durability(t.pn, C.pn_durability_t(durability))) -} -func (t Terminus) ExpiryPolicy() ExpiryPolicy { - return ExpiryPolicy(C.pn_terminus_get_expiry_policy(t.pn)) -} -func (t Terminus) SetExpiryPolicy(policy ExpiryPolicy) int { - return int(C.pn_terminus_set_expiry_policy(t.pn, C.pn_expiry_policy_t(policy))) -} -func (t Terminus) Timeout() time.Duration { - return (time.Duration(C.pn_terminus_get_timeout(t.pn)) * time.Second) -} -func (t Terminus) SetTimeout(timeout time.Duration) int { - return int(C.pn_terminus_set_timeout(t.pn, C.pn_seconds_t(timeout))) -} -func (t Terminus) IsDynamic() bool { - return bool(C.pn_terminus_is_dynamic(t.pn)) -} -func (t Terminus) SetDynamic(dynamic bool) int { - return int(C.pn_terminus_set_dynamic(t.pn, C.bool(dynamic))) -} -func (t Terminus) Properties() Data { - return Data{C.pn_terminus_properties(t.pn)} -} -func (t Terminus) Capabilities() Data { - return Data{C.pn_terminus_capabilities(t.pn)} -} -func (t Terminus) Outcomes() Data { - return Data{C.pn_terminus_outcomes(t.pn)} -} -func (t Terminus) Filter() Data { - return Data{C.pn_terminus_filter(t.pn)} -} -func (t Terminus) Copy(src Terminus) int { - return int(C.pn_terminus_copy(t.pn, src.pn)) -} - -// Wrappers for declarations in connection.h - -type Connection struct{ pn *C.pn_connection_t } - -func (c Connection) IsNil() bool { return c.pn == nil } -func (c Connection) Free() { - C.pn_connection_free(c.pn) -} -func (c Connection) Release() { - C.pn_connection_release(c.pn) -} -func (c Connection) Error() error { - return internal.PnError(unsafe.Pointer(C.pn_connection_error(c.pn))) -} -func (c Connection) State() State { - return State(C.pn_connection_state(c.pn)) -} -func (c Connection) Open() { - C.pn_connection_open(c.pn) -} -func (c Connection) Close() { - C.pn_connection_close(c.pn) -} -func (c Connection) Reset() { - C.pn_connection_reset(c.pn) -} -func (c Connection) Condition() Condition { - return Condition{C.pn_connection_condition(c.pn)} -} -func (c Connection) RemoteCondition() Condition { - return Condition{C.pn_connection_remote_condition(c.pn)} -} -func (c Connection) Container() string { - return C.GoString(C.pn_connection_get_container(c.pn)) -} -func (c Connection) SetContainer(container string) { - containerC := C.CString(container) - defer C.free(unsafe.Pointer(containerC)) - - C.pn_connection_set_container(c.pn, containerC) -} -func (c Connection) SetUser(user string) { - userC := C.CString(user) - defer C.free(unsafe.Pointer(userC)) - - C.pn_connection_set_user(c.pn, userC) -} -func (c Connection) SetPassword(password string) { - passwordC := C.CString(password) - defer C.free(unsafe.Pointer(passwordC)) - - C.pn_connection_set_password(c.pn, passwordC) -} -func (c Connection) User() string { - return C.GoString(C.pn_connection_get_user(c.pn)) -} -func (c Connection) Hostname() string { - return C.GoString(C.pn_connection_get_hostname(c.pn)) -} -func (c Connection) SetHostname(hostname string) { - hostnameC := C.CString(hostname) - defer C.free(unsafe.Pointer(hostnameC)) - - C.pn_connection_set_hostname(c.pn, hostnameC) -} -func (c Connection) RemoteContainer() string { - return C.GoString(C.pn_connection_remote_container(c.pn)) -} -func (c Connection) RemoteHostname() string { - return C.GoString(C.pn_connection_remote_hostname(c.pn)) -} -func (c Connection) OfferedCapabilities() Data { - return Data{C.pn_connection_offered_capabilities(c.pn)} -} -func (c Connection) DesiredCapabilities() Data { - return Data{C.pn_connection_desired_capabilities(c.pn)} -} -func (c Connection) Properties() Data { - return Data{C.pn_connection_properties(c.pn)} -} -func (c Connection) RemoteOfferedCapabilities() Data { - return Data{C.pn_connection_remote_offered_capabilities(c.pn)} -} -func (c Connection) RemoteDesiredCapabilities() Data { - return Data{C.pn_connection_remote_desired_capabilities(c.pn)} -} -func (c Connection) RemoteProperties() Data { - return Data{C.pn_connection_remote_properties(c.pn)} -} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/132820cb/proton-c/bindings/go/src/qpid.apache.org/proton/internal/error.go ---------------------------------------------------------------------- diff --git a/proton-c/bindings/go/src/qpid.apache.org/proton/internal/error.go b/proton-c/bindings/go/src/qpid.apache.org/proton/internal/error.go deleted file mode 100644 index 01ba890..0000000 --- a/proton-c/bindings/go/src/qpid.apache.org/proton/internal/error.go +++ /dev/null @@ -1,125 +0,0 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. -*/ - -// Internal implementation details - ignore. -package internal - -// #cgo LDFLAGS: -lqpid-proton -// #include <proton/error.h> -// #include <proton/codec.h> -import "C" - -import ( - "fmt" - "runtime" - "sync" - "sync/atomic" - "unsafe" -) - -// Error type for all proton errors. -type Error string - -// Error prefixes error message with proton: -func (e Error) Error() string { - return "proton: " + string(e) -} - -// Errorf creates an Error with a formatted message -func Errorf(format string, a ...interface{}) Error { - return Error(fmt.Sprintf(format, a...)) -} - -type PnErrorCode int - -func (e PnErrorCode) String() string { - switch e { - case C.PN_EOS: - return "end-of-data" - case C.PN_ERR: - return "error" - case C.PN_OVERFLOW: - return "overflow" - case C.PN_UNDERFLOW: - return "underflow" - case C.PN_STATE_ERR: - return "bad-state" - case C.PN_ARG_ERR: - return "invalid-argument" - case C.PN_TIMEOUT: - return "timeout" - case C.PN_INTR: - return "interrupted" - case C.PN_INPROGRESS: - return "in-progress" - default: - return fmt.Sprintf("unknown-error(%d)", e) - } -} - -func PnError(p unsafe.Pointer) error { - e := (*C.pn_error_t)(p) - if e == nil || C.pn_error_code(e) == 0 { - return nil - } - return Errorf("%s: %s", PnErrorCode(C.pn_error_code(e)), C.GoString(C.pn_error_text(e))) -} - -// DoRecover is called to recover from internal panics -func DoRecover(err *error) { - r := recover() - switch r := r.(type) { - case nil: // We are not recovering - return - case runtime.Error: // Don't catch runtime.Error - panic(r) - case error: - *err = r - default: - panic(r) - } -} - -// panicIf panics if condition is true, the panic value is Errorf(fmt, args...) -func panicIf(condition bool, fmt string, args ...interface{}) { - if condition { - panic(Errorf(fmt, args...)) - } -} - -// FirstError is a goroutine-safe error holder that keeps the first error that is set. -type FirstError struct { - err atomic.Value - once sync.Once -} - -// Set the error if not allread set. -func (e *FirstError) Set(err error) { - e.once.Do(func() { e.err.Store(err) }) -} - -// Get the error. -func (e *FirstError) Get() error { - v := e.err.Load() - if v != nil { - return v.(error) - } else { - return nil - } -} --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
