http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/bd3fb337/proton-c/bindings/go/src/qpid.apache.org/proton/go/event/wrappers_gen.go ---------------------------------------------------------------------- diff --git a/proton-c/bindings/go/src/qpid.apache.org/proton/go/event/wrappers_gen.go b/proton-c/bindings/go/src/qpid.apache.org/proton/go/event/wrappers_gen.go deleted file mode 100644 index 73f0d3b..0000000 --- a/proton-c/bindings/go/src/qpid.apache.org/proton/go/event/wrappers_gen.go +++ /dev/null @@ -1,732 +0,0 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. -*/ - -// -// NOTE: This file was generated by genwrap.go, do not edit it by hand. -// - -package event - -import ( - "qpid.apache.org/proton/go/internal" - "time" - "unsafe" -) - -// #include <proton/types.h> -// #include <proton/event.h> -// #include <stdlib.h> -// #include <proton/session.h> -// #include <proton/link.h> -// #include <proton/delivery.h> -// #include <proton/disposition.h> -// #include <proton/condition.h> -// #include <proton/terminus.h> -// #include <proton/connection.h> -import "C" - -type EventType int - -const ( - EConnectionInit EventType = C.PN_CONNECTION_INIT - EConnectionBound EventType = C.PN_CONNECTION_BOUND - EConnectionUnbound EventType = C.PN_CONNECTION_UNBOUND - EConnectionLocalOpen EventType = C.PN_CONNECTION_LOCAL_OPEN - EConnectionRemoteOpen EventType = C.PN_CONNECTION_REMOTE_OPEN - EConnectionLocalClose EventType = C.PN_CONNECTION_LOCAL_CLOSE - EConnectionRemoteClose EventType = C.PN_CONNECTION_REMOTE_CLOSE - EConnectionFinal EventType = C.PN_CONNECTION_FINAL - ESessionInit EventType = C.PN_SESSION_INIT - ESessionLocalOpen EventType = C.PN_SESSION_LOCAL_OPEN - ESessionRemoteOpen EventType = C.PN_SESSION_REMOTE_OPEN - ESessionLocalClose EventType = C.PN_SESSION_LOCAL_CLOSE - ESessionRemoteClose EventType = C.PN_SESSION_REMOTE_CLOSE - ESessionFinal EventType = C.PN_SESSION_FINAL - ELinkInit EventType = C.PN_LINK_INIT - ELinkLocalOpen EventType = C.PN_LINK_LOCAL_OPEN - ELinkRemoteOpen EventType = C.PN_LINK_REMOTE_OPEN - ELinkLocalClose EventType = C.PN_LINK_LOCAL_CLOSE - ELinkRemoteClose EventType = C.PN_LINK_REMOTE_CLOSE - ELinkLocalDetach EventType = C.PN_LINK_LOCAL_DETACH - ELinkRemoteDetach EventType = C.PN_LINK_REMOTE_DETACH - ELinkFlow EventType = C.PN_LINK_FLOW - ELinkFinal EventType = C.PN_LINK_FINAL - EDelivery EventType = C.PN_DELIVERY - ETransport EventType = C.PN_TRANSPORT - ETransportAuthenticated EventType = C.PN_TRANSPORT_AUTHENTICATED - ETransportError EventType = C.PN_TRANSPORT_ERROR - ETransportHeadClosed EventType = C.PN_TRANSPORT_HEAD_CLOSED - ETransportTailClosed EventType = C.PN_TRANSPORT_TAIL_CLOSED - ETransportClosed EventType = C.PN_TRANSPORT_CLOSED -) - -func (e EventType) String() string { - switch e { - - case C.PN_CONNECTION_INIT: - return "ConnectionInit" - case C.PN_CONNECTION_BOUND: - return "ConnectionBound" - case C.PN_CONNECTION_UNBOUND: - return "ConnectionUnbound" - case C.PN_CONNECTION_LOCAL_OPEN: - return "ConnectionLocalOpen" - case C.PN_CONNECTION_REMOTE_OPEN: - return "ConnectionRemoteOpen" - case C.PN_CONNECTION_LOCAL_CLOSE: - return "ConnectionLocalClose" - case C.PN_CONNECTION_REMOTE_CLOSE: - return "ConnectionRemoteClose" - case C.PN_CONNECTION_FINAL: - return "ConnectionFinal" - case C.PN_SESSION_INIT: - return "SessionInit" - case C.PN_SESSION_LOCAL_OPEN: - return "SessionLocalOpen" - case C.PN_SESSION_REMOTE_OPEN: - return "SessionRemoteOpen" - case C.PN_SESSION_LOCAL_CLOSE: - return "SessionLocalClose" - case C.PN_SESSION_REMOTE_CLOSE: - return "SessionRemoteClose" - case C.PN_SESSION_FINAL: - return "SessionFinal" - case C.PN_LINK_INIT: - return "LinkInit" - case C.PN_LINK_LOCAL_OPEN: - return "LinkLocalOpen" - case C.PN_LINK_REMOTE_OPEN: - return "LinkRemoteOpen" - case C.PN_LINK_LOCAL_CLOSE: - return "LinkLocalClose" - case C.PN_LINK_REMOTE_CLOSE: - return "LinkRemoteClose" - case C.PN_LINK_LOCAL_DETACH: - return "LinkLocalDetach" - case C.PN_LINK_REMOTE_DETACH: - return "LinkRemoteDetach" - case C.PN_LINK_FLOW: - return "LinkFlow" - case C.PN_LINK_FINAL: - return "LinkFinal" - case C.PN_DELIVERY: - return "Delivery" - case C.PN_TRANSPORT: - return "Transport" - case C.PN_TRANSPORT_AUTHENTICATED: - return "TransportAuthenticated" - case C.PN_TRANSPORT_ERROR: - return "TransportError" - case C.PN_TRANSPORT_HEAD_CLOSED: - return "TransportHeadClosed" - case C.PN_TRANSPORT_TAIL_CLOSED: - return "TransportTailClosed" - case C.PN_TRANSPORT_CLOSED: - return "TransportClosed" - } - return "Unknown" -} - -// Wrappers for declarations in session.h - -type Session struct{ pn *C.pn_session_t } - -func (s Session) IsNil() bool { return s.pn == nil } -func (s Session) Free() { - C.pn_session_free(s.pn) -} -func (s Session) State() State { - return State(C.pn_session_state(s.pn)) -} -func (s Session) Error() error { - return internal.PnError(unsafe.Pointer(C.pn_session_error(s.pn))) -} -func (s Session) Condition() Condition { - return Condition{C.pn_session_condition(s.pn)} -} -func (s Session) RemoteCondition() Condition { - return Condition{C.pn_session_remote_condition(s.pn)} -} -func (s Session) Connection() Connection { - return Connection{C.pn_session_connection(s.pn)} -} -func (s Session) Open() { - C.pn_session_open(s.pn) -} -func (s Session) Close() { - C.pn_session_close(s.pn) -} -func (s Session) IncomingCapacity() uint { - return uint(C.pn_session_get_incoming_capacity(s.pn)) -} -func (s Session) SetIncomingCapacity(capacity uint) { - C.pn_session_set_incoming_capacity(s.pn, C.size_t(capacity)) -} -func (s Session) OutgoingBytes() uint { - return uint(C.pn_session_outgoing_bytes(s.pn)) -} -func (s Session) IncomingBytes() uint { - return uint(C.pn_session_incoming_bytes(s.pn)) -} -func (s Session) Next(state State) Session { - return Session{C.pn_session_next(s.pn, C.pn_state_t(state))} -} - -// Wrappers for declarations in link.h - -type SndSettleMode C.pn_snd_settle_mode_t - -const ( - PnSndUnsettled SndSettleMode = C.PN_SND_UNSETTLED - PnSndSettled SndSettleMode = C.PN_SND_SETTLED - PnSndMixed SndSettleMode = C.PN_SND_MIXED -) - -func (e SndSettleMode) String() string { - switch e { - - case C.PN_SND_UNSETTLED: - return "SndUnsettled" - case C.PN_SND_SETTLED: - return "SndSettled" - case C.PN_SND_MIXED: - return "SndMixed" - } - return "unknown" -} - -type RcvSettleMode C.pn_rcv_settle_mode_t - -const ( - PnRcvFirst RcvSettleMode = C.PN_RCV_FIRST - PnRcvSecond RcvSettleMode = C.PN_RCV_SECOND -) - -func (e RcvSettleMode) String() string { - switch e { - - case C.PN_RCV_FIRST: - return "RcvFirst" - case C.PN_RCV_SECOND: - return "RcvSecond" - } - return "unknown" -} - -type Link struct{ pn *C.pn_link_t } - -func (l Link) IsNil() bool { return l.pn == nil } -func (l Link) Free() { - C.pn_link_free(l.pn) -} -func (l Link) Name() string { - return C.GoString(C.pn_link_name(l.pn)) -} -func (l Link) IsSender() bool { - return bool(C.pn_link_is_sender(l.pn)) -} -func (l Link) IsReceiver() bool { - return bool(C.pn_link_is_receiver(l.pn)) -} -func (l Link) State() State { - return State(C.pn_link_state(l.pn)) -} -func (l Link) Error() error { - return internal.PnError(unsafe.Pointer(C.pn_link_error(l.pn))) -} -func (l Link) Condition() Condition { - return Condition{C.pn_link_condition(l.pn)} -} -func (l Link) RemoteCondition() Condition { - return Condition{C.pn_link_remote_condition(l.pn)} -} -func (l Link) Session() Session { - return Session{C.pn_link_session(l.pn)} -} -func (l Link) Next(state State) Link { - return Link{C.pn_link_next(l.pn, C.pn_state_t(state))} -} -func (l Link) Open() { - C.pn_link_open(l.pn) -} -func (l Link) Close() { - C.pn_link_close(l.pn) -} -func (l Link) Detach() { - C.pn_link_detach(l.pn) -} -func (l Link) Source() Terminus { - return Terminus{C.pn_link_source(l.pn)} -} -func (l Link) Target() Terminus { - return Terminus{C.pn_link_target(l.pn)} -} -func (l Link) RemoteSource() Terminus { - return Terminus{C.pn_link_remote_source(l.pn)} -} -func (l Link) RemoteTarget() Terminus { - return Terminus{C.pn_link_remote_target(l.pn)} -} -func (l Link) Current() Delivery { - return Delivery{C.pn_link_current(l.pn)} -} -func (l Link) Advance() bool { - return bool(C.pn_link_advance(l.pn)) -} -func (l Link) Credit() int { - return int(C.pn_link_credit(l.pn)) -} -func (l Link) Queued() int { - return int(C.pn_link_queued(l.pn)) -} -func (l Link) RemoteCredit() int { - return int(C.pn_link_remote_credit(l.pn)) -} -func (l Link) IsDrain() bool { - return bool(C.pn_link_get_drain(l.pn)) -} -func (l Link) Drained() int { - return int(C.pn_link_drained(l.pn)) -} -func (l Link) Available() int { - return int(C.pn_link_available(l.pn)) -} -func (l Link) SndSettleMode() SndSettleMode { - return SndSettleMode(C.pn_link_snd_settle_mode(l.pn)) -} -func (l Link) RcvSettleMode() RcvSettleMode { - return RcvSettleMode(C.pn_link_rcv_settle_mode(l.pn)) -} -func (l Link) SetSndSettleMode(mode SndSettleMode) { - C.pn_link_set_snd_settle_mode(l.pn, C.pn_snd_settle_mode_t(mode)) -} -func (l Link) SetRcvSettleMode(mode RcvSettleMode) { - C.pn_link_set_rcv_settle_mode(l.pn, C.pn_rcv_settle_mode_t(mode)) -} -func (l Link) RemoteSndSettleMode() SndSettleMode { - return SndSettleMode(C.pn_link_remote_snd_settle_mode(l.pn)) -} -func (l Link) RemoteRcvSettleMode() RcvSettleMode { - return RcvSettleMode(C.pn_link_remote_rcv_settle_mode(l.pn)) -} -func (l Link) Unsettled() int { - return int(C.pn_link_unsettled(l.pn)) -} -func (l Link) Offered(credit int) { - C.pn_link_offered(l.pn, C.int(credit)) -} -func (l Link) Flow(credit int) { - C.pn_link_flow(l.pn, C.int(credit)) -} -func (l Link) Drain(credit int) { - C.pn_link_drain(l.pn, C.int(credit)) -} -func (l Link) SetDrain(drain bool) { - C.pn_link_set_drain(l.pn, C.bool(drain)) -} -func (l Link) Draining() bool { - return bool(C.pn_link_draining(l.pn)) -} - -// Wrappers for declarations in delivery.h - -type Delivery struct{ pn *C.pn_delivery_t } - -func (d Delivery) IsNil() bool { return d.pn == nil } -func (d Delivery) Tag() DeliveryTag { - return DeliveryTag{C.pn_delivery_tag(d.pn)} -} -func (d Delivery) Link() Link { - return Link{C.pn_delivery_link(d.pn)} -} -func (d Delivery) Local() Disposition { - return Disposition{C.pn_delivery_local(d.pn)} -} -func (d Delivery) LocalState() uint64 { - return uint64(C.pn_delivery_local_state(d.pn)) -} -func (d Delivery) Remote() Disposition { - return Disposition{C.pn_delivery_remote(d.pn)} -} -func (d Delivery) RemoteState() uint64 { - return uint64(C.pn_delivery_remote_state(d.pn)) -} -func (d Delivery) Settled() bool { - return bool(C.pn_delivery_settled(d.pn)) -} -func (d Delivery) Pending() uint { - return uint(C.pn_delivery_pending(d.pn)) -} -func (d Delivery) Partial() bool { - return bool(C.pn_delivery_partial(d.pn)) -} -func (d Delivery) Writable() bool { - return bool(C.pn_delivery_writable(d.pn)) -} -func (d Delivery) Readable() bool { - return bool(C.pn_delivery_readable(d.pn)) -} -func (d Delivery) Updated() bool { - return bool(C.pn_delivery_updated(d.pn)) -} -func (d Delivery) Update(state uint64) { - C.pn_delivery_update(d.pn, C.uint64_t(state)) -} -func (d Delivery) Clear() { - C.pn_delivery_clear(d.pn) -} -func (d Delivery) Current() bool { - return bool(C.pn_delivery_current(d.pn)) -} -func (d Delivery) Settle() { - C.pn_delivery_settle(d.pn) -} -func (d Delivery) Dump() { - C.pn_delivery_dump(d.pn) -} -func (d Delivery) Buffered() bool { - return bool(C.pn_delivery_buffered(d.pn)) -} - -// Wrappers for declarations in disposition.h - -type Disposition struct{ pn *C.pn_disposition_t } - -func (d Disposition) IsNil() bool { return d.pn == nil } -func (d Disposition) Type() uint64 { - return uint64(C.pn_disposition_type(d.pn)) -} -func (d Disposition) Condition() Condition { - return Condition{C.pn_disposition_condition(d.pn)} -} -func (d Disposition) Data() Data { - return Data{C.pn_disposition_data(d.pn)} -} -func (d Disposition) SectionNumber() uint16 { - return uint16(C.pn_disposition_get_section_number(d.pn)) -} -func (d Disposition) SetSectionNumber(section_number uint16) { - C.pn_disposition_set_section_number(d.pn, C.uint32_t(section_number)) -} -func (d Disposition) SectionOffset() uint64 { - return uint64(C.pn_disposition_get_section_offset(d.pn)) -} -func (d Disposition) SetSectionOffset(section_offset uint64) { - C.pn_disposition_set_section_offset(d.pn, C.uint64_t(section_offset)) -} -func (d Disposition) IsFailed() bool { - return bool(C.pn_disposition_is_failed(d.pn)) -} -func (d Disposition) SetFailed(failed bool) { - C.pn_disposition_set_failed(d.pn, C.bool(failed)) -} -func (d Disposition) IsUndeliverable() bool { - return bool(C.pn_disposition_is_undeliverable(d.pn)) -} -func (d Disposition) SetUndeliverable(undeliverable bool) { - C.pn_disposition_set_undeliverable(d.pn, C.bool(undeliverable)) -} -func (d Disposition) Annotations() Data { - return Data{C.pn_disposition_annotations(d.pn)} -} - -// Wrappers for declarations in condition.h - -type Condition struct{ pn *C.pn_condition_t } - -func (c Condition) IsNil() bool { return c.pn == nil } -func (c Condition) IsSet() bool { - return bool(C.pn_condition_is_set(c.pn)) -} -func (c Condition) Clear() { - C.pn_condition_clear(c.pn) -} -func (c Condition) Name() string { - return C.GoString(C.pn_condition_get_name(c.pn)) -} -func (c Condition) SetName(name string) int { - nameC := C.CString(name) - defer C.free(unsafe.Pointer(nameC)) - - return int(C.pn_condition_set_name(c.pn, nameC)) -} -func (c Condition) Description() string { - return C.GoString(C.pn_condition_get_description(c.pn)) -} -func (c Condition) SetDescription(description string) int { - descriptionC := C.CString(description) - defer C.free(unsafe.Pointer(descriptionC)) - - return int(C.pn_condition_set_description(c.pn, descriptionC)) -} -func (c Condition) Info() Data { - return Data{C.pn_condition_info(c.pn)} -} -func (c Condition) IsRedirect() bool { - return bool(C.pn_condition_is_redirect(c.pn)) -} -func (c Condition) RedirectHost() string { - return C.GoString(C.pn_condition_redirect_host(c.pn)) -} -func (c Condition) RedirectPort() int { - return int(C.pn_condition_redirect_port(c.pn)) -} - -// Wrappers for declarations in terminus.h - -type TerminusType C.pn_terminus_type_t - -const ( - PnUnspecified TerminusType = C.PN_UNSPECIFIED - PnSource TerminusType = C.PN_SOURCE - PnTarget TerminusType = C.PN_TARGET - PnCoordinator TerminusType = C.PN_COORDINATOR -) - -func (e TerminusType) String() string { - switch e { - - case C.PN_UNSPECIFIED: - return "Unspecified" - case C.PN_SOURCE: - return "Source" - case C.PN_TARGET: - return "Target" - case C.PN_COORDINATOR: - return "Coordinator" - } - return "unknown" -} - -type Durability C.pn_durability_t - -const ( - PnNondurable Durability = C.PN_NONDURABLE - PnConfiguration Durability = C.PN_CONFIGURATION - PnDeliveries Durability = C.PN_DELIVERIES -) - -func (e Durability) String() string { - switch e { - - case C.PN_NONDURABLE: - return "Nondurable" - case C.PN_CONFIGURATION: - return "Configuration" - case C.PN_DELIVERIES: - return "Deliveries" - } - return "unknown" -} - -type ExpiryPolicy C.pn_expiry_policy_t - -const ( - PnExpireWithLink ExpiryPolicy = C.PN_EXPIRE_WITH_LINK - PnExpireWithSession ExpiryPolicy = C.PN_EXPIRE_WITH_SESSION - PnExpireWithConnection ExpiryPolicy = C.PN_EXPIRE_WITH_CONNECTION - PnExpireNever ExpiryPolicy = C.PN_EXPIRE_NEVER -) - -func (e ExpiryPolicy) String() string { - switch e { - - case C.PN_EXPIRE_WITH_LINK: - return "ExpireWithLink" - case C.PN_EXPIRE_WITH_SESSION: - return "ExpireWithSession" - case C.PN_EXPIRE_WITH_CONNECTION: - return "ExpireWithConnection" - case C.PN_EXPIRE_NEVER: - return "ExpireNever" - } - return "unknown" -} - -type DistributionMode C.pn_distribution_mode_t - -const ( - PnDistModeUnspecified DistributionMode = C.PN_DIST_MODE_UNSPECIFIED - PnDistModeCopy DistributionMode = C.PN_DIST_MODE_COPY - PnDistModeMove DistributionMode = C.PN_DIST_MODE_MOVE -) - -func (e DistributionMode) String() string { - switch e { - - case C.PN_DIST_MODE_UNSPECIFIED: - return "DistModeUnspecified" - case C.PN_DIST_MODE_COPY: - return "DistModeCopy" - case C.PN_DIST_MODE_MOVE: - return "DistModeMove" - } - return "unknown" -} - -type Terminus struct{ pn *C.pn_terminus_t } - -func (t Terminus) IsNil() bool { return t.pn == nil } -func (t Terminus) Type() TerminusType { - return TerminusType(C.pn_terminus_get_type(t.pn)) -} -func (t Terminus) SetType(type_ TerminusType) int { - return int(C.pn_terminus_set_type(t.pn, C.pn_terminus_type_t(type_))) -} -func (t Terminus) Address() string { - return C.GoString(C.pn_terminus_get_address(t.pn)) -} -func (t Terminus) SetAddress(address string) int { - addressC := C.CString(address) - defer C.free(unsafe.Pointer(addressC)) - - return int(C.pn_terminus_set_address(t.pn, addressC)) -} -func (t Terminus) SetDistributionMode(mode DistributionMode) int { - return int(C.pn_terminus_set_distribution_mode(t.pn, C.pn_distribution_mode_t(mode))) -} -func (t Terminus) Durability() Durability { - return Durability(C.pn_terminus_get_durability(t.pn)) -} -func (t Terminus) SetDurability(durability Durability) int { - return int(C.pn_terminus_set_durability(t.pn, C.pn_durability_t(durability))) -} -func (t Terminus) ExpiryPolicy() ExpiryPolicy { - return ExpiryPolicy(C.pn_terminus_get_expiry_policy(t.pn)) -} -func (t Terminus) SetExpiryPolicy(policy ExpiryPolicy) int { - return int(C.pn_terminus_set_expiry_policy(t.pn, C.pn_expiry_policy_t(policy))) -} -func (t Terminus) Timeout() time.Duration { - return (time.Duration(C.pn_terminus_get_timeout(t.pn)) * time.Second) -} -func (t Terminus) SetTimeout(timeout time.Duration) int { - return int(C.pn_terminus_set_timeout(t.pn, C.pn_seconds_t(timeout))) -} -func (t Terminus) IsDynamic() bool { - return bool(C.pn_terminus_is_dynamic(t.pn)) -} -func (t Terminus) SetDynamic(dynamic bool) int { - return int(C.pn_terminus_set_dynamic(t.pn, C.bool(dynamic))) -} -func (t Terminus) Properties() Data { - return Data{C.pn_terminus_properties(t.pn)} -} -func (t Terminus) Capabilities() Data { - return Data{C.pn_terminus_capabilities(t.pn)} -} -func (t Terminus) Outcomes() Data { - return Data{C.pn_terminus_outcomes(t.pn)} -} -func (t Terminus) Filter() Data { - return Data{C.pn_terminus_filter(t.pn)} -} -func (t Terminus) Copy(src Terminus) int { - return int(C.pn_terminus_copy(t.pn, src.pn)) -} - -// Wrappers for declarations in connection.h - -type Connection struct{ pn *C.pn_connection_t } - -func (c Connection) IsNil() bool { return c.pn == nil } -func (c Connection) Free() { - C.pn_connection_free(c.pn) -} -func (c Connection) Release() { - C.pn_connection_release(c.pn) -} -func (c Connection) Error() error { - return internal.PnError(unsafe.Pointer(C.pn_connection_error(c.pn))) -} -func (c Connection) State() State { - return State(C.pn_connection_state(c.pn)) -} -func (c Connection) Open() { - C.pn_connection_open(c.pn) -} -func (c Connection) Close() { - C.pn_connection_close(c.pn) -} -func (c Connection) Reset() { - C.pn_connection_reset(c.pn) -} -func (c Connection) Condition() Condition { - return Condition{C.pn_connection_condition(c.pn)} -} -func (c Connection) RemoteCondition() Condition { - return Condition{C.pn_connection_remote_condition(c.pn)} -} -func (c Connection) Container() string { - return C.GoString(C.pn_connection_get_container(c.pn)) -} -func (c Connection) SetContainer(container string) { - containerC := C.CString(container) - defer C.free(unsafe.Pointer(containerC)) - - C.pn_connection_set_container(c.pn, containerC) -} -func (c Connection) SetUser(user string) { - userC := C.CString(user) - defer C.free(unsafe.Pointer(userC)) - - C.pn_connection_set_user(c.pn, userC) -} -func (c Connection) SetPassword(password string) { - passwordC := C.CString(password) - defer C.free(unsafe.Pointer(passwordC)) - - C.pn_connection_set_password(c.pn, passwordC) -} -func (c Connection) User() string { - return C.GoString(C.pn_connection_get_user(c.pn)) -} -func (c Connection) Hostname() string { - return C.GoString(C.pn_connection_get_hostname(c.pn)) -} -func (c Connection) SetHostname(hostname string) { - hostnameC := C.CString(hostname) - defer C.free(unsafe.Pointer(hostnameC)) - - C.pn_connection_set_hostname(c.pn, hostnameC) -} -func (c Connection) RemoteContainer() string { - return C.GoString(C.pn_connection_remote_container(c.pn)) -} -func (c Connection) RemoteHostname() string { - return C.GoString(C.pn_connection_remote_hostname(c.pn)) -} -func (c Connection) OfferedCapabilities() Data { - return Data{C.pn_connection_offered_capabilities(c.pn)} -} -func (c Connection) DesiredCapabilities() Data { - return Data{C.pn_connection_desired_capabilities(c.pn)} -} -func (c Connection) Properties() Data { - return Data{C.pn_connection_properties(c.pn)} -} -func (c Connection) RemoteOfferedCapabilities() Data { - return Data{C.pn_connection_remote_offered_capabilities(c.pn)} -} -func (c Connection) RemoteDesiredCapabilities() Data { - return Data{C.pn_connection_remote_desired_capabilities(c.pn)} -} -func (c Connection) RemoteProperties() Data { - return Data{C.pn_connection_remote_properties(c.pn)} -}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/bd3fb337/proton-c/bindings/go/src/qpid.apache.org/proton/go/internal/error.go ---------------------------------------------------------------------- diff --git a/proton-c/bindings/go/src/qpid.apache.org/proton/go/internal/error.go b/proton-c/bindings/go/src/qpid.apache.org/proton/go/internal/error.go deleted file mode 100644 index f3f3307..0000000 --- a/proton-c/bindings/go/src/qpid.apache.org/proton/go/internal/error.go +++ /dev/null @@ -1,126 +0,0 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. -*/ - -// Internal implementation details - ignore. -package internal - -// #cgo LDFLAGS: -lqpid-proton -// #include <proton/error.h> -// #include <proton/codec.h> -import "C" - -import ( - "fmt" - "runtime" - "sync" - "unsafe" -) - -// Error type for all proton errors. -type Error string - -// Error prefixes error message with proton: -func (e Error) Error() string { - return "proton: " + string(e) -} - -// Errorf creates an Error with a formatted message -func Errorf(format string, a ...interface{}) Error { - return Error(fmt.Sprintf(format, a...)) -} - -type PnErrorCode int - -func (e PnErrorCode) String() string { - switch e { - case C.PN_EOS: - return "end-of-data" - case C.PN_ERR: - return "error" - case C.PN_OVERFLOW: - return "overflow" - case C.PN_UNDERFLOW: - return "underflow" - case C.PN_STATE_ERR: - return "bad-state" - case C.PN_ARG_ERR: - return "invalid-argument" - case C.PN_TIMEOUT: - return "timeout" - case C.PN_INTR: - return "interrupted" - case C.PN_INPROGRESS: - return "in-progress" - default: - return fmt.Sprintf("unknown-error(%d)", e) - } -} - -func PnError(p unsafe.Pointer) error { - e := (*C.pn_error_t)(p) - if e == nil || C.pn_error_code(e) == 0 { - return nil - } - return Errorf("%s: %s", PnErrorCode(C.pn_error_code(e)), C.GoString(C.pn_error_text(e))) -} - -// DoRecover is called to recover from internal panics -func DoRecover(err *error) { - r := recover() - switch r := r.(type) { - case nil: // We are not recovering - return - case runtime.Error: // Don't catch runtime.Error - panic(r) - case error: - *err = r - default: - panic(r) - } -} - -// panicIf panics if condition is true, the panic value is Errorf(fmt, args...) -func panicIf(condition bool, fmt string, args ...interface{}) { - if condition { - panic(Errorf(fmt, args...)) - } -} - -// FirstError is a goroutine-safe error holder that keeps the first error that is set. -type FirstError struct { - err error - lock sync.Mutex -} - -// Set the error if not already set, return the error. -func (e *FirstError) Set(err error) error { - e.lock.Lock() - defer e.lock.Unlock() - if e.err == nil { - e.err = err - } - return e.err -} - -// Get the error. -func (e *FirstError) Get() error { - e.lock.Lock() - defer e.lock.Unlock() - return e.err -} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/bd3fb337/proton-c/bindings/go/src/qpid.apache.org/proton/go/messaging/doc.go ---------------------------------------------------------------------- diff --git a/proton-c/bindings/go/src/qpid.apache.org/proton/go/messaging/doc.go b/proton-c/bindings/go/src/qpid.apache.org/proton/go/messaging/doc.go deleted file mode 100644 index c815f4e..0000000 --- a/proton-c/bindings/go/src/qpid.apache.org/proton/go/messaging/doc.go +++ /dev/null @@ -1,28 +0,0 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. -*/ - -/* -Package messaging provides a procedural, concurrent Go API for exchanging AMQP messages. -*/ -package messaging - -// #cgo LDFLAGS: -lqpid-proton -import "C" - -// Just for package comment http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/bd3fb337/proton-c/bindings/go/src/qpid.apache.org/proton/go/messaging/handler.go ---------------------------------------------------------------------- diff --git a/proton-c/bindings/go/src/qpid.apache.org/proton/go/messaging/handler.go b/proton-c/bindings/go/src/qpid.apache.org/proton/go/messaging/handler.go deleted file mode 100644 index 4a97b9d..0000000 --- a/proton-c/bindings/go/src/qpid.apache.org/proton/go/messaging/handler.go +++ /dev/null @@ -1,70 +0,0 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. -*/ - -package messaging - -import ( - "qpid.apache.org/proton/go/amqp" - "qpid.apache.org/proton/go/event" -) - -// FIXME aconway 2015-04-28: cleanup - exposing delivery vs. disposition. - -type acksMap map[event.Delivery]chan Disposition -type receiverMap map[event.Link]chan amqp.Message - -type handler struct { - connection *Connection - acks acksMap - receivers receiverMap -} - -func newHandler(c *Connection) *handler { - return &handler{c, make(acksMap), make(receiverMap)} -} - -func (h *handler) HandleMessagingEvent(t event.MessagingEventType, e event.Event) error { - switch t { - // FIXME aconway 2015-04-29: handle errors. - case event.MConnectionClosed: - for _, ack := range h.acks { - // FIXME aconway 2015-04-29: communicate error info - close(ack) - } - - case event.MSettled: - ack := h.acks[e.Delivery()] - if ack != nil { - ack <- Disposition(e.Delivery().Remote().Type()) - close(ack) - delete(h.acks, e.Delivery()) - } - - case event.MMessage: - r := h.receivers[e.Link()] - if r != nil { - m, _ := event.DecodeMessage(e) - // FIXME aconway 2015-04-29: hack, direct send, possible blocking. - r <- m - } else { - // FIXME aconway 2015-04-29: Message with no receiver - log? panic? deadletter? drop? - } - } - return nil -} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/bd3fb337/proton-c/bindings/go/src/qpid.apache.org/proton/go/messaging/messaging.go ---------------------------------------------------------------------- diff --git a/proton-c/bindings/go/src/qpid.apache.org/proton/go/messaging/messaging.go b/proton-c/bindings/go/src/qpid.apache.org/proton/go/messaging/messaging.go deleted file mode 100644 index e4b117d..0000000 --- a/proton-c/bindings/go/src/qpid.apache.org/proton/go/messaging/messaging.go +++ /dev/null @@ -1,261 +0,0 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. -*/ - -package messaging - -// #include <proton/disposition.h> -import "C" - -import ( - "io" - "net" - "qpid.apache.org/proton/go/amqp" - "qpid.apache.org/proton/go/event" - "qpid.apache.org/proton/go/internal" -) - -// Closed is an alias for io.EOF. It indicates orderly closure of an endpoint. -var Closed = io.EOF - -// Connection is a connection to a remote AMQP endpoint. -// -// You can set exported fields to configure the connection before calling -// Connection.Open() -// -type Connection struct { - // Server = true means a the connection will do automatic protocol detection. - Server bool - - // FIXME aconway 2015-04-17: Other parameters to set up SSL, SASL etc. - - handler *handler - pump *event.Pump - session Session - err internal.FirstError -} - -// Error returns nil if the connection is open, messaging.Closed if was closed cleanly -// or an error value if it was closed due to an error. -func (c *Connection) Error() error { return c.err.Get() } - -// Make an AMQP connection over a net.Conn connection. -// You must call c.Close() to close the connection and clean up its resources. -func (c *Connection) Open(conn net.Conn) (err error) { - c.handler = newHandler(c) - c.pump, err = event.NewPump(conn, - event.NewMessagingDelegator(c.handler), - ) - if err == nil { - if c.Server { - c.pump.Server() - } - go c.pump.Run() - } - return c.err.Set(err) -} - -// Connect opens a default client connection. It is a shortcut for -// c := &Connection -// c.Open() -// -func Connect(conn net.Conn) (*Connection, error) { - c := &Connection{} - c.err.Set(c.Open(conn)) - return c, c.Error() -} - -// Close cleans up resources and closes the associated net.Conn connection. -func (c *Connection) Close() error { - err := c.pump.Close() // Will be nil on close OK - c.err.Set(c.pump.Error) // Will be io.EOF on close OK - return err -} - -// DefaultSession returns a default session for the connection. -// -// It is created on the first call to DefaultSession() and returned from all subsequent calls. -// Use Session() for more control over creating sessions. -// -func (c *Connection) DefaultSession() (s Session, err error) { - if c.Error() != nil { - return Session{}, c.Error() - } - if c.session.e.IsNil() { - c.session, err = c.Session() - } - return c.session, err -} - -type sessionErr struct { - s event.Session - err error -} - -// Session creates a new session. -func (c *Connection) Session() (Session, error) { - connection := c.pump.Connection() - result := make(chan sessionErr) - c.pump.Inject <- func() { - s, err := connection.Session() - if err == nil { - s.Open() - } - result <- sessionErr{s, err} - } - se := <-result - return Session{se.s, c.pump}, se.err -} - -// FIXME aconway 2015-04-27: set sender name, options etc. - -// Sender creates a Sender that will send messages to the address addr. -func (c *Connection) Sender(addr string) (s Sender, err error) { - session, err := c.DefaultSession() - if err != nil { - return Sender{}, err - } - result := make(chan Sender) - c.pump.Inject <- func() { - link := session.e.Sender(linkNames.Next()) - if link.IsNil() { - err = session.e.Error() - } else { - link.Target().SetAddress(addr) - // FIXME aconway 2015-04-27: link options? - link.Open() - } - result <- Sender{Link{c, link}} - } - return <-result, err -} - -// Receiver returns a receiver that will receive messages sent to address addr. -func (c *Connection) Receiver(addr string) (r Receiver, err error) { - // FIXME aconway 2015-04-29: move code to session, in link.go? - session, err := c.DefaultSession() - if err != nil { - return Receiver{}, err - } - result := make(chan Receiver) - c.pump.Inject <- func() { - link := session.e.Receiver(linkNames.Next()) - if link.IsNil() { - err = session.e.Error() - } else { - link.Source().SetAddress(addr) - // FIXME aconway 2015-04-27: link options? - link.Open() - } - // FIXME aconway 2015-04-29: hack to avoid blocking, need proper buffering linked to flow control - rchan := make(chan amqp.Message, 1000) - c.handler.receivers[link] = rchan - result <- Receiver{Link{c, link}, rchan} - } - return <-result, err -} - -// FIXME aconway 2015-04-29: counter per session. -var linkNames amqp.UidCounter - -// Session is an AMQP session, it contains Senders and Receivers. -// Every Connection has a DefaultSession, you can create additional sessions -// with Connection.Session() -type Session struct { - e event.Session - pump *event.Pump -} - -// FIXME aconway 2015-05-05: REWORK Sender/receiver/session. - -// Disposition indicates the outcome of a settled message delivery. -type Disposition uint64 - -const ( - // Message was accepted by the receiver - Accepted Disposition = C.PN_ACCEPTED - // Message was rejected as invalid by the receiver - Rejected = C.PN_REJECTED - // Message was not processed by the receiver but may be processed by some other receiver. - Released = C.PN_RELEASED -) - -// String human readable name for a Disposition. -func (d Disposition) String() string { - switch d { - case Accepted: - return "Accepted" - case Rejected: - return "Rejected" - case Released: - return "Released" - default: - return "Unknown" - } -} - -// FIXME aconway 2015-04-29: How to signal errors via ack channels. - -// An Acknowledgement is a channel which will receive the Disposition of the message -// when it is acknowledged. The channel is closed after the disposition is sent. -type Acknowledgement <-chan Disposition - -// Link has common data and methods for Sender and Receiver links. -type Link struct { - connection *Connection - elink event.Link -} - -// Sender sends messages. -type Sender struct { - Link -} - -// FIXME aconway 2015-04-28: allow user to specify delivery tag. -// FIXME aconway 2015-04-28: should we provide a sending channel rather than a send function? - -// Send sends a message. If d is not nil, the disposition is retured on d. -// If d is nil the message is sent pre-settled and no disposition is returned. -func (s *Sender) Send(m amqp.Message) (ack Acknowledgement, err error) { - ackChan := make(chan Disposition, 1) - ack = ackChan - s.connection.pump.Inject <- func() { - // FIXME aconway 2015-04-28: flow control & credit, buffer or fail? - delivery, err := s.elink.Send(m) - if err == nil { // FIXME aconway 2015-04-28: error handling - s.connection.handler.acks[delivery] = ackChan - } - } - return ack, nil -} - -// Close the sender. -func (s *Sender) Close() error { return nil } // FIXME aconway 2015-04-27: close/free - -type Receiver struct { - Link - // Channel to receive messages. When it closes, check Receiver.Error() for an error. - Receive <-chan amqp.Message -} - -// FIXME aconway 2015-04-29: settlement - ReceivedMessage with Settle() method? - -// FIXME aconway 2015-05-25: Close must unblock Receive() calls. - -// Close the Receiver. -func (r *Receiver) Close() error { return nil } // FIXME aconway 2015-04-29: close/free http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/bd3fb337/proton-c/bindings/go/src/qpid.apache.org/proton/handlers.go ---------------------------------------------------------------------- diff --git a/proton-c/bindings/go/src/qpid.apache.org/proton/handlers.go b/proton-c/bindings/go/src/qpid.apache.org/proton/handlers.go new file mode 100644 index 0000000..aa4d76b --- /dev/null +++ b/proton-c/bindings/go/src/qpid.apache.org/proton/handlers.go @@ -0,0 +1,399 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ + +package proton + +// #include <proton/handlers.h> +import "C" + +import ( + "qpid.apache.org/proton/internal" +) + +// EventHandler handles core proton events. +type EventHandler interface { + // HandleEvent is called with an event. + // Typically HandleEvent() is implemented as a switch on e.Type() + // Returning an error will stop the Engine. + HandleEvent(e Event) +} + +// cHandler wraps a C pn_handler_t +type cHandler struct { + pn *C.pn_handler_t +} + +func (h cHandler) HandleEvent(e Event) { + C.pn_handler_dispatch(h.pn, e.pn, C.pn_event_type(e.pn)) +} + +// MessagingHandler provides an alternative interface to EventHandler. +// it is easier to use for most applications that send and receive messages. +// +// Implement this interface and then wrap your value with a MessagingHandlerDelegator. +// MessagingHandlerDelegator implements EventHandler and can be registered with a Engine. +// +type MessagingHandler interface { + // HandleMessagingEvent is called with MessagingEvent. + // Typically HandleEvent() is implemented as a switch on e.Type() + // Returning an error will stop the Engine. + HandleMessagingEvent(MessagingEvent, Event) +} + +// MessagingEvent provides a set of events that are easier to work with than the +// core events defined by EventType +// +// There are 3 types of "endpoint": Connection, Session and Link. For each +// endpoint there are 5 events: Opening, Opened, Closing, Closed and Error. +// +// The meaning of these events is as follows: +// +// Opening: The remote end opened, the local end will open automatically. +// +// Opened: Both ends are open, regardless of which end opened first. +// +// Closing: The remote end closed without error, the local end will close automatically. +// +// Error: The remote end closed with an error, the local end will close automatically. +// +// Closed: Both ends are closed, regardless of which end closed first or if there was an error. +// No further events will be received for the endpoint. +// +type MessagingEvent int + +const ( + // The event loop starts. + MStart MessagingEvent = iota + // The peer closes the connection with an error condition. + MConnectionError + // The peer closes the session with an error condition. + MSessionError + // The peer closes the link with an error condition. + MLinkError + // The peer Initiates the opening of the connection. + MConnectionOpening + // The peer initiates the opening of the session. + MSessionOpening + // The peer initiates the opening of the link. + MLinkOpening + // The connection is opened. + MConnectionOpened + // The session is opened. + MSessionOpened + // The link is opened. + MLinkOpened + // The peer initiates the closing of the connection. + MConnectionClosing + // The peer initiates the closing of the session. + MSessionClosing + // The peer initiates the closing of the link. + MLinkClosing + // Both ends of the connection are closed. + MConnectionClosed + // Both ends of the session are closed. + MSessionClosed + // Both ends of the link are closed. + MLinkClosed + // The sender link has credit and messages can + // therefore be transferred. + MSendable + // The remote peer accepts an outgoing message. + MAccepted + // The remote peer rejects an outgoing message. + MRejected + // The peer releases an outgoing message. Note that this may be in response to + // either the RELEASE or MODIFIED state as defined by the AMQP specification. + MReleased + // The peer has settled the outgoing message. This is the point at which it + // should never be re-transmitted. + MSettled + // A message is received. Call Event.Delivery().Message() to decode as an amqp.Message. + // To manage the outcome of this messages (e.g. to accept or reject the message) + // use Event.Delivery(). + MMessage + // A network connection was disconnected. + MDisconnected +) + +func (t MessagingEvent) String() string { + switch t { + case MStart: + return "Start" + case MConnectionError: + return "ConnectionError" + case MSessionError: + return "SessionError" + case MLinkError: + return "LinkError" + case MConnectionOpening: + return "ConnectionOpening" + case MSessionOpening: + return "SessionOpening" + case MLinkOpening: + return "LinkOpening" + case MConnectionOpened: + return "ConnectionOpened" + case MSessionOpened: + return "SessionOpened" + case MLinkOpened: + return "LinkOpened" + case MConnectionClosing: + return "ConnectionClosing" + case MSessionClosing: + return "SessionClosing" + case MLinkClosing: + return "LinkClosing" + case MConnectionClosed: + return "ConnectionClosed" + case MSessionClosed: + return "SessionClosed" + case MLinkClosed: + return "LinkClosed" + case MDisconnected: + return "MDisconnected" + case MSendable: + return "Sendable" + case MAccepted: + return "Accepted" + case MRejected: + return "Rejected" + case MReleased: + return "Released" + case MSettled: + return "Settled" + case MMessage: + return "Message" + default: + return "Unknown" + } +} + +// ResourceHandler provides a simple way to track the creation and deletion of +// various proton objects. +// endpointDelegator captures common patterns for endpoints opening/closing +type endpointDelegator struct { + remoteOpen, remoteClose, localOpen, localClose EventType + opening, opened, closing, closed, error MessagingEvent + endpoint func(Event) Endpoint + delegator *MessagingDelegator +} + +// HandleEvent handles an open/close event for an endpoint in a generic way. +func (d endpointDelegator) HandleEvent(e Event) { + endpoint := d.endpoint(e) + state := endpoint.State() + + switch e.Type() { + + case d.localOpen: + if state.RemoteActive() { + d.delegator.mhandler.HandleMessagingEvent(d.opened, e) + } + + case d.remoteOpen: + switch { + case state.LocalActive(): + d.delegator.mhandler.HandleMessagingEvent(d.opened, e) + case state.LocalUninit(): + d.delegator.mhandler.HandleMessagingEvent(d.opening, e) + if d.delegator.AutoOpen { + endpoint.Open() + } + } + + case d.remoteClose: + if endpoint.RemoteCondition().IsSet() { // Closed with error + d.delegator.mhandler.HandleMessagingEvent(d.error, e) + } else { + d.delegator.mhandler.HandleMessagingEvent(d.closing, e) + } + if state.LocalClosed() { + d.delegator.mhandler.HandleMessagingEvent(d.closed, e) + } else if state.LocalActive() { + endpoint.Close() + } + + case d.localClose: + if state.RemoteClosed() { + d.delegator.mhandler.HandleMessagingEvent(d.closed, e) + } + + default: + // We shouldn't be called with any other event type. + panic(internal.Errorf("internal error, not an open/close event: %s", e)) + } +} + +// MessagingDelegator implments a EventHandler and delegates to a MessagingHandler. +// You can modify the exported fields before you pass the MessagingDelegator to +// a Engine. +type MessagingDelegator struct { + mhandler MessagingHandler + connection, session, link endpointDelegator + flowcontroller EventHandler + + // AutoSettle (default true) automatically pre-settle outgoing messages. + AutoSettle bool + // AutoAccept (default true) automatically accept and settle incoming messages + // if they are not settled by the delegate. + AutoAccept bool + // AutoOpen (default true) automatically open remotely opened endpoints. + AutoOpen bool + // Prefetch (default 10) initial credit to issue for incoming links. + Prefetch int + // PeerCloseIsError (default false) if true a close by the peer will be treated as an error. + PeerCloseError bool +} + +func NewMessagingDelegator(h MessagingHandler) *MessagingDelegator { + return &MessagingDelegator{ + mhandler: h, + flowcontroller: nil, + AutoSettle: true, + AutoAccept: true, + AutoOpen: true, + Prefetch: 10, + PeerCloseError: false, + } +} + +func handleIf(h EventHandler, e Event) { + if h != nil { + h.HandleEvent(e) + } +} + +// Handle a proton event by passing the corresponding MessagingEvent(s) to +// the MessagingHandler. +func (d *MessagingDelegator) HandleEvent(e Event) { + handleIf(d.flowcontroller, e) + + switch e.Type() { + + case EConnectionInit: + d.connection = endpointDelegator{ + EConnectionRemoteOpen, EConnectionRemoteClose, EConnectionLocalOpen, EConnectionLocalClose, + MConnectionOpening, MConnectionOpened, MConnectionClosing, MConnectionClosed, + MConnectionError, + func(e Event) Endpoint { return e.Connection() }, + d, + } + d.session = endpointDelegator{ + ESessionRemoteOpen, ESessionRemoteClose, ESessionLocalOpen, ESessionLocalClose, + MSessionOpening, MSessionOpened, MSessionClosing, MSessionClosed, + MSessionError, + func(e Event) Endpoint { return e.Session() }, + d, + } + d.link = endpointDelegator{ + ELinkRemoteOpen, ELinkRemoteClose, ELinkLocalOpen, ELinkLocalClose, + MLinkOpening, MLinkOpened, MLinkClosing, MLinkClosed, + MLinkError, + func(e Event) Endpoint { return e.Link() }, + d, + } + if d.Prefetch > 0 { + d.flowcontroller = cHandler{C.pn_flowcontroller(C.int(d.Prefetch))} + } + d.mhandler.HandleMessagingEvent(MStart, e) + + case EConnectionRemoteOpen: + + d.connection.HandleEvent(e) + + case EConnectionRemoteClose: + d.connection.HandleEvent(e) + e.Connection().Transport().CloseTail() + + case EConnectionLocalOpen, EConnectionLocalClose: + d.connection.HandleEvent(e) + + case ESessionRemoteOpen, ESessionRemoteClose, ESessionLocalOpen, ESessionLocalClose: + d.session.HandleEvent(e) + + case ELinkRemoteOpen: + e.Link().Source().Copy(e.Link().RemoteSource()) + e.Link().Target().Copy(e.Link().RemoteTarget()) + d.link.HandleEvent(e) + + case ELinkRemoteClose, ELinkLocalOpen, ELinkLocalClose: + d.link.HandleEvent(e) + + case ELinkFlow: + if e.Link().IsSender() && e.Link().Credit() > 0 { + d.mhandler.HandleMessagingEvent(MSendable, e) + } + + case EDelivery: + if e.Delivery().Link().IsReceiver() { + d.incoming(e) + } else { + d.outgoing(e) + } + + case ETransportClosed: + d.mhandler.HandleMessagingEvent(MDisconnected, e) + } +} + +func (d *MessagingDelegator) incoming(e Event) (err error) { + delivery := e.Delivery() + if delivery.HasMessage() { + if e.Link().State().LocalClosed() { + e.Link().Advance() + if d.AutoAccept { + delivery.Release(false) + } + } else { + d.mhandler.HandleMessagingEvent(MMessage, e) + if d.AutoAccept && !delivery.Settled() { + if err == nil { + delivery.Accept() + } else { + delivery.Reject() + } + } + } + } else if delivery.Updated() && delivery.Settled() { + d.mhandler.HandleMessagingEvent(MSettled, e) + } + return +} + +func (d *MessagingDelegator) outgoing(e Event) (err error) { + delivery := e.Delivery() + if delivery.Updated() { + switch delivery.Remote().Type() { + case Accepted: + d.mhandler.HandleMessagingEvent(MAccepted, e) + case Rejected: + d.mhandler.HandleMessagingEvent(MRejected, e) + case Released, Modified: + d.mhandler.HandleMessagingEvent(MReleased, e) + } + if err == nil && delivery.Settled() { + // The delivery was settled remotely, inform the local end. + d.mhandler.HandleMessagingEvent(MSettled, e) + } + if err == nil && d.AutoSettle { + delivery.Settle() // Local settle, don't mhandler MSettled till the remote end settles. + } + } + return +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/bd3fb337/proton-c/bindings/go/src/qpid.apache.org/proton/internal/error.go ---------------------------------------------------------------------- diff --git a/proton-c/bindings/go/src/qpid.apache.org/proton/internal/error.go b/proton-c/bindings/go/src/qpid.apache.org/proton/internal/error.go new file mode 100644 index 0000000..9f65e04 --- /dev/null +++ b/proton-c/bindings/go/src/qpid.apache.org/proton/internal/error.go @@ -0,0 +1,121 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ + +// Internal implementation details - ignore. +package internal + +// #cgo LDFLAGS: -lqpid-proton +// #include <proton/error.h> +// #include <proton/codec.h> +import "C" + +import ( + "fmt" + "sync" + "unsafe" +) + +// Error type for proton runtime errors returned as error values. +type Error string + +// Error prefixes error message with proton: +func (e Error) Error() string { + return "proton: " + string(e) +} + +// Errorf creates an Error with a formatted message +func Errorf(format string, a ...interface{}) Error { + return Error(fmt.Sprintf(format, a...)) +} + +type PnErrorCode int + +func (e PnErrorCode) String() string { + switch e { + case C.PN_EOS: + return "end-of-data" + case C.PN_ERR: + return "error" + case C.PN_OVERFLOW: + return "overflow" + case C.PN_UNDERFLOW: + return "underflow" + case C.PN_STATE_ERR: + return "bad-state" + case C.PN_ARG_ERR: + return "invalid-argument" + case C.PN_TIMEOUT: + return "timeout" + case C.PN_INTR: + return "interrupted" + case C.PN_INPROGRESS: + return "in-progress" + default: + return fmt.Sprintf("unknown-error(%d)", e) + } +} + +func PnError(p unsafe.Pointer) error { + e := (*C.pn_error_t)(p) + if e == nil || C.pn_error_code(e) == 0 { + return nil + } + return Errorf("%s: %s", PnErrorCode(C.pn_error_code(e)), C.GoString(C.pn_error_text(e))) +} + +// panicIf panics if condition is true, the panic value is Errorf(fmt, args...) +func panicIf(condition bool, fmt string, args ...interface{}) { + if condition { + panic(Errorf(fmt, args...)) + } +} + +// FirstError is a goroutine-safe error holder that keeps the first error that is set. +type FirstError struct { + err error + lock sync.Mutex +} + +// Set the error if not already set, return the error. +func (e *FirstError) Set(err error) error { + e.lock.Lock() + defer e.lock.Unlock() + if e.err == nil { + e.err = err + } + return e.err +} + +// Get the error. +func (e *FirstError) Get() error { + e.lock.Lock() + defer e.lock.Unlock() + return e.err +} + +// Assert panics if condition is false with optional formatted message +func Assert(condition bool, format ...interface{}) { + if !condition { + if len(format) > 0 { + panic(Errorf(format[0].(string), format[1:]...)) + } else { + panic(Errorf("assertion failed")) + } + } +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/bd3fb337/proton-c/bindings/go/src/qpid.apache.org/proton/internal/flexchannel.go ---------------------------------------------------------------------- diff --git a/proton-c/bindings/go/src/qpid.apache.org/proton/internal/flexchannel.go b/proton-c/bindings/go/src/qpid.apache.org/proton/internal/flexchannel.go new file mode 100644 index 0000000..77b524c --- /dev/null +++ b/proton-c/bindings/go/src/qpid.apache.org/proton/internal/flexchannel.go @@ -0,0 +1,82 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ + +package internal + +// FlexChannel acts like a channel with an automatically sized buffer, see NewFlexChannel(). +type FlexChannel struct { + // In channel to send to. close(In) will close the FlexChannel once buffer has drained. + In chan<- interface{} + // Out channel to receive from. Out closes when In has closed and the buffer is empty. + Out <-chan interface{} + + in, out chan interface{} + buffer []interface{} + limit int +} + +// NewFlexChannel creates a FlexChannel, a channel with an automatically-sized buffer. +// +// Initially the buffer size is 0, the buffer grows as needed up to limit. limit < 0 means +// there is no limit. +// +func NewFlexChannel(limit int) *FlexChannel { + fc := &FlexChannel{ + in: make(chan interface{}), + out: make(chan interface{}), + buffer: make([]interface{}, 0), + limit: limit, + } + fc.In = fc.in + fc.Out = fc.out + go fc.run() + return fc +} + +func (fc *FlexChannel) run() { + defer func() { // Flush the channel on exit + for _, data := range fc.buffer { + fc.out <- data + } + close(fc.out) + }() + + for { + var usein, useout chan interface{} + var outvalue interface{} + if len(fc.buffer) > 0 { + useout = fc.out + outvalue = fc.buffer[0] + } + if len(fc.buffer) < fc.limit || fc.limit < 0 { + usein = fc.in + } + Assert(usein != nil || useout != nil) + select { + case useout <- outvalue: + fc.buffer = fc.buffer[1:] + case data, ok := <-usein: + if ok { + fc.buffer = append(fc.buffer, data) + } else { + return + } + } + } +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/bd3fb337/proton-c/bindings/go/src/qpid.apache.org/proton/internal/flexchannel_test.go ---------------------------------------------------------------------- diff --git a/proton-c/bindings/go/src/qpid.apache.org/proton/internal/flexchannel_test.go b/proton-c/bindings/go/src/qpid.apache.org/proton/internal/flexchannel_test.go new file mode 100644 index 0000000..d0e1a44 --- /dev/null +++ b/proton-c/bindings/go/src/qpid.apache.org/proton/internal/flexchannel_test.go @@ -0,0 +1,89 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ + +package internal + +import ( + "testing" +) + +func recvall(ch <-chan interface{}) (result []interface{}) { + for { + select { + case x := <-ch: + result = append(result, x) + default: + return + } + } +} + +func sendall(data []interface{}, ch chan<- interface{}) { +} + +func TestFlex(t *testing.T) { + fc := NewFlexChannel(5) + + // Test send/receve + go func() { + for i := 0; i < 4; i++ { + fc.In <- i + } + }() + + for i := 0; i < 4; i++ { + j := <-fc.Out + if i != j { + t.Error("%v != %v", i, j) + } + } + select { + case x, ok := <-fc.Out: + t.Error("receive empty channel got", x, ok) + default: + } + + // Test buffer limit + for i := 10; i < 15; i++ { + fc.In <- i + } + select { + case fc.In <- 0: + t.Error("send to full channel did not block") + default: + } + i := <-fc.Out + if i != 10 { + t.Error("%v != %v", i, 10) + } + fc.In <- 15 + close(fc.In) + + for i := 11; i < 16; i++ { + j := <-fc.Out + if i != j { + t.Error("%v != %v", i, j) + } + } + + x, ok := <-fc.Out + if ok { + t.Error("Unexpected value on Out", x) + } +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/bd3fb337/proton-c/bindings/go/src/qpid.apache.org/proton/internal/safemap.go ---------------------------------------------------------------------- diff --git a/proton-c/bindings/go/src/qpid.apache.org/proton/internal/safemap.go b/proton-c/bindings/go/src/qpid.apache.org/proton/internal/safemap.go new file mode 100644 index 0000000..3a1fe2b --- /dev/null +++ b/proton-c/bindings/go/src/qpid.apache.org/proton/internal/safemap.go @@ -0,0 +1,57 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ + +package internal + +import ( + "sync" +) + +// SafeMap is a goroutine-safe map of interface{} to interface{}. +type SafeMap struct { + m map[interface{}]interface{} + lock sync.Mutex +} + +func MakeSafeMap() SafeMap { return SafeMap{m: make(map[interface{}]interface{})} } + +func (m *SafeMap) Get(key interface{}) interface{} { + m.lock.Lock() + defer m.lock.Unlock() + return m.m[key] +} + +func (m *SafeMap) GetOk(key interface{}) (interface{}, bool) { + m.lock.Lock() + defer m.lock.Unlock() + v, ok := m.m[key] + return v, ok +} + +func (m *SafeMap) Put(key, value interface{}) { + m.lock.Lock() + defer m.lock.Unlock() + m.m[key] = value +} + +func (m *SafeMap) Delete(key interface{}) { + m.lock.Lock() + defer m.lock.Unlock() + delete(m.m, key) +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/bd3fb337/proton-c/bindings/go/src/qpid.apache.org/proton/internal/uuid.go ---------------------------------------------------------------------- diff --git a/proton-c/bindings/go/src/qpid.apache.org/proton/internal/uuid.go b/proton-c/bindings/go/src/qpid.apache.org/proton/internal/uuid.go new file mode 100644 index 0000000..ef941a1 --- /dev/null +++ b/proton-c/bindings/go/src/qpid.apache.org/proton/internal/uuid.go @@ -0,0 +1,70 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ + +package internal + +import ( + "fmt" + "math/rand" + "strconv" + "sync" + "sync/atomic" + "time" +) + +type UUID [16]byte + +func (u UUID) String() string { + return fmt.Sprintf("%X-%X-%X-%X-%X", u[0:4], u[4:6], u[6:8], u[8:10], u[10:]) +} + +// Don't mess with the default random source. +var randomSource = rand.NewSource(time.Now().UnixNano()) +var randomLock sync.Mutex + +func random() byte { + randomLock.Lock() + defer randomLock.Unlock() + return byte(randomSource.Int63()) +} + +func UUID4() UUID { + var u UUID + for i := 0; i < len(u); i++ { + u[i] = random() + } + // See /https://tools.ietf.org/html/rfc4122#section-4.4 + u[6] = (u[6] & 0x0F) | 0x40 // Version bits to 4 + u[8] = (u[8] & 0x3F) | 0x80 // Reserved bits (top two) set to 01 + return u +} + +// A simple atomic counter to generate unique 64 bit IDs. +type IdCounter struct{ count uint64 } + +// NextInt gets the next uint64 value from the atomic counter. +func (uc *IdCounter) NextInt() uint64 { + return atomic.AddUint64(&uc.count, 1) +} + +// Next gets the next integer value encoded as a base32 string, safe for NUL +// terminated C strings. +func (uc *IdCounter) Next() string { + return strconv.FormatUint(uc.NextInt(), 32) +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/bd3fb337/proton-c/bindings/go/src/qpid.apache.org/proton/message.go ---------------------------------------------------------------------- diff --git a/proton-c/bindings/go/src/qpid.apache.org/proton/message.go b/proton-c/bindings/go/src/qpid.apache.org/proton/message.go new file mode 100644 index 0000000..a4370ff --- /dev/null +++ b/proton-c/bindings/go/src/qpid.apache.org/proton/message.go @@ -0,0 +1,79 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ + +package proton + +// #include <proton/types.h> +// #include <proton/message.h> +// #include <proton/codec.h> +import "C" + +import ( + "qpid.apache.org/proton/amqp" + "qpid.apache.org/proton/internal" +) + +// HasMessage is true if all message data is available. +// Equivalent to !d.isNil && d.Readable() && !d.Partial() +func (d Delivery) HasMessage() bool { return !d.IsNil() && d.Readable() && !d.Partial() } + +// Message decodes the message containined in a delivery. +// Will return an error if delivery.HasMessage() is false. +func (delivery Delivery) Message() (m amqp.Message, err error) { + if !delivery.Readable() || delivery.Partial() { + return nil, internal.Errorf("attempting to get incomplete message") + } + data := make([]byte, delivery.Pending()) + result := delivery.Link().Recv(data) + if result != len(data) { + return nil, internal.Errorf("cannot receive message: %s", internal.PnErrorCode(result)) + } + m = amqp.NewMessage() + err = m.Decode(data) + return +} + +// TODO aconway 2015-04-08: proper handling of delivery tags. Tag counter per link. +var tags internal.IdCounter + +// Send sends a amqp.Message over a Link. +// Returns a Delivery that can be use to determine the outcome of the message. +func (link Link) Send(m amqp.Message) (Delivery, error) { + if !link.IsSender() { + return Delivery{}, internal.Errorf("attempt to send message on receiving link") + } + delivery := link.Delivery(tags.Next()) + bytes, err := m.Encode(nil) + if err != nil { + return Delivery{}, internal.Errorf("cannot send mesage %s", err) + } + result := link.SendBytes(bytes) + link.Advance() + if result != len(bytes) { + if result < 0 { + return delivery, internal.Errorf("send failed %v", internal.PnErrorCode(result)) + } else { + return delivery, internal.Errorf("send incomplete %v of %v", result, len(bytes)) + } + } + if link.RemoteSndSettleMode() == SndSettled { + delivery.Settle() + } + return delivery, nil +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/bd3fb337/proton-c/bindings/go/src/qpid.apache.org/proton/wrappers.go ---------------------------------------------------------------------- diff --git a/proton-c/bindings/go/src/qpid.apache.org/proton/wrappers.go b/proton-c/bindings/go/src/qpid.apache.org/proton/wrappers.go new file mode 100644 index 0000000..4e208f7 --- /dev/null +++ b/proton-c/bindings/go/src/qpid.apache.org/proton/wrappers.go @@ -0,0 +1,335 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ + +// This file contains special-case wrapper functions or wrappers that don't follow +// the pattern of genwrap.go. + +package proton + +//#include <proton/codec.h> +//#include <proton/connection.h> +//#include <proton/session.h> +//#include <proton/session.h> +//#include <proton/delivery.h> +//#include <proton/link.h> +//#include <proton/event.h> +//#include <proton/transport.h> +//#include <proton/link.h> +//#include <stdlib.h> +import "C" + +import ( + "fmt" + "qpid.apache.org/proton/amqp" + "qpid.apache.org/proton/internal" + "reflect" + "time" + "unsafe" +) + +// TODO aconway 2015-05-05: Documentation for generated types. + +// Event is an AMQP protocol event. +type Event struct { + pn *C.pn_event_t + eventType EventType + connection Connection + session Session + link Link + delivery Delivery +} + +func makeEvent(pn *C.pn_event_t) Event { + return Event{ + pn: pn, + eventType: EventType(C.pn_event_type(pn)), + connection: Connection{C.pn_event_connection(pn)}, + session: Session{C.pn_event_session(pn)}, + link: Link{C.pn_event_link(pn)}, + delivery: Delivery{C.pn_event_delivery(pn)}, + } +} +func (e Event) IsNil() bool { return e.eventType == EventType(0) } +func (e Event) Type() EventType { return e.eventType } +func (e Event) Connection() Connection { return e.connection } +func (e Event) Session() Session { return e.session } +func (e Event) Link() Link { return e.link } +func (e Event) Delivery() Delivery { return e.delivery } +func (e Event) String() string { return e.Type().String() } + +// Data holds a pointer to decoded AMQP data. +// Use amqp.marshal/unmarshal to access it as Go data types. +// +type Data struct{ pn *C.pn_data_t } + +func NewData(p unsafe.Pointer) Data { return Data{(*C.pn_data_t)(p)} } + +func (d Data) Free() { C.pn_data_free(d.pn) } +func (d Data) Pointer() unsafe.Pointer { return unsafe.Pointer(d.pn) } +func (d Data) Clear() { C.pn_data_clear(d.pn) } +func (d Data) Rewind() { C.pn_data_rewind(d.pn) } +func (d Data) Error() error { + return internal.PnError(unsafe.Pointer(C.pn_data_error(d.pn))) +} + +// State holds the state flags for an AMQP endpoint. +type State byte + +const ( + SLocalUninit State = C.PN_LOCAL_UNINIT + SLocalActive = C.PN_LOCAL_ACTIVE + SLocalClosed = C.PN_LOCAL_CLOSED + SRemoteUninit = C.PN_REMOTE_UNINIT + SRemoteActive = C.PN_REMOTE_ACTIVE + SRemoteClosed = C.PN_REMOTE_CLOSED +) + +// Has is True if bits & state is non 0. +func (s State) Has(bits State) bool { return s&bits != 0 } + +func (s State) LocalUninit() bool { return s.Has(SLocalUninit) } +func (s State) LocalActive() bool { return s.Has(SLocalActive) } +func (s State) LocalClosed() bool { return s.Has(SLocalClosed) } +func (s State) RemoteUninit() bool { return s.Has(SRemoteUninit) } +func (s State) RemoteActive() bool { return s.Has(SRemoteActive) } +func (s State) RemoteClosed() bool { return s.Has(SRemoteClosed) } + +// Return a State containig just the local flags +func (s State) Local() State { return State(s & C.PN_LOCAL_MASK) } + +// Return a State containig just the remote flags +func (s State) Remote() State { return State(s & C.PN_REMOTE_MASK) } + +// Endpoint is the common interface for Connection, Link and Session. +type Endpoint interface { + // State is the open/closed state. + State() State + // Open an endpoint. + Open() + // Close an endpoint. + Close() + // Condition holds a local error condition. + Condition() Condition + // RemoteCondition holds a remote error condition. + RemoteCondition() Condition + // Human readable name + String() string +} + +// CloseError sets an error condition on an endpoint and closes the endpoint. +func CloseError(e Endpoint, err error) { + e.Condition().SetError(err) + e.Close() +} + +const ( + Received uint64 = C.PN_RECEIVED + Accepted = C.PN_ACCEPTED + Rejected = C.PN_REJECTED + Released = C.PN_RELEASED + Modified = C.PN_MODIFIED +) + +// SettleAs is equivalent to d.Update(disposition); d.Settle() +func (d Delivery) SettleAs(disposition uint64) { + d.Update(disposition) + d.Settle() +} + +// Accept accepts and settles a delivery. +func (d Delivery) Accept() { d.SettleAs(Accepted) } + +// Reject rejects and settles a delivery +func (d Delivery) Reject() { d.SettleAs(Rejected) } + +// Release releases and settles a delivery +// If delivered is true the delivery count for the message will be increased. +func (d Delivery) Release(delivered bool) { + if delivered { + d.SettleAs(Modified) + } else { + d.SettleAs(Released) + } +} + +type DeliveryTag struct{ pn C.pn_delivery_tag_t } + +func (t DeliveryTag) String() string { return C.GoStringN(t.pn.start, C.int(t.pn.size)) } + +func (l Link) Recv(buf []byte) int { + if len(buf) == 0 { + return 0 + } + return int(C.pn_link_recv(l.pn, (*C.char)(unsafe.Pointer(&buf[0])), C.size_t(len(buf)))) +} + +func (l Link) SendBytes(bytes []byte) int { + return int(C.pn_link_send(l.pn, cPtr(bytes), cLen(bytes))) +} + +func pnTag(tag string) C.pn_delivery_tag_t { + bytes := []byte(tag) + return C.pn_dtag(cPtr(bytes), cLen(bytes)) +} + +func (l Link) Delivery(tag string) Delivery { + return Delivery{C.pn_delivery(l.pn, pnTag(tag))} +} + +func cPtr(b []byte) *C.char { + if len(b) == 0 { + return nil + } + return (*C.char)(unsafe.Pointer(&b[0])) +} + +func cLen(b []byte) C.size_t { + return C.size_t(len(b)) +} + +func (s Session) Sender(name string) Link { + cname := C.CString(name) + defer C.free(unsafe.Pointer(cname)) + return Link{C.pn_sender(s.pn, cname)} +} + +func (s Session) Receiver(name string) Link { + cname := C.CString(name) + defer C.free(unsafe.Pointer(cname)) + return Link{C.pn_receiver(s.pn, cname)} +} + +// Context information per connection. +type connectionContext struct { + injecter Injecter + str string +} + +var connectionContexts = internal.MakeSafeMap() + +// Injecter for event-loop associated with this connection. +func (c Connection) Injecter() Injecter { + if cc, ok := connectionContexts.Get(c).(connectionContext); ok { + return cc.injecter + } + return nil +} + +// Unique (per process) string identifier for a connection, useful for debugging. +func (c Connection) String() string { + if cc, ok := connectionContexts.Get(c).(connectionContext); ok { + return cc.str + } + return fmt.Sprintf("%x", c.pn) +} + +// Head functions don't follow the normal naming conventions so missed by the generator. + +func (c Connection) LinkHead(s State) Link { + return Link{C.pn_link_head(c.pn, C.pn_state_t(s))} +} + +func (c Connection) SessionHead(s State) Session { + return Session{C.pn_session_head(c.pn, C.pn_state_t(s))} +} + +func (c Connection) Links(state State) (links []Link) { + for l := c.LinkHead(state); !l.IsNil(); l = l.Next(state) { + links = append(links, l) + } + return +} + +func (c Connection) Sessions(state State) (sessions []Session) { + for s := c.SessionHead(state); !s.IsNil(); s = s.Next(state) { + sessions = append(sessions, s) + } + return +} + +func (s Session) String() string { + return fmt.Sprintf("%s/%p", s.Connection(), s.pn) +} + +func (l Link) Connection() Connection { return l.Session().Connection() } + +// Human-readable link description including name, source, target and direction. +func (l Link) String() string { + if l.IsSender() { + return fmt.Sprintf("%s(%s->%s)", l.Name(), l.Source().Address(), l.Target().Address()) + } else { + return fmt.Sprintf("%s(%s<-%s)", l.Name(), l.Target().Address(), l.Source().Address()) + } +} + +// Error returns an instance of amqp.Error or nil. +func (c Condition) Error() error { + if c.IsNil() || !c.IsSet() { + return nil + } + return amqp.Error{c.Name(), c.Description()} +} + +// Set a Go error into a condition. +// If it is not an amqp.Condition use the error type as name, error string as description. +func (c Condition) SetError(err error) { + if err != nil { + if cond, ok := err.(amqp.Error); ok { + c.SetName(cond.Name) + c.SetDescription(cond.Description) + } else { + c.SetName(reflect.TypeOf(err).Name()) + c.SetDescription(err.Error()) + } + } +} + +func (c Connection) Session() (Session, error) { + s := Session{C.pn_session(c.pn)} + if s.IsNil() { + return s, Connection(c).Error() + } + return s, nil +} + +// pnTime converts Go time.Time to Proton millisecond Unix time. +func pnTime(t time.Time) C.pn_timestamp_t { + secs := t.Unix() + // Note: sub-second accuracy is not guaraunteed if the Unix time in + // nanoseconds cannot be represented by an int64 (sometime around year 2260) + msecs := (t.UnixNano() % int64(time.Second)) / int64(time.Millisecond) + return C.pn_timestamp_t(secs*1000 + msecs) +} + +// goTime converts a pn_timestamp_t to a Go time.Time. +func goTime(t C.pn_timestamp_t) time.Time { + secs := int64(t) / 1000 + nsecs := (int64(t) % 1000) * int64(time.Millisecond) + return time.Unix(secs, nsecs) +} + +// Special treatment for Transport.Head, return value is unsafe.Pointer not string +func (t Transport) Head() unsafe.Pointer { + return unsafe.Pointer(C.pn_transport_head(t.pn)) +} + +// Special treatment for Transport.Push, takes []byte instead of char*, size +func (t Transport) Push(bytes []byte) int { + return int(C.pn_transport_push(t.pn, (*C.char)(unsafe.Pointer(&bytes[0])), C.size_t(len(bytes)))) +} --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
