http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/2f85988e/proton-c/bindings/go/src/qpid.apache.org/electron/doc.go ---------------------------------------------------------------------- diff --git a/proton-c/bindings/go/src/qpid.apache.org/electron/doc.go b/proton-c/bindings/go/src/qpid.apache.org/electron/doc.go deleted file mode 100644 index bc2c589..0000000 --- a/proton-c/bindings/go/src/qpid.apache.org/electron/doc.go +++ /dev/null @@ -1,71 +0,0 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. -*/ - -/* -Package electron lets you write concurrent AMQP 1.0 messaging clients and servers. - -Start by creating a Container with NewContainer. An AMQP Container represents a -single AMQP "application" and can contain client and server connections. - -You can enable AMQP over any connection that implements the standard net.Conn -interface. Typically you can connect with net.Dial() or listen for server -connections with net.Listen. Enable AMQP by passing the net.Conn to -Container.Connection(). - -AMQP allows bi-direction peer-to-peer message exchange as well as -client-to-broker. Messages are sent over "links". Each link is one-way and has a -Sender and Receiver end. Connection.Sender() and Connection.Receiver() open -links to Send() and Receive() messages. Connection.Incoming() lets you accept -incoming links opened by the remote peer. You can open and accept multiple links -in both directions on a single Connection. - -Some of the documentation examples show client and server side by side in a -single program, in separate goroutines. This is only for example purposes, real -AMQP applications would run in separate processes on the network. -More realistic examples: https://github.com/apache/qpid-proton/blob/master/examples/go/README.md - -Some of the documentation examples show client and server side by side in a -single program, in separate goroutines. This is only for example purposes, real -AMQP applications would run in separate processes on the network. -More realistic examples: https://github.com/apache/qpid-proton/blob/master/examples/go/README.md - -*/ -package electron - -//#cgo LDFLAGS: -lqpid-proton -import "C" - -// Just for package comment - -/* DEVELOPER NOTES - -There is a single proton.Engine per connection, each driving it's own event-loop goroutine, -and each with a 'handler'. Most state for a connection is maintained on the handler, and -only accessed in the event-loop goroutine, so no locks are required there. - -The handler sets up channels as needed to get or send data from user goroutines -using electron types like Sender or Receiver. - -Engine.Inject injects actions into the event loop from user goroutines. It is -important to check at the start of an injected function that required objects -are still valid, for example a link may be remotely closed between the time a -Sender function calls Inject and the time the injected function is execute by -the handler goroutine. - -*/
http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/2f85988e/proton-c/bindings/go/src/qpid.apache.org/electron/electron_test.go ---------------------------------------------------------------------- diff --git a/proton-c/bindings/go/src/qpid.apache.org/electron/electron_test.go b/proton-c/bindings/go/src/qpid.apache.org/electron/electron_test.go deleted file mode 100644 index 294e952..0000000 --- a/proton-c/bindings/go/src/qpid.apache.org/electron/electron_test.go +++ /dev/null @@ -1,546 +0,0 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. -*/ - -package electron - -import ( - "fmt" - "net" - "path" - "qpid.apache.org/amqp" - "reflect" - "runtime" - "testing" - "time" -) - -func fatalIf(t *testing.T, err error) { - if err != nil { - _, file, line, ok := runtime.Caller(1) // annotate with location of caller. - if ok { - _, file = path.Split(file) - } - t.Fatalf("(from %s:%d) %v", file, line, err) - } -} - -func errorIf(t *testing.T, err error) { - if err != nil { - _, file, line, ok := runtime.Caller(1) // annotate with location of caller. - if ok { - _, file = path.Split(file) - } - t.Errorf("(from %s:%d) %v", file, line, err) - } -} - -func checkEqual(want interface{}, got interface{}) error { - if !reflect.DeepEqual(want, got) { - return fmt.Errorf("%#v != %#v", want, got) - } - return nil -} - -// Start a server, return listening addr and channel for incoming Connections. -func newServer(t *testing.T, cont Container, opts ...ConnectionOption) (net.Addr, <-chan Connection) { - listener, err := net.Listen("tcp", "") - fatalIf(t, err) - addr := listener.Addr() - ch := make(chan Connection) - go func() { - conn, err := listener.Accept() - c, err := cont.Connection(conn, append([]ConnectionOption{Server()}, opts...)...) - fatalIf(t, err) - ch <- c - }() - return addr, ch -} - -// Open a client connection and session, return the session. -func newClient(t *testing.T, cont Container, addr net.Addr, opts ...ConnectionOption) Session { - conn, err := net.Dial(addr.Network(), addr.String()) - fatalIf(t, err) - c, err := cont.Connection(conn, opts...) - fatalIf(t, err) - sn, err := c.Session() - fatalIf(t, err) - return sn -} - -// Return client and server ends of the same connection. -func newClientServerOpts(t *testing.T, copts []ConnectionOption, sopts []ConnectionOption) (client Session, server Connection) { - addr, ch := newServer(t, NewContainer("test-server"), sopts...) - client = newClient(t, NewContainer("test-client"), addr, copts...) - return client, <-ch -} - -// Return client and server ends of the same connection. -func newClientServer(t *testing.T) (client Session, server Connection) { - return newClientServerOpts(t, nil, nil) -} - -// Close client and server -func closeClientServer(client Session, server Connection) { - client.Connection().Close(nil) - server.Close(nil) -} - -// Send a message one way with a client sender and server receiver, verify ack. -func TestClientSendServerReceive(t *testing.T) { - nLinks := 3 - nMessages := 3 - - rchan := make(chan Receiver, nLinks) - client, server := newClientServer(t) - go func() { - for in := range server.Incoming() { - switch in := in.(type) { - case *IncomingReceiver: - in.SetCapacity(1) - in.SetPrefetch(false) - rchan <- in.Accept().(Receiver) - default: - in.Accept() - } - } - }() - - defer func() { closeClientServer(client, server) }() - - s := make([]Sender, nLinks) - for i := 0; i < nLinks; i++ { - var err error - s[i], err = client.Sender(Target(fmt.Sprintf("foo%d", i))) - if err != nil { - t.Fatal(err) - } - } - r := make([]Receiver, nLinks) - for i := 0; i < nLinks; i++ { - r[i] = <-rchan - } - - for i := 0; i < nLinks; i++ { - for j := 0; j < nMessages; j++ { - // Client send - ack := make(chan Outcome, 1) - sendDone := make(chan struct{}) - go func() { - defer close(sendDone) - m := amqp.NewMessageWith(fmt.Sprintf("foobar%v-%v", i, j)) - var err error - s[i].SendAsync(m, ack, "testing") - if err != nil { - t.Fatal(err) - } - }() - - // Server recieve - rm, err := r[i].Receive() - if err != nil { - t.Fatal(err) - } - if want, got := interface{}(fmt.Sprintf("foobar%v-%v", i, j)), rm.Message.Body(); want != got { - t.Errorf("%#v != %#v", want, got) - } - - // Should not be acknowledged on client yet - <-sendDone - select { - case <-ack: - t.Errorf("unexpected ack") - default: - } - - // Server send ack - if err := rm.Reject(); err != nil { - t.Error(err) - } - // Client get ack. - if a := <-ack; a.Value != "testing" || a.Error != nil || a.Status != Rejected { - t.Error("unexpected ack: ", a.Status, a.Error, a.Value) - } - } - } -} - -func TestClientReceiver(t *testing.T) { - nMessages := 3 - client, server := newClientServer(t) - go func() { - for in := range server.Incoming() { - switch in := in.(type) { - case *IncomingSender: - s := in.Accept().(Sender) - go func() { - for i := int32(0); i < int32(nMessages); i++ { - out := s.SendSync(amqp.NewMessageWith(i)) - if out.Error != nil { - t.Error(out.Error) - return - } - } - s.Close(nil) - }() - default: - in.Accept() - } - } - }() - - r, err := client.Receiver(Source("foo")) - if err != nil { - t.Fatal(err) - } - for i := int32(0); i < int32(nMessages); i++ { - rm, err := r.Receive() - if err != nil { - if err != Closed { - t.Error(err) - } - break - } - if err := rm.Accept(); err != nil { - t.Error(err) - } - if b, ok := rm.Message.Body().(int32); !ok || b != i { - t.Errorf("want %v, true got %v, %v", i, b, ok) - } - } - server.Close(nil) - client.Connection().Close(nil) -} - -// Test timeout versions of waiting functions. -func TestTimeouts(t *testing.T) { - var err error - rchan := make(chan Receiver, 1) - client, server := newClientServer(t) - go func() { - for i := range server.Incoming() { - switch i := i.(type) { - case *IncomingReceiver: - i.SetCapacity(1) - i.SetPrefetch(false) - rchan <- i.Accept().(Receiver) // Issue credit only on receive - default: - i.Accept() - } - } - }() - defer func() { closeClientServer(client, server) }() - - // Open client sender - snd, err := client.Sender(Target("test")) - if err != nil { - t.Fatal(err) - } - rcv := <-rchan - - // Test send with timeout - short := time.Millisecond - long := time.Second - m := amqp.NewMessage() - if err := snd.SendSyncTimeout(m, 0).Error; err != Timeout { // No credit, expect timeout. - t.Error("want Timeout got", err) - } - if err := snd.SendSyncTimeout(m, short).Error; err != Timeout { // No credit, expect timeout. - t.Error("want Timeout got", err) - } - // Test receive with timeout - if _, err = rcv.ReceiveTimeout(0); err != Timeout { // No credit, expect timeout. - t.Error("want Timeout got", err) - } - // Test receive with timeout - if _, err = rcv.ReceiveTimeout(short); err != Timeout { // No credit, expect timeout. - t.Error("want Timeout got", err) - } - // There is now a credit on the link due to receive - ack := make(chan Outcome) - snd.SendAsyncTimeout(m, ack, nil, short) - // Disposition should timeout - select { - case <-ack: - t.Errorf("want Timeout got %#v", ack) - case <-time.After(short): - } - - // Receive and accept - rm, err := rcv.ReceiveTimeout(long) - if err != nil { - t.Fatal(err) - } - if err := rm.Accept(); err != nil { - t.Fatal(err) - } - // Sender get ack - if a := <-ack; a.Status != Accepted || a.Error != nil { - t.Errorf("want (accepted, nil) got %#v", a) - } -} - -// A server that returns the opposite end of each client link via channels. -type pairs struct { - t *testing.T - client Session - server Connection - rchan chan Receiver - schan chan Sender - capacity int - prefetch bool -} - -func newPairs(t *testing.T, capacity int, prefetch bool) *pairs { - p := &pairs{t: t, rchan: make(chan Receiver, 1), schan: make(chan Sender, 1)} - p.client, p.server = newClientServer(t) - go func() { - for i := range p.server.Incoming() { - switch i := i.(type) { - case *IncomingReceiver: - i.SetCapacity(capacity) - i.SetPrefetch(prefetch) - p.rchan <- i.Accept().(Receiver) - case *IncomingSender: - p.schan <- i.Accept().(Sender) - default: - i.Accept() - } - } - }() - return p -} - -func (p *pairs) close() { - closeClientServer(p.client, p.server) -} - -// Return a client sender and server receiver -func (p *pairs) senderReceiver() (Sender, Receiver) { - snd, err := p.client.Sender() - fatalIf(p.t, err) - rcv := <-p.rchan - return snd, rcv -} - -// Return a client receiver and server sender -func (p *pairs) receiverSender() (Receiver, Sender) { - rcv, err := p.client.Receiver() - fatalIf(p.t, err) - snd := <-p.schan - return rcv, snd -} - -type result struct { - label string - err error - value interface{} -} - -func (r result) String() string { return fmt.Sprintf("%v(%v)", r.err, r.label) } - -func doSend(snd Sender, results chan result) { - err := snd.SendSync(amqp.NewMessage()).Error - results <- result{"send", err, nil} -} - -func doReceive(rcv Receiver, results chan result) { - msg, err := rcv.Receive() - results <- result{"receive", err, msg} -} - -func doDisposition(ack <-chan Outcome, results chan result) { - results <- result{"disposition", (<-ack).Error, nil} -} - -// Senders get credit immediately if receivers have prefetch set -func TestSendReceivePrefetch(t *testing.T) { - pairs := newPairs(t, 1, true) - s, r := pairs.senderReceiver() - s.SendAsyncTimeout(amqp.NewMessage(), nil, nil, time.Second) // Should not block for credit. - if _, err := r.Receive(); err != nil { - t.Error(err) - } -} - -// Senders do not get credit till Receive() if receivers don't have prefetch -func TestSendReceiveNoPrefetch(t *testing.T) { - pairs := newPairs(t, 1, false) - s, r := pairs.senderReceiver() - done := make(chan struct{}, 1) - go func() { - s.SendAsyncTimeout(amqp.NewMessage(), nil, nil, time.Second) // Should block for credit. - close(done) - }() - select { - case <-done: - t.Errorf("send should be blocked on credit") - default: - if _, err := r.Receive(); err != nil { - t.Error(err) - } else { - <-done - } // Should be unblocked now - } -} - -// Test that closing Links interrupts blocked link functions. -func TestLinkCloseInterrupt(t *testing.T) { - want := amqp.Error{Name: "x", Description: "all bad"} - pairs := newPairs(t, 1, false) - results := make(chan result) // Collect expected errors - - // Note closing the link does not interrupt Send() calls, the AMQP spec says - // that deliveries can be settled after the link is closed. - - // Receiver.Close() interrupts Receive() - snd, rcv := pairs.senderReceiver() - go doReceive(rcv, results) - rcv.Close(want) - if r := <-results; want != r.err { - t.Errorf("want %#v got %#v", want, r) - } - - // Remote Sender.Close() interrupts Receive() - snd, rcv = pairs.senderReceiver() - go doReceive(rcv, results) - snd.Close(want) - if r := <-results; want != r.err { - t.Errorf("want %#v got %#v", want, r) - } -} - -// Test closing the server end of a connection. -func TestConnectionCloseInterrupt1(t *testing.T) { - want := amqp.Error{Name: "x", Description: "bad"} - pairs := newPairs(t, 1, true) - results := make(chan result) // Collect expected errors - - // Connection.Close() interrupts Send, Receive, Disposition. - snd, rcv := pairs.senderReceiver() - go doSend(snd, results) - - if _, err := rcv.Receive(); err != nil { - t.Error("receive", err) - } - rcv, snd = pairs.receiverSender() - go doReceive(rcv, results) - - snd, rcv = pairs.senderReceiver() - ack := snd.SendWaitable(amqp.NewMessage()) - if _, err := rcv.Receive(); err != nil { - t.Error("receive", err) - } - go doDisposition(ack, results) - - pairs.server.Close(want) - for i := 0; i < 3; i++ { - if r := <-results; want != r.err { - t.Errorf("want %v got %v", want, r) - } - } -} - -// Test closing the client end of the connection. -func TestConnectionCloseInterrupt2(t *testing.T) { - want := amqp.Error{Name: "x", Description: "bad"} - pairs := newPairs(t, 1, true) - results := make(chan result) // Collect expected errors - - // Connection.Close() interrupts Send, Receive, Disposition. - snd, rcv := pairs.senderReceiver() - go doSend(snd, results) - if _, err := rcv.Receive(); err != nil { - t.Error("receive", err) - } - - rcv, snd = pairs.receiverSender() - go doReceive(rcv, results) - - snd, rcv = pairs.senderReceiver() - ack := snd.SendWaitable(amqp.NewMessage()) - go doDisposition(ack, results) - - pairs.client.Connection().Close(want) - for i := 0; i < 3; i++ { - if r := <-results; want != r.err { - t.Errorf("want %v got %v", want, r.err) - } - } -} - -func heartbeat(c Connection) time.Duration { - return c.(*connection).engine.Transport().RemoteIdleTimeout() -} - -func TestHeartbeat(t *testing.T) { - client, server := newClientServerOpts(t, - []ConnectionOption{Heartbeat(102 * time.Millisecond)}, - nil) - defer closeClientServer(client, server) - - var serverHeartbeat time.Duration - - go func() { - for in := range server.Incoming() { - switch in := in.(type) { - case *IncomingConnection: - serverHeartbeat = in.Heartbeat() - in.AcceptConnection(Heartbeat(101 * time.Millisecond)) - default: - in.Accept() - } - } - }() - - // Freeze the server to stop it sending heartbeats. - unfreeze := make(chan bool) - defer close(unfreeze) - freeze := func() error { return server.(*connection).engine.Inject(func() { <-unfreeze }) } - - fatalIf(t, client.Sync()) - errorIf(t, checkEqual(101*time.Millisecond, heartbeat(client.Connection()))) - errorIf(t, checkEqual(102*time.Millisecond, serverHeartbeat)) - errorIf(t, client.Connection().Error()) - - // Freeze the server for less than a heartbeat - fatalIf(t, freeze()) - time.Sleep(50 * time.Millisecond) - unfreeze <- true - // Make sure server is still responding. - s, err := client.Sender() - errorIf(t, err) - errorIf(t, s.Sync()) - - // Freeze the server till the client times out the connection - fatalIf(t, freeze()) - select { - case <-client.Done(): - if amqp.ResourceLimitExceeded != client.Error().(amqp.Error).Name { - t.Error("bad timeout error:", client.Error()) - } - case <-time.After(400 * time.Millisecond): - t.Error("connection failed to time out") - } - - unfreeze <- true // Unfreeze the server - <-server.Done() - if amqp.ResourceLimitExceeded != server.Error().(amqp.Error).Name { - t.Error("bad timeout error:", server.Error()) - } -} http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/2f85988e/proton-c/bindings/go/src/qpid.apache.org/electron/endpoint.go ---------------------------------------------------------------------- diff --git a/proton-c/bindings/go/src/qpid.apache.org/electron/endpoint.go b/proton-c/bindings/go/src/qpid.apache.org/electron/endpoint.go deleted file mode 100644 index ca93e5b..0000000 --- a/proton-c/bindings/go/src/qpid.apache.org/electron/endpoint.go +++ /dev/null @@ -1,182 +0,0 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. -*/ - -package electron - -import ( - "fmt" - "io" - "qpid.apache.org/proton" -) - -// Closed is an alias for io.EOF. It is returned as an error when an endpoint -// was closed cleanly. -var Closed = io.EOF - -// Endpoint is the local end of a communications channel to the remote peer -// process. The following interface implement Endpoint: Connection, Session, -// Sender and Receiver. -// -// You can create an endpoint with functions on Container, Connection and -// Session. You can accept incoming endpoints from the remote peer using -// Connection.Incoming() -// -type Endpoint interface { - // Close an endpoint and signal an error to the remote end if error != nil. - Close(error) - - // String is a human readable identifier, useful for debugging and logging. - String() string - - // Error returns nil if the endpoint is open, otherwise returns an error. - // Error() == Closed means the endpoint was closed without error. - Error() error - - // Connection is the connection associated with this endpoint. - Connection() Connection - - // Done returns a channel that will close when the endpoint closes. - // After Done() has closed, Error() will return the reason for closing. - Done() <-chan struct{} - - // Sync() waits for the remote peer to confirm the endpoint is active or - // reject it with an error. You can call it immediately on new endpoints - // for more predictable error handling. - // - // AMQP is an asynchronous protocol. It is legal to create an endpoint and - // start using it without waiting for confirmation. This avoids a needless - // delay in the non-error case and throughput by "assuming the best". - // - // However if there *is* an error, these "optimistic" actions will fail. The - // endpoint and its children will be closed with an error. The error will only - // be detected when you try to use one of these endpoints or call Sync() - Sync() error - - // Called in handler goroutine when endpoint is remotely closed. - closed(err error) error - wakeSync() -} - -// Base implementation for Endpoint -type endpoint struct { - err proton.ErrorHolder - str string // String() return value. - done chan struct{} - active chan struct{} -} - -func (e *endpoint) init(s string) { - e.str = s - e.done = make(chan struct{}) - e.active = make(chan struct{}) -} - -// Called in proton goroutine on remote open. -func (e *endpoint) wakeSync() { - select { // Close active channel if not already closed. - case <-e.active: - default: - close(e.active) - } -} - -// Called in proton goroutine (from handler) on a Closed or Disconnected event. -// -// Set err if there is not already an error on the endpoint. -// Return Error() -func (e *endpoint) closed(err error) error { - select { - case <-e.done: - // Already closed - default: - e.err.Set(err) - e.err.Set(Closed) - e.wakeSync() // Make sure we wake up Sync() - close(e.done) - } - return e.Error() -} - -func (e *endpoint) String() string { return e.str } - -func (e *endpoint) Error() error { return e.err.Get() } - -func (e *endpoint) Done() <-chan struct{} { return e.done } - -func (e *endpoint) Sync() error { - <-e.active - return e.Error() -} - -// Call in proton goroutine to initiate closing an endpoint locally -// handler will complete the close when remote end closes. -func localClose(ep proton.Endpoint, err error) { - if ep.State().LocalActive() { - proton.CloseError(ep, err) - } -} - -// Incoming is the interface for incoming endpoints, see Connection.Incoming() -// -// Call Incoming.Accept() to open the endpoint or Incoming.Reject() to close it -// with optional error -// -// Implementing types are *IncomingConnection, *IncomingSession, *IncomingSender -// and *IncomingReceiver. Each type provides methods to examine the incoming -// endpoint request and set configuration options for the local endpoint -// before calling Accept() or Reject() -type Incoming interface { - // Accept and open the endpoint. - Accept() Endpoint - - // Reject the endpoint with an error - Reject(error) - - // wait for and call the accept function, call in proton goroutine. - wait() error - pEndpoint() proton.Endpoint -} - -type incoming struct { - pep proton.Endpoint - acceptCh chan func() error -} - -func makeIncoming(e proton.Endpoint) incoming { - return incoming{pep: e, acceptCh: make(chan func() error)} -} - -func (in *incoming) String() string { return fmt.Sprintf("%s: %s", in.pep.Type(), in.pep) } -func (in *incoming) Reject(err error) { in.acceptCh <- func() error { return err } } - -// Call in proton goroutine, wait for and call the accept function. -func (in *incoming) wait() error { return (<-in.acceptCh)() } - -func (in *incoming) pEndpoint() proton.Endpoint { return in.pep } - -// Called in app goroutine to send an accept function to proton and return the resulting endpoint. -func (in *incoming) accept(f func() Endpoint) Endpoint { - done := make(chan Endpoint) - in.acceptCh <- func() error { - ep := f() - done <- ep - return nil - } - return <-done -} http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/2f85988e/proton-c/bindings/go/src/qpid.apache.org/electron/error.go ---------------------------------------------------------------------- diff --git a/proton-c/bindings/go/src/qpid.apache.org/electron/error.go b/proton-c/bindings/go/src/qpid.apache.org/electron/error.go deleted file mode 100644 index 4dcfd94..0000000 --- a/proton-c/bindings/go/src/qpid.apache.org/electron/error.go +++ /dev/null @@ -1,35 +0,0 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. -*/ - -package electron - -import ( - "fmt" -) - -// assert panics if condition is false with optional formatted message -func assert(condition bool, format ...interface{}) { - if !condition { - if len(format) > 0 { - panic(fmt.Errorf(format[0].(string), format[1:]...)) - } else { - panic(fmt.Errorf("assertion failed")) - } - } -} http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/2f85988e/proton-c/bindings/go/src/qpid.apache.org/electron/ex_client_server_test.go ---------------------------------------------------------------------- diff --git a/proton-c/bindings/go/src/qpid.apache.org/electron/ex_client_server_test.go b/proton-c/bindings/go/src/qpid.apache.org/electron/ex_client_server_test.go deleted file mode 100644 index 93f275b..0000000 --- a/proton-c/bindings/go/src/qpid.apache.org/electron/ex_client_server_test.go +++ /dev/null @@ -1,81 +0,0 @@ -package electron_test - -import ( - "fmt" - "net" - "qpid.apache.org/amqp" - "qpid.apache.org/electron" -) - -// Print errors -func check(msg string, err error) bool { - if err != nil { - fmt.Printf("%s: %s\n", msg, err) - } - return err == nil -} - -func runServer(cont electron.Container, l net.Listener) { - for c, err := cont.Accept(l); check("accept connection", err); c, err = cont.Accept(l) { - go func() { // Process connections concurrently, accepting AMQP endpoints - for in := range c.Incoming() { - ep := in.Accept() // Accept all endpoints - go func() { // Process endpoints concurrently - switch ep := ep.(type) { - case electron.Sender: - m := amqp.NewMessageWith("hello yourself") - fmt.Printf("server %q sending %q\n", ep.Source(), m.Body()) - ep.SendForget(m) // One-way send, client does not need to Accept. - case electron.Receiver: - if rm, err := ep.Receive(); check("server receive", err) { - fmt.Printf("server %q received %q\n", ep.Target(), rm.Message.Body()) - err := rm.Accept() // Client is waiting for Accept. - check("accept message", err) - } - } - }() - } - }() - } -} - -func startServer() (addr net.Addr) { - cont := electron.NewContainer("server") - if l, err := net.Listen("tcp", ""); check("listen", err) { - addr = l.Addr() - go runServer(cont, l) - } - return addr -} - -// Connect to addr and send/receive a message. -func client(addr net.Addr) { - if c, err := electron.Dial(addr.Network(), addr.String()); check("dial", err) { - defer c.Close(nil) - if s, err := c.Sender(electron.Target("target")); check("sender", err) { - fmt.Printf("client sending\n") - s.SendSync(amqp.NewMessageWith("hello")) // Send and wait for server to Accept() - } - if r, err := c.Receiver(electron.Source("source")); check("receiver", err) { - if rm, err := r.Receive(); err == nil { - fmt.Printf("client received %q\n", rm.Message.Body()) - } - } - } -} - -// Example client and server communicating via AMQP over a TCP/IP connection. -// -// Normally client and server would be separate processes. -// For more realistic examples: -// https://github.com/apache/qpid-proton/blob/master/examples/go/README.md -// -func Example_clientServer() { - addr := startServer() - client(addr) - // Output: - // client sending - // server "target" received "hello" - // server "source" sending "hello yourself" - // client received "hello yourself" -} http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/2f85988e/proton-c/bindings/go/src/qpid.apache.org/electron/handler.go ---------------------------------------------------------------------- diff --git a/proton-c/bindings/go/src/qpid.apache.org/electron/handler.go b/proton-c/bindings/go/src/qpid.apache.org/electron/handler.go deleted file mode 100644 index af1efd6..0000000 --- a/proton-c/bindings/go/src/qpid.apache.org/electron/handler.go +++ /dev/null @@ -1,201 +0,0 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. -*/ - -package electron - -import ( - "qpid.apache.org/amqp" - "qpid.apache.org/proton" -) - -// NOTE: methods in this file are called only in the proton goroutine unless otherwise indicated. - -type handler struct { - delegator *proton.MessagingAdapter - connection *connection - links map[proton.Link]Endpoint - sentMessages map[proton.Delivery]sentMessage - sessions map[proton.Session]*session -} - -func newHandler(c *connection) *handler { - h := &handler{ - connection: c, - links: make(map[proton.Link]Endpoint), - sentMessages: make(map[proton.Delivery]sentMessage), - sessions: make(map[proton.Session]*session), - } - h.delegator = proton.NewMessagingAdapter(h) - // Disable auto features of MessagingAdapter, we do these ourselves. - h.delegator.Prefetch = 0 - h.delegator.AutoAccept = false - h.delegator.AutoSettle = false - h.delegator.AutoOpen = false - return h -} - -func (h *handler) linkError(l proton.Link, msg string) { - proton.CloseError(l, amqp.Errorf(amqp.InternalError, "%s for %s %s", msg, l.Type(), l)) -} - -func (h *handler) HandleMessagingEvent(t proton.MessagingEvent, e proton.Event) { - switch t { - - case proton.MMessage: - if r, ok := h.links[e.Link()].(*receiver); ok { - r.message(e.Delivery()) - } else { - h.linkError(e.Link(), "no receiver") - } - - case proton.MSettled: - if sm, ok := h.sentMessages[e.Delivery()]; ok { - d := e.Delivery().Remote() - sm.ack <- Outcome{sentStatus(d.Type()), d.Condition().Error(), sm.value} - delete(h.sentMessages, e.Delivery()) - } - - case proton.MSendable: - if s, ok := h.links[e.Link()].(*sender); ok { - s.sendable() - } else { - h.linkError(e.Link(), "no sender") - } - - case proton.MConnectionOpening: - h.connection.heartbeat = e.Transport().RemoteIdleTimeout() - if e.Connection().State().LocalUninit() { // Remotely opened - h.incoming(newIncomingConnection(h.connection)) - } - h.connection.wakeSync() - - case proton.MSessionOpening: - if e.Session().State().LocalUninit() { // Remotely opened - h.incoming(newIncomingSession(h, e.Session())) - } - h.sessions[e.Session()].wakeSync() - - case proton.MSessionClosed: - h.sessionClosed(e.Session(), proton.EndpointError(e.Session())) - - case proton.MLinkOpening: - l := e.Link() - if ss := h.sessions[l.Session()]; ss != nil { - if l.State().LocalUninit() { // Remotely opened. - if l.IsReceiver() { - h.incoming(newIncomingReceiver(ss, l)) - } else { - h.incoming(newIncomingSender(ss, l)) - } - } - if ep, ok := h.links[l]; ok { - ep.wakeSync() - } else { - h.linkError(l, "no link") - } - } else { - h.linkError(l, "no session") - } - - case proton.MLinkClosing: - e.Link().Close() - - case proton.MLinkClosed: - h.linkClosed(e.Link(), proton.EndpointError(e.Link())) - - case proton.MConnectionClosing: - h.connection.err.Set(e.Connection().RemoteCondition().Error()) - - case proton.MConnectionClosed: - h.shutdown(proton.EndpointError(e.Connection())) - - case proton.MDisconnected: - err := e.Transport().Condition().Error() - if err == nil { - err = amqp.Errorf(amqp.IllegalState, "unexpected disconnect on %s", h.connection) - } - h.shutdown(err) - } -} - -func (h *handler) incoming(in Incoming) { - var err error - if h.connection.incoming != nil { - h.connection.incoming <- in - // Must block until accept/reject, subsequent events may use the incoming endpoint. - err = in.wait() - } else { - err = amqp.Errorf(amqp.NotAllowed, "rejected incoming %s %s", - in.pEndpoint().Type(), in.pEndpoint().String()) - } - if err == nil { - in.pEndpoint().Open() - } else { - proton.CloseError(in.pEndpoint(), err) - } -} - -func (h *handler) addLink(pl proton.Link, el Endpoint) { - h.links[pl] = el -} - -func (h *handler) linkClosed(l proton.Link, err error) { - if link, ok := h.links[l]; ok { - _ = link.closed(err) - delete(h.links, l) - l.Free() - } -} - -func (h *handler) sessionClosed(ps proton.Session, err error) { - if s, ok := h.sessions[ps]; ok { - delete(h.sessions, ps) - err = s.closed(err) - for l, _ := range h.links { - if l.Session() == ps { - h.linkClosed(l, err) - } - } - ps.Free() - } -} - -func (h *handler) shutdown(err error) { - err = h.connection.closed(err) - for _, sm := range h.sentMessages { - // Don't block but ensure outcome is sent eventually. - if sm.ack != nil { - o := Outcome{Unacknowledged, err, sm.value} - select { - case sm.ack <- o: - default: - go func(ack chan<- Outcome) { ack <- o }(sm.ack) // Deliver it eventually - } - } - } - h.sentMessages = nil - for _, l := range h.links { - _ = l.closed(err) - } - h.links = nil - for _, s := range h.sessions { - _ = s.closed(err) - } - h.sessions = nil -} http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/2f85988e/proton-c/bindings/go/src/qpid.apache.org/electron/link.go ---------------------------------------------------------------------- diff --git a/proton-c/bindings/go/src/qpid.apache.org/electron/link.go b/proton-c/bindings/go/src/qpid.apache.org/electron/link.go deleted file mode 100644 index 1d17894..0000000 --- a/proton-c/bindings/go/src/qpid.apache.org/electron/link.go +++ /dev/null @@ -1,221 +0,0 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. -*/ - -package electron - -import ( - "fmt" - "qpid.apache.org/proton" -) - -// Settings associated with a link -type LinkSettings interface { - // Source address that messages are coming from. - Source() string - - // Target address that messages are going to. - Target() string - - // Name is a unique name for the link among links between the same - // containers in the same direction. By default generated automatically. - LinkName() string - - // IsSender is true if this is the sending end of the link. - IsSender() bool - - // IsReceiver is true if this is the receiving end of the link. - IsReceiver() bool - - // SndSettle defines when the sending end of the link settles message delivery. - SndSettle() SndSettleMode - - // RcvSettle defines when the sending end of the link settles message delivery. - RcvSettle() RcvSettleMode - - // Session containing the Link - Session() Session -} - -// LinkOption can be passed when creating a sender or receiver link to set optional configuration. -type LinkOption func(*linkSettings) - -// Source returns a LinkOption that sets address that messages are coming from. -func Source(s string) LinkOption { return func(l *linkSettings) { l.source = s } } - -// Target returns a LinkOption that sets address that messages are going to. -func Target(s string) LinkOption { return func(l *linkSettings) { l.target = s } } - -// LinkName returns a LinkOption that sets the link name. -func LinkName(s string) LinkOption { return func(l *linkSettings) { l.target = s } } - -// SndSettle returns a LinkOption that sets the send settle mode -func SndSettle(m SndSettleMode) LinkOption { return func(l *linkSettings) { l.sndSettle = m } } - -// RcvSettle returns a LinkOption that sets the send settle mode -func RcvSettle(m RcvSettleMode) LinkOption { return func(l *linkSettings) { l.rcvSettle = m } } - -// SndSettleMode returns a LinkOption that defines when the sending end of the -// link settles message delivery. -type SndSettleMode proton.SndSettleMode - -// Capacity returns a LinkOption that sets the link capacity -func Capacity(n int) LinkOption { return func(l *linkSettings) { l.capacity = n } } - -// Prefetch returns a LinkOption that sets a receivers pre-fetch flag. Not relevant for a sender. -func Prefetch(p bool) LinkOption { return func(l *linkSettings) { l.prefetch = p } } - -// AtMostOnce returns a LinkOption that sets "fire and forget" mode, messages -// are sent but no acknowledgment is received, messages can be lost if there is -// a network failure. Sets SndSettleMode=SendSettled and RcvSettleMode=RcvFirst -func AtMostOnce() LinkOption { - return func(l *linkSettings) { - SndSettle(SndSettled)(l) - RcvSettle(RcvFirst)(l) - } -} - -// AtLeastOnce returns a LinkOption that requests acknowledgment for every -// message, acknowledgment indicates the message was definitely received. In the -// event of a failure, unacknowledged messages can be re-sent but there is a -// chance that the message will be received twice in this case. Sets -// SndSettleMode=SndUnsettled and RcvSettleMode=RcvFirst -func AtLeastOnce() LinkOption { - return func(l *linkSettings) { - SndSettle(SndUnsettled)(l) - RcvSettle(RcvFirst)(l) - } -} - -const ( - // Messages are sent unsettled - SndUnsettled = SndSettleMode(proton.SndUnsettled) - // Messages are sent already settled - SndSettled = SndSettleMode(proton.SndSettled) - // Sender can send either unsettled or settled messages. - SendMixed = SndSettleMode(proton.SndMixed) -) - -// RcvSettleMode defines when the receiving end of the link settles message delivery. -type RcvSettleMode proton.RcvSettleMode - -const ( - // Receiver settles first. - RcvFirst = RcvSettleMode(proton.RcvFirst) - // Receiver waits for sender to settle before settling. - RcvSecond = RcvSettleMode(proton.RcvSecond) -) - -type linkSettings struct { - source string - target string - linkName string - isSender bool - sndSettle SndSettleMode - rcvSettle RcvSettleMode - capacity int - prefetch bool - session *session - pLink proton.Link -} - -type link struct { - endpoint - linkSettings -} - -func (l *linkSettings) Source() string { return l.source } -func (l *linkSettings) Target() string { return l.target } -func (l *linkSettings) LinkName() string { return l.linkName } -func (l *linkSettings) IsSender() bool { return l.isSender } -func (l *linkSettings) IsReceiver() bool { return !l.isSender } -func (l *linkSettings) SndSettle() SndSettleMode { return l.sndSettle } -func (l *linkSettings) RcvSettle() RcvSettleMode { return l.rcvSettle } - -func (l *link) Session() Session { return l.session } -func (l *link) Connection() Connection { return l.session.Connection() } -func (l *link) engine() *proton.Engine { return l.session.connection.engine } -func (l *link) handler() *handler { return l.session.connection.handler } - -// Open a link and return the linkSettings. -func makeLocalLink(sn *session, isSender bool, setting ...LinkOption) (linkSettings, error) { - l := linkSettings{ - isSender: isSender, - capacity: 1, - prefetch: false, - session: sn, - } - for _, set := range setting { - set(&l) - } - if l.linkName == "" { - l.linkName = l.session.connection.container.nextLinkName() - } - if l.IsSender() { - l.pLink = l.session.pSession.Sender(l.linkName) - } else { - l.pLink = l.session.pSession.Receiver(l.linkName) - } - if l.pLink.IsNil() { - return l, fmt.Errorf("cannot create link %s", l.pLink) - } - l.pLink.Source().SetAddress(l.source) - l.pLink.Target().SetAddress(l.target) - l.pLink.SetSndSettleMode(proton.SndSettleMode(l.sndSettle)) - l.pLink.SetRcvSettleMode(proton.RcvSettleMode(l.rcvSettle)) - l.pLink.Open() - return l, nil -} - -func makeIncomingLinkSettings(pLink proton.Link, sn *session) linkSettings { - return linkSettings{ - isSender: pLink.IsSender(), - source: pLink.RemoteSource().Address(), - target: pLink.RemoteTarget().Address(), - linkName: pLink.Name(), - sndSettle: SndSettleMode(pLink.RemoteSndSettleMode()), - rcvSettle: RcvSettleMode(pLink.RemoteRcvSettleMode()), - capacity: 1, - prefetch: false, - pLink: pLink, - session: sn, - } -} - -// Not part of Link interface but use by Sender and Receiver. -func (l *link) Credit() (credit int, err error) { - err = l.engine().InjectWait(func() error { - if l.Error() != nil { - return l.Error() - } - credit = l.pLink.Credit() - return nil - }) - return -} - -// Not part of Link interface but use by Sender and Receiver. -func (l *link) Capacity() int { return l.capacity } - -func (l *link) Close(err error) { - _ = l.engine().Inject(func() { - if l.Error() == nil { - localClose(l.pLink, err) - } - }) -} http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/2f85988e/proton-c/bindings/go/src/qpid.apache.org/electron/receiver.go ---------------------------------------------------------------------- diff --git a/proton-c/bindings/go/src/qpid.apache.org/electron/receiver.go b/proton-c/bindings/go/src/qpid.apache.org/electron/receiver.go deleted file mode 100644 index 781fd7c..0000000 --- a/proton-c/bindings/go/src/qpid.apache.org/electron/receiver.go +++ /dev/null @@ -1,236 +0,0 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. -*/ - -package electron - -import ( - "fmt" - "qpid.apache.org/amqp" - "qpid.apache.org/proton" - "time" -) - -// Receiver is a Link that receives messages. -// -type Receiver interface { - Endpoint - LinkSettings - - // Receive blocks until a message is available or until the Receiver is closed - // and has no more buffered messages. - Receive() (ReceivedMessage, error) - - // ReceiveTimeout is like Receive but gives up after timeout, see Timeout. - // - // Note that that if Prefetch is false, after a Timeout the credit issued by - // Receive remains on the link. It will be used by the next call to Receive. - ReceiveTimeout(timeout time.Duration) (ReceivedMessage, error) - - // Prefetch==true means the Receiver will automatically issue credit to the - // remote sender to keep its buffer as full as possible, i.e. it will - // "pre-fetch" messages independently of the application calling - // Receive(). This gives good throughput for applications that handle a - // continuous stream of messages. Larger capacity may improve throughput, the - // optimal value depends on the characteristics of your application. - // - // Prefetch==false means the Receiver will issue only issue credit when you - // call Receive(), and will only issue enough credit to satisfy the calls - // actually made. This gives lower throughput but will not fetch any messages - // in advance. It is good for synchronous applications that need to evaluate - // each message before deciding whether to receive another. The - // request-response pattern is a typical example. If you make concurrent - // calls to Receive with pre-fetch disabled, you can improve performance by - // setting the capacity close to the expected number of concurrent calls. - // - Prefetch() bool - - // Capacity is the size (number of messages) of the local message buffer - // These are messages received but not yet returned to the application by a call to Receive() - Capacity() int -} - -// Receiver implementation -type receiver struct { - link - buffer chan ReceivedMessage - callers int -} - -func (r *receiver) Capacity() int { return cap(r.buffer) } -func (r *receiver) Prefetch() bool { return r.prefetch } - -// Call in proton goroutine -func newReceiver(ls linkSettings) *receiver { - r := &receiver{link: link{linkSettings: ls}} - r.endpoint.init(r.link.pLink.String()) - if r.capacity < 1 { - r.capacity = 1 - } - r.buffer = make(chan ReceivedMessage, r.capacity) - r.handler().addLink(r.pLink, r) - r.link.pLink.Open() - if r.prefetch { - r.flow(r.maxFlow()) - } - return r -} - -// Call in proton gorotine. Max additional credit we can request. -func (r *receiver) maxFlow() int { return cap(r.buffer) - len(r.buffer) - r.pLink.Credit() } - -func (r *receiver) flow(credit int) { - if credit > 0 { - r.pLink.Flow(credit) - } -} - -// Inject flow check per-caller call when prefetch is off. -// Called with inc=1 at start of call, inc = -1 at end -func (r *receiver) caller(inc int) { - _ = r.engine().Inject(func() { - r.callers += inc - need := r.callers - (len(r.buffer) + r.pLink.Credit()) - max := r.maxFlow() - if need > max { - need = max - } - r.flow(need) - }) -} - -// Inject flow top-up if prefetch is enabled -func (r *receiver) flowTopUp() { - if r.prefetch { - _ = r.engine().Inject(func() { r.flow(r.maxFlow()) }) - } -} - -func (r *receiver) Receive() (rm ReceivedMessage, err error) { - return r.ReceiveTimeout(Forever) -} - -func (r *receiver) ReceiveTimeout(timeout time.Duration) (rm ReceivedMessage, err error) { - assert(r.buffer != nil, "Receiver is not open: %s", r) - if !r.prefetch { // Per-caller flow control - select { // Check for immediate availability, avoid caller() inject - case rm2, ok := <-r.buffer: - if ok { - rm = rm2 - } else { - err = r.Error() - } - return - default: // Not immediately available, inject caller() counts - r.caller(+1) - defer r.caller(-1) - } - } - rmi, err := timedReceive(r.buffer, timeout) - switch err { - case nil: - r.flowTopUp() - rm = rmi.(ReceivedMessage) - case Closed: - err = r.Error() - } - return -} - -// Called in proton goroutine on MMessage event. -func (r *receiver) message(delivery proton.Delivery) { - if r.pLink.State().RemoteClosed() { - localClose(r.pLink, r.pLink.RemoteCondition().Error()) - return - } - if delivery.HasMessage() { - m, err := delivery.Message() - if err != nil { - localClose(r.pLink, err) - return - } - assert(m != nil) - r.pLink.Advance() - if r.pLink.Credit() < 0 { - localClose(r.pLink, fmt.Errorf("received message in excess of credit limit")) - } else { - // We never issue more credit than cap(buffer) so this will not block. - r.buffer <- ReceivedMessage{m, delivery, r} - } - } -} - -func (r *receiver) closed(err error) error { - e := r.link.closed(err) - if r.buffer != nil { - close(r.buffer) - } - return e -} - -// ReceivedMessage contains an amqp.Message and allows the message to be acknowledged. -type ReceivedMessage struct { - // Message is the received message. - Message amqp.Message - - pDelivery proton.Delivery - receiver Receiver -} - -// Acknowledge a ReceivedMessage with the given delivery status. -func (rm *ReceivedMessage) acknowledge(status uint64) error { - return rm.receiver.(*receiver).engine().Inject(func() { - // Deliveries are valid as long as the connection is, unless settled. - rm.pDelivery.SettleAs(uint64(status)) - }) -} - -// Accept tells the sender that we take responsibility for processing the message. -func (rm *ReceivedMessage) Accept() error { return rm.acknowledge(proton.Accepted) } - -// Reject tells the sender we consider the message invalid and unusable. -func (rm *ReceivedMessage) Reject() error { return rm.acknowledge(proton.Rejected) } - -// Release tells the sender we will not process the message but some other -// receiver might. -func (rm *ReceivedMessage) Release() error { return rm.acknowledge(proton.Released) } - -// IncomingReceiver is sent on the Connection.Incoming() channel when there is -// an incoming request to open a receiver link. -type IncomingReceiver struct { - incoming - linkSettings -} - -func newIncomingReceiver(sn *session, pLink proton.Link) *IncomingReceiver { - return &IncomingReceiver{ - incoming: makeIncoming(pLink), - linkSettings: makeIncomingLinkSettings(pLink, sn), - } -} - -// SetCapacity sets the capacity of the incoming receiver, call before Accept() -func (in *IncomingReceiver) SetCapacity(capacity int) { in.capacity = capacity } - -// SetPrefetch sets the pre-fetch mode of the incoming receiver, call before Accept() -func (in *IncomingReceiver) SetPrefetch(prefetch bool) { in.prefetch = prefetch } - -// Accept accepts an incoming receiver endpoint -func (in *IncomingReceiver) Accept() Endpoint { - return in.accept(func() Endpoint { return newReceiver(in.linkSettings) }) -} http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/2f85988e/proton-c/bindings/go/src/qpid.apache.org/electron/sender.go ---------------------------------------------------------------------- diff --git a/proton-c/bindings/go/src/qpid.apache.org/electron/sender.go b/proton-c/bindings/go/src/qpid.apache.org/electron/sender.go deleted file mode 100644 index f46fdc4..0000000 --- a/proton-c/bindings/go/src/qpid.apache.org/electron/sender.go +++ /dev/null @@ -1,288 +0,0 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. -*/ - -package electron - -// #include <proton/disposition.h> -import "C" - -import ( - "fmt" - "qpid.apache.org/amqp" - "qpid.apache.org/proton" - "time" -) - -// Sender is a Link that sends messages. -// -// The result of sending a message is provided by an Outcome value. -// -// A sender can buffer messages up to the credit limit provided by the remote receiver. -// All the Send* methods will block if the buffer is full until there is space. -// Send*Timeout methods will give up after the timeout and set Timeout as Outcome.Error. -// -type Sender interface { - Endpoint - LinkSettings - - // SendSync sends a message and blocks until the message is acknowledged by the remote receiver. - // Returns an Outcome, which may contain an error if the message could not be sent. - SendSync(m amqp.Message) Outcome - - // SendWaitable puts a message in the send buffer and returns a channel that - // you can use to wait for the Outcome of just that message. The channel is - // buffered so you can receive from it whenever you want without blocking. - // - // Note: can block if there is no space to buffer the message. - SendWaitable(m amqp.Message) <-chan Outcome - - // SendForget buffers a message for sending and returns, with no notification of the outcome. - // - // Note: can block if there is no space to buffer the message. - SendForget(m amqp.Message) - - // SendAsync puts a message in the send buffer and returns immediately. An - // Outcome with Value = value will be sent to the ack channel when the remote - // receiver has acknowledged the message or if there is an error. - // - // You can use the same ack channel for many calls to SendAsync(), possibly on - // many Senders. The channel will receive the outcomes in the order they - // become available. The channel should be buffered and/or served by dedicated - // goroutines to avoid blocking the connection. - // - // If ack == nil no Outcome is sent. - // - // Note: can block if there is no space to buffer the message. - SendAsync(m amqp.Message, ack chan<- Outcome, value interface{}) - - SendAsyncTimeout(m amqp.Message, ack chan<- Outcome, value interface{}, timeout time.Duration) - - SendWaitableTimeout(m amqp.Message, timeout time.Duration) <-chan Outcome - - SendForgetTimeout(m amqp.Message, timeout time.Duration) - - SendSyncTimeout(m amqp.Message, timeout time.Duration) Outcome -} - -// Outcome provides information about the outcome of sending a message. -type Outcome struct { - // Status of the message: was it sent, how was it acknowledged. - Status SentStatus - // Error is a local error if Status is Unsent or Unacknowledged, a remote error otherwise. - Error error - // Value provided by the application in SendAsync() - Value interface{} -} - -func (o Outcome) send(ack chan<- Outcome) { - if ack != nil { - ack <- o - } -} - -// SentStatus indicates the status of a sent message. -type SentStatus int - -const ( - // Message was never sent - Unsent SentStatus = iota - // Message was sent but never acknowledged. It may or may not have been received. - Unacknowledged - // Message was accepted by the receiver (or was sent pre-settled, accept is assumed) - Accepted - // Message was rejected as invalid by the receiver - Rejected - // Message was not processed by the receiver but may be valid for a different receiver - Released - // Receiver responded with an unrecognized status. - Unknown -) - -// String human readable name for SentStatus. -func (s SentStatus) String() string { - switch s { - case Unsent: - return "unsent" - case Unacknowledged: - return "unacknowledged" - case Accepted: - return "accepted" - case Rejected: - return "rejected" - case Released: - return "released" - case Unknown: - return "unknown" - default: - return fmt.Sprintf("invalid(%d)", s) - } -} - -// Convert proton delivery state code to SentStatus value -func sentStatus(d uint64) SentStatus { - switch d { - case proton.Accepted: - return Accepted - case proton.Rejected: - return Rejected - case proton.Released, proton.Modified: - return Released - default: - return Unknown - } -} - -// Sender implementation, held by handler. -type sender struct { - link - credit chan struct{} // Signal available credit. -} - -func (s *sender) SendAsyncTimeout(m amqp.Message, ack chan<- Outcome, v interface{}, t time.Duration) { - // wait for credit - if _, err := timedReceive(s.credit, t); err != nil { - if err == Closed && s.Error() != nil { - err = s.Error() - } - Outcome{Unsent, err, v}.send(ack) - return - } - // Send a message in handler goroutine - err := s.engine().Inject(func() { - if s.Error() != nil { - Outcome{Unsent, s.Error(), v}.send(ack) - return - } - - delivery, err2 := s.pLink.Send(m) - switch { - case err2 != nil: - Outcome{Unsent, err2, v}.send(ack) - case ack == nil || s.SndSettle() == SndSettled: // Pre-settled - if s.SndSettle() != SndUnsettled { // Not forced to send unsettled by link policy - delivery.Settle() - } - Outcome{Accepted, nil, v}.send(ack) // Assume accepted - default: - s.handler().sentMessages[delivery] = sentMessage{ack, v} // Register with handler - } - if s.pLink.Credit() > 0 { // Signal there is still credit - s.sendable() - } - }) - if err != nil { - Outcome{Unsent, err, v}.send(ack) - } -} - -// Set credit flag if not already set. Non-blocking, any goroutine -func (s *sender) sendable() { - select { // Non-blocking - case s.credit <- struct{}{}: - default: - } -} - -func (s *sender) SendWaitableTimeout(m amqp.Message, t time.Duration) <-chan Outcome { - out := make(chan Outcome, 1) - s.SendAsyncTimeout(m, out, nil, t) - return out -} - -func (s *sender) SendForgetTimeout(m amqp.Message, t time.Duration) { - s.SendAsyncTimeout(m, nil, nil, t) -} - -func (s *sender) SendSyncTimeout(m amqp.Message, t time.Duration) Outcome { - deadline := time.Now().Add(t) - ack := s.SendWaitableTimeout(m, t) - t = deadline.Sub(time.Now()) // Adjust for time already spent. - if t < 0 { - t = 0 - } - if out, err := timedReceive(ack, t); err == nil { - return out.(Outcome) - } else { - if err == Closed && s.Error() != nil { - err = s.Error() - } - return Outcome{Unacknowledged, err, nil} - } -} - -func (s *sender) SendAsync(m amqp.Message, ack chan<- Outcome, v interface{}) { - s.SendAsyncTimeout(m, ack, v, Forever) -} - -func (s *sender) SendWaitable(m amqp.Message) <-chan Outcome { - return s.SendWaitableTimeout(m, Forever) -} - -func (s *sender) SendForget(m amqp.Message) { - s.SendForgetTimeout(m, Forever) -} - -func (s *sender) SendSync(m amqp.Message) Outcome { - return <-s.SendWaitable(m) -} - -// handler goroutine -func (s *sender) closed(err error) error { - close(s.credit) - return s.link.closed(err) -} - -func newSender(ls linkSettings) *sender { - s := &sender{link: link{linkSettings: ls}, credit: make(chan struct{}, 1)} - s.endpoint.init(s.link.pLink.String()) - s.handler().addLink(s.pLink, s) - s.link.pLink.Open() - return s -} - -// sentMessage records a sent message on the handler. -type sentMessage struct { - ack chan<- Outcome - value interface{} -} - -// IncomingSender is sent on the Connection.Incoming() channel when there is -// an incoming request to open a sender link. -type IncomingSender struct { - incoming - linkSettings -} - -func newIncomingSender(sn *session, pLink proton.Link) *IncomingSender { - return &IncomingSender{ - incoming: makeIncoming(pLink), - linkSettings: makeIncomingLinkSettings(pLink, sn), - } -} - -// Accept accepts an incoming sender endpoint -func (in *IncomingSender) Accept() Endpoint { - return in.accept(func() Endpoint { return newSender(in.linkSettings) }) -} - -// Call in injected functions to check if the sender is valid. -func (s *sender) valid() bool { - s2, ok := s.handler().links[s.pLink].(*sender) - return ok && s2 == s -} http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/2f85988e/proton-c/bindings/go/src/qpid.apache.org/electron/session.go ---------------------------------------------------------------------- diff --git a/proton-c/bindings/go/src/qpid.apache.org/electron/session.go b/proton-c/bindings/go/src/qpid.apache.org/electron/session.go deleted file mode 100644 index 6dae354..0000000 --- a/proton-c/bindings/go/src/qpid.apache.org/electron/session.go +++ /dev/null @@ -1,139 +0,0 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. -*/ - -package electron - -import ( - "qpid.apache.org/proton" -) - -// Session is an AMQP session, it contains Senders and Receivers. -type Session interface { - Endpoint - - // Sender opens a new sender. - Sender(...LinkOption) (Sender, error) - - // Receiver opens a new Receiver. - Receiver(...LinkOption) (Receiver, error) -} - -type session struct { - endpoint - pSession proton.Session - connection *connection - incomingCapacity, outgoingWindow uint -} - -// SessionOption can be passed when creating a Session -type SessionOption func(*session) - -// IncomingCapacity returns a Session Option that sets the size (in bytes) of -// the session's incoming data buffer. -func IncomingCapacity(bytes uint) SessionOption { - return func(s *session) { s.incomingCapacity = bytes } -} - -// OutgoingWindow returns a Session Option that sets the outgoing window size (in frames). -func OutgoingWindow(frames uint) SessionOption { - return func(s *session) { s.outgoingWindow = frames } -} - -// in proton goroutine -func newSession(c *connection, es proton.Session, setting ...SessionOption) *session { - s := &session{ - connection: c, - pSession: es, - } - s.endpoint.init(es.String()) - for _, set := range setting { - set(s) - } - c.handler.sessions[s.pSession] = s - s.pSession.SetIncomingCapacity(s.incomingCapacity) - s.pSession.SetOutgoingWindow(s.outgoingWindow) - s.pSession.Open() - return s -} - -func (s *session) Connection() Connection { return s.connection } -func (s *session) pEndpoint() proton.Endpoint { return s.pSession } -func (s *session) engine() *proton.Engine { return s.connection.engine } - -func (s *session) Close(err error) { - _ = s.engine().Inject(func() { - if s.Error() == nil { - localClose(s.pSession, err) - } - }) -} - -func (s *session) Sender(setting ...LinkOption) (snd Sender, err error) { - err = s.engine().InjectWait(func() error { - if s.Error() != nil { - return s.Error() - } - l, err := makeLocalLink(s, true, setting...) - if err == nil { - snd = newSender(l) - } - return err - }) - return -} - -func (s *session) Receiver(setting ...LinkOption) (rcv Receiver, err error) { - err = s.engine().InjectWait(func() error { - if s.Error() != nil { - return s.Error() - } - l, err := makeLocalLink(s, false, setting...) - if err == nil { - rcv = newReceiver(l) - } - return err - }) - return -} - -// IncomingSender is sent on the Connection.Incoming() channel when there is an -// incoming request to open a session. -type IncomingSession struct { - incoming - h *handler - pSession proton.Session - incomingCapacity, outgoingWindow uint -} - -func newIncomingSession(h *handler, ps proton.Session) *IncomingSession { - return &IncomingSession{incoming: makeIncoming(ps), h: h, pSession: ps} -} - -// SetIncomingCapacity sets the session buffer capacity of an incoming session in bytes. -func (in *IncomingSession) SetIncomingCapacity(bytes uint) { in.incomingCapacity = bytes } - -// SetOutgoingWindow sets the session outgoing window of an incoming session in frames. -func (in *IncomingSession) SetOutgoingWindow(frames uint) { in.outgoingWindow = frames } - -// Accept an incoming session endpoint. -func (in *IncomingSession) Accept() Endpoint { - return in.accept(func() Endpoint { - return newSession(in.h.connection, in.pSession, IncomingCapacity(in.incomingCapacity), OutgoingWindow(in.outgoingWindow)) - }) -} http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/2f85988e/proton-c/bindings/go/src/qpid.apache.org/electron/time.go ---------------------------------------------------------------------- diff --git a/proton-c/bindings/go/src/qpid.apache.org/electron/time.go b/proton-c/bindings/go/src/qpid.apache.org/electron/time.go deleted file mode 100644 index 51bfbc5..0000000 --- a/proton-c/bindings/go/src/qpid.apache.org/electron/time.go +++ /dev/null @@ -1,83 +0,0 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. -*/ - -package electron - -import ( - "fmt" - "math" - "reflect" - "time" -) - -// Timeout is the error returned if an operation does not complete on time. -// -// Methods named *Timeout in this package take time.Duration timeout parameter. -// -// If timeout > 0 and there is no result available before the timeout, they -// return a zero or nil value and Timeout as an error. -// -// If timeout == 0 they will return a result if one is immediatley available or -// nil/zero and Timeout as an error if not. -// -// If timeout == Forever the function will return only when there is a result or -// some non-timeout error occurs. -// -var Timeout = fmt.Errorf("timeout") - -// Forever can be used as a timeout parameter to indicate wait forever. -const Forever time.Duration = math.MaxInt64 - -// timedReceive receives on channel (which can be a chan of any type), waiting -// up to timeout. -// -// timeout==0 means do a non-blocking receive attempt. timeout < 0 means block -// forever. Other values mean block up to the timeout. -// -// Returns error Timeout on timeout, Closed on channel close. -func timedReceive(channel interface{}, timeout time.Duration) (interface{}, error) { - cases := []reflect.SelectCase{ - reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(channel)}, - } - if timeout == 0 { // Non-blocking - cases = append(cases, reflect.SelectCase{Dir: reflect.SelectDefault}) - } else { // Block up to timeout - cases = append(cases, - reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(After(timeout))}) - } - chosen, value, ok := reflect.Select(cases) - switch { - case chosen == 0 && ok: - return value.Interface(), nil - case chosen == 0 && !ok: - return nil, Closed - default: - return nil, Timeout - } -} - -// After is like time.After but returns a nil channel if timeout == Forever -// since selecting on a nil channel will never return. -func After(timeout time.Duration) <-chan time.Time { - if timeout == Forever { - return nil - } else { - return time.After(timeout) - } -} http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/2f85988e/proton-c/bindings/go/src/qpid.apache.org/proton/doc.go ---------------------------------------------------------------------- diff --git a/proton-c/bindings/go/src/qpid.apache.org/proton/doc.go b/proton-c/bindings/go/src/qpid.apache.org/proton/doc.go deleted file mode 100644 index 083f701..0000000 --- a/proton-c/bindings/go/src/qpid.apache.org/proton/doc.go +++ /dev/null @@ -1,64 +0,0 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. -*/ - -/* -Package proton wraps Proton-C, an event-driven, concurrent-unsafe AMQP 1.0 -C library (package 'electron' is more "Go-like" and concurrent-safe) - -Consult the C API documentation at http://qpid.apache.org/proton for more -information about the types here. There is a 1-1 correspondence between C type -pn_foo_t and Go type proton.Foo, and between C function - - pn_foo_do_something(pn_foo_t*, ...) - -and Go method - - func (proton.Foo) DoSomething(...) - -The proton.Engine type pumps data between a Go net.Conn and a proton event loop -goroutine that feeds events to a proton.MessagingHandler, which you must implement. -See the Engine documentation for more. - -MessagingHandler defines an event handling interface that you can implement to -react to AMQP protocol events. There is also a lower-level EventHandler, but -MessagingHandler provides a simpler set of events and automates common tasks for you, -for most applications it will be more convenient. - -NOTE: Methods on most types defined in this package (Sessions, Links etc.) can -*only* be called in the event handler goroutine of the relevant -Connection/Engine, either by the HandleEvent method of a handler type or in a -function injected into the goroutine via Inject() or InjectWait() Handlers and -injected functions can set up channels to communicate with other goroutines. -Note the Injecter associated with a handler available as part of the Event value -passed to HandleEvent. - -Separate Engine instances are independent, and can run concurrently. - -The 'electron' package is built on the proton package but instead offers a -concurrent-safe API that can use simple procedural loops rather than event -handlers to express application logic. It is easier to use for most -applications. - -*/ -package proton - -// #cgo LDFLAGS: -lqpid-proton -import "C" - -// This file is just for the package comment. http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/2f85988e/proton-c/bindings/go/src/qpid.apache.org/proton/engine.go ---------------------------------------------------------------------- diff --git a/proton-c/bindings/go/src/qpid.apache.org/proton/engine.go b/proton-c/bindings/go/src/qpid.apache.org/proton/engine.go deleted file mode 100644 index c0f0093..0000000 --- a/proton-c/bindings/go/src/qpid.apache.org/proton/engine.go +++ /dev/null @@ -1,422 +0,0 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. -*/ - -package proton - -import ( - "fmt" - "net" - "os" - "strings" - "sync" - "time" - "unsafe" -) - -/* -#include <proton/connection.h> -#include <proton/event.h> -#include <proton/error.h> -#include <proton/handlers.h> -#include <proton/session.h> -#include <proton/transport.h> -#include <memory.h> -#include <stdlib.h> -*/ -import "C" - -// Injecter allows functions to be "injected" into the event-processing loop, to -// be called in the same goroutine as event handlers. -type Injecter interface { - // Inject a function into the engine goroutine. - // - // f() will be called in the same goroutine as event handlers, so it can safely - // use values belonging to event handlers without synchronization. f() should - // not block, no further events or injected functions can be processed until - // f() returns. - // - // Returns a non-nil error if the function could not be injected and will - // never be called. Otherwise the function will eventually be called. - // - // Note that proton values (Link, Session, Connection etc.) that existed when - // Inject(f) was called may have become invalid by the time f() is executed. - // Handlers should handle keep track of Closed events to ensure proton values - // are not used after they become invalid. One technique is to have map from - // proton values to application values. Check that the map has the correct - // proton/application value pair at the start of the injected function and - // delete the value from the map when handling a Closed event. - Inject(f func()) error - - // InjectWait is like Inject but does not return till f() has completed. - // If f() cannot be injected it returns the error from Inject(), otherwise - // it returns the error from f() - InjectWait(f func() error) error -} - -// Engine reads from a net.Conn, decodes AMQP events and calls the appropriate -// Handler functions sequentially in a single goroutine. Actions taken by -// Handler functions (such as sending messages) are encoded and written to the -// net.Conn. You can create multiple Engines to handle multiple connections -// concurrently. -// -// You implement the EventHandler and/or MessagingHandler interfaces and provide -// those values to NewEngine(). Their HandleEvent method will be called in the -// event-handling goroutine. -// -// Handlers can pass values from an event (Connections, Links, Deliveries etc.) to -// other goroutines, store them, or use them as map indexes. Effectively they are -// just pointers. Other goroutines cannot call their methods directly but they can -// can create a function closure to call such methods and pass it to Engine.Inject() -// to have it evaluated in the engine goroutine. -// -// You are responsible for ensuring you don't use an event value after it is -// invalid. The handler methods will tell you when a value is no longer valid. For -// example after a LinkClosed event, that link is no longer valid. If you do -// Link.Close() yourself (in a handler or injected function) the link remains valid -// until the corresponing LinkClosed event is received by the handler. -// -// Engine.Close() will take care of cleaning up any remaining values when you are -// done with the Engine. All values associated with a engine become invalid when you -// call Engine.Close() -// -// The qpid.apache.org/proton/concurrent package will do all this for you, so it -// may be a better choice for some applications. -// -type Engine struct { - // Error is set on exit from Run() if there was an error. - err ErrorHolder - inject chan func() - - conn net.Conn - connection Connection - transport Transport - collector *C.pn_collector_t - handlers []EventHandler // Handlers for proton events. - running chan struct{} // This channel will be closed when the goroutines are done. - closeOnce sync.Once - timer *time.Timer - traceEvent bool -} - -const bufferSize = 4096 - -func envBool(name string) bool { - v := strings.ToLower(os.Getenv(name)) - return v == "true" || v == "1" || v == "yes" || v == "on" -} - -// Create a new Engine and call Initialize() with conn and handlers -func NewEngine(conn net.Conn, handlers ...EventHandler) (*Engine, error) { - eng := &Engine{} - return eng, eng.Initialize(conn, handlers...) -} - -// Initialize an Engine with a connection and handlers. Start it with Run() -func (eng *Engine) Initialize(conn net.Conn, handlers ...EventHandler) error { - eng.inject = make(chan func()) - eng.conn = conn - eng.connection = Connection{C.pn_connection()} - eng.transport = Transport{C.pn_transport()} - eng.collector = C.pn_collector() - eng.handlers = handlers - eng.running = make(chan struct{}) - eng.timer = time.NewTimer(0) - eng.traceEvent = envBool("PN_TRACE_EVT") - if eng.transport.IsNil() || eng.connection.IsNil() || eng.collector == nil { - eng.free() - return fmt.Errorf("proton.NewEngine cannot allocate") - } - C.pn_connection_collect(eng.connection.pn, eng.collector) - return nil -} - -// Create a byte slice backed by C memory. -// Empty or error (size <= 0) returns a nil byte slice. -func cByteSlice(start unsafe.Pointer, size int) []byte { - if start == nil || size <= 0 { - return nil - } else { - // Slice from very large imaginary array in C memory - return (*[1 << 30]byte)(start)[:size:size] - } -} - -func (eng *Engine) Connection() Connection { - return eng.connection -} - -func (eng *Engine) Transport() Transport { - return eng.transport -} - -func (eng *Engine) String() string { - return fmt.Sprintf("[%s]%s-%s", eng.Id(), eng.conn.LocalAddr(), eng.conn.RemoteAddr()) -} - -func (eng *Engine) Id() string { - // Use transport address to match default PN_TRACE_FRM=1 output. - return fmt.Sprintf("%p", eng.Transport().CPtr()) -} - -func (eng *Engine) Error() error { - return eng.err.Get() -} - -// Inject a function into the Engine's event loop. -// -// f() will be called in the same event-processing goroutine that calls Handler -// methods. f() can safely call methods on values that belong to this engine -// (Sessions, Links etc) -// -// The injected function has no parameters or return values. It is normally a -// closure and can use channels to communicate with the injecting goroutine if -// necessary. -// -// Returns a non-nil error if the engine is closed before the function could be -// injected. -func (eng *Engine) Inject(f func()) error { - select { - case eng.inject <- f: - return nil - case <-eng.running: - return eng.Error() - } -} - -// InjectWait is like Inject but does not return till f() has completed or the -// engine is closed, and returns an error value from f() -func (eng *Engine) InjectWait(f func() error) error { - done := make(chan error) - defer close(done) - err := eng.Inject(func() { done <- f() }) - if err != nil { - return err - } - select { - case <-eng.running: - return eng.Error() - case err := <-done: - return err - } -} - -// Server puts the Engine in server mode, meaning it will auto-detect security settings on -// the incoming connnection such as use of SASL and SSL. -// Must be called before Run() -// -func (eng *Engine) Server() { eng.Transport().SetServer() } - -func (eng *Engine) disconnect(err error) { - cond := eng.Transport().Condition() - cond.SetError(err) // Set the provided error. - cond.SetError(eng.conn.Close()) // Use connection error if cond is not already set. - eng.transport.CloseTail() - eng.transport.CloseHead() -} - -// Close the engine's connection. -// If err != nil pass it to the remote end as the close condition. -// Returns when the remote end closes or disconnects. -func (eng *Engine) Close(err error) { - _ = eng.Inject(func() { CloseError(eng.Connection(), err) }) - <-eng.running -} - -// CloseTimeout like Close but disconnect if the remote end doesn't close within timeout. -func (eng *Engine) CloseTimeout(err error, timeout time.Duration) { - _ = eng.Inject(func() { CloseError(eng.Connection(), err) }) - select { - case <-eng.running: - case <-time.After(timeout): - eng.Disconnect(err) - } -} - -// Disconnect the engine's connection immediately without an AMQP close. -// Process any termination events before returning. -func (eng *Engine) Disconnect(err error) { - _ = eng.Inject(func() { eng.disconnect(err) }) - <-eng.running -} - -// Let proton run timed activity and set up the next tick -func (eng *Engine) tick() { - now := time.Now() - next := eng.Transport().Tick(now) - if !next.IsZero() { - eng.timer.Reset(next.Sub(now)) - } -} - -func (eng *Engine) dispatch() bool { - for ce := C.pn_collector_peek(eng.collector); ce != nil; ce = C.pn_collector_peek(eng.collector) { - e := makeEvent(ce, eng) - if eng.traceEvent { - eng.transport.Log(e.String()) - } - for _, h := range eng.handlers { - h.HandleEvent(e) - } - if e.Type() == EConnectionRemoteOpen { - eng.tick() // Update the tick if changed by remote. - } - C.pn_collector_pop(eng.collector) - } - return !eng.transport.Closed() || C.pn_collector_peek(eng.collector) != nil -} - -func (eng *Engine) writeBuffer() []byte { - size := eng.Transport().Pending() // Evaluate before Head(), may change buffer. - start := eng.Transport().Head() - return cByteSlice(start, size) -} - -func (eng *Engine) readBuffer() []byte { - size := eng.Transport().Capacity() - start := eng.Transport().Tail() - return cByteSlice(start, size) -} - -func (eng *Engine) free() { - if !eng.transport.IsNil() { - eng.transport.Unbind() - eng.transport.Free() - eng.transport = Transport{} - } - if !eng.connection.IsNil() { - eng.connection.Free() - eng.connection = Connection{} - } - if eng.collector != nil { - C.pn_collector_release(eng.collector) - C.pn_collector_free(eng.collector) - eng.collector = nil - } -} - -// Run the engine. Engine.Run() will exit when the engine is closed or -// disconnected. You can check for errors after exit with Engine.Error(). -// -func (eng *Engine) Run() error { - defer eng.free() - eng.transport.Bind(eng.connection) - eng.tick() // Start ticking if needed - - // Channels for read and write buffers going in and out of the read/write goroutines. - // The channels are unbuffered: we want to exchange buffers in seuquence. - readsIn, writesIn := make(chan []byte), make(chan []byte) - readsOut, writesOut := make(chan []byte), make(chan []byte) - - wait := sync.WaitGroup{} - wait.Add(2) // Read and write goroutines - - go func() { // Read goroutine - defer wait.Done() - for { - rbuf, ok := <-readsIn - if !ok { - return - } - n, err := eng.conn.Read(rbuf) - if n > 0 { - readsOut <- rbuf[:n] - } else if err != nil { - _ = eng.Inject(func() { - eng.Transport().Condition().SetError(err) - eng.Transport().CloseTail() - }) - return - } - } - }() - - go func() { // Write goroutine - defer wait.Done() - for { - wbuf, ok := <-writesIn - if !ok { - return - } - n, err := eng.conn.Write(wbuf) - if n > 0 { - writesOut <- wbuf[:n] - } else if err != nil { - _ = eng.Inject(func() { - eng.Transport().Condition().SetError(err) - eng.Transport().CloseHead() - }) - return - } - } - }() - - for eng.dispatch() { - readBuf := eng.readBuffer() - writeBuf := eng.writeBuffer() - // Note that getting the buffers can generate events (eg. SASL events) that - // might close the transport. Check if we are already finished before - // blocking for IO. - if !eng.dispatch() { - break - } - - // sendReads/sendWrites are nil (not sendable in select) unless we have a - // buffer to read/write - var sendReads, sendWrites chan []byte - if readBuf != nil { - sendReads = readsIn - } - if writeBuf != nil { - sendWrites = writesIn - } - - // Send buffers to the read/write goroutines if we have them. - // Get buffers from the read/write goroutines and process them - // Check for injected functions - select { - - case sendReads <- readBuf: - - case sendWrites <- writeBuf: - - case buf := <-readsOut: - eng.transport.Process(uint(len(buf))) - - case buf := <-writesOut: - eng.transport.Pop(uint(len(buf))) - - case f, ok := <-eng.inject: // Function injected from another goroutine - if ok { - f() - } - - case <-eng.timer.C: - eng.tick() - } - } - - eng.err.Set(EndpointError(eng.Connection())) - eng.err.Set(eng.Transport().Condition().Error()) - close(readsIn) - close(writesIn) - close(eng.running) // Signal goroutines have exited and Error is set, disable Inject() - _ = eng.conn.Close() // Close conn, force read/write goroutines to exit (they will Inject) - wait.Wait() // Wait for goroutines - return eng.err.Get() -} http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/2f85988e/proton-c/bindings/go/src/qpid.apache.org/proton/error.go ---------------------------------------------------------------------- diff --git a/proton-c/bindings/go/src/qpid.apache.org/proton/error.go b/proton-c/bindings/go/src/qpid.apache.org/proton/error.go deleted file mode 100644 index 80d9680..0000000 --- a/proton-c/bindings/go/src/qpid.apache.org/proton/error.go +++ /dev/null @@ -1,96 +0,0 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. -*/ - -// Internal implementation details - ignore. -package proton - -// #cgo LDFLAGS: -lqpid-proton -// #include <proton/error.h> -// #include <proton/codec.h> -import "C" - -import ( - "fmt" - "sync" - "sync/atomic" -) - -type PnErrorCode int - -func (e PnErrorCode) String() string { - switch e { - case C.PN_EOS: - return "end-of-data" - case C.PN_ERR: - return "error" - case C.PN_OVERFLOW: - return "overflow" - case C.PN_UNDERFLOW: - return "underflow" - case C.PN_STATE_ERR: - return "bad-state" - case C.PN_ARG_ERR: - return "invalid-argument" - case C.PN_TIMEOUT: - return "timeout" - case C.PN_INTR: - return "interrupted" - case C.PN_INPROGRESS: - return "in-progress" - default: - return fmt.Sprintf("unknown-error(%d)", e) - } -} - -func PnError(e *C.pn_error_t) error { - if e == nil || C.pn_error_code(e) == 0 { - return nil - } - return fmt.Errorf("%s: %s", PnErrorCode(C.pn_error_code(e)), C.GoString(C.pn_error_text(e))) -} - -// ErrorHolder is a goroutine-safe error holder that keeps the first error that is set. -type ErrorHolder struct { - once sync.Once - value atomic.Value -} - -// Set the error if not already set, return the error in the Holder. -func (e *ErrorHolder) Set(err error) { - if err != nil { - e.once.Do(func() { e.value.Store(err) }) - } -} - -// Get the error. -func (e *ErrorHolder) Get() (err error) { - err, _ = e.value.Load().(error) - return -} - -// assert panics if condition is false with optional formatted message -func assert(condition bool, format ...interface{}) { - if !condition { - if len(format) > 0 { - panic(fmt.Errorf(format[0].(string), format[1:]...)) - } else { - panic(fmt.Errorf("assertion failed")) - } - } -} --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
