Author: rhs
Date: Tue Jun 12 19:29:58 2012
New Revision: 1349489
URL: http://svn.apache.org/viewvc?rev=1349489&view=rev
Log:
fixed numerous engine bugs; added tests
Modified:
qpid/proton/trunk/proton-c/include/proton/engine.h
qpid/proton/trunk/proton-c/src/engine/engine-internal.h
qpid/proton/trunk/proton-c/src/engine/engine.c
qpid/proton/trunk/tests/proton_tests/engine.py
Modified: qpid/proton/trunk/proton-c/include/proton/engine.h
URL:
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/include/proton/engine.h?rev=1349489&r1=1349488&r2=1349489&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/include/proton/engine.h (original)
+++ qpid/proton/trunk/proton-c/include/proton/engine.h Tue Jun 12 19:29:58 2012
@@ -97,13 +97,15 @@ pn_state_t pn_connection_state(pn_connec
/** @todo: needs documentation */
pn_error_t *pn_connection_error(pn_connection_t *connection);
/** @todo: needs documentation */
-char *pn_connection_container(pn_connection_t *connection);
+const char *pn_connection_container(pn_connection_t *connection);
/** @todo: needs documentation */
void pn_connection_set_container(pn_connection_t *connection, const char
*container);
/** @todo: needs documentation */
-char *pn_connection_hostname(pn_connection_t *connection);
+const char *pn_connection_hostname(pn_connection_t *connection);
/** @todo: needs documentation */
void pn_connection_set_hostname(pn_connection_t *connection, const char
*hostname);
+const char *pn_connection_remote_container(pn_connection_t *connection);
+const char *pn_connection_remote_hostname(pn_connection_t *connection);
/** Extracts the first delivery on the connection that has pending
* operations.
Modified: qpid/proton/trunk/proton-c/src/engine/engine-internal.h
URL:
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/engine/engine-internal.h?rev=1349489&r1=1349488&r2=1349489&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/engine/engine-internal.h (original)
+++ qpid/proton/trunk/proton-c/src/engine/engine-internal.h Tue Jun 12 19:29:58 2012
@@ -124,6 +124,8 @@ struct pn_connection_t {
pn_delivery_t *tpwork_tail;
char *container;
char *hostname;
+ char *remote_container;
+ char *remote_hostname;
};
struct pn_session_t {
Modified: qpid/proton/trunk/proton-c/src/engine/engine.c
URL:
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/engine/engine.c?rev=1349489&r1=1349488&r2=1349489&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/engine/engine.c (original)
+++ qpid/proton/trunk/proton-c/src/engine/engine.c Tue Jun 12 19:29:58 2012
@@ -197,6 +197,8 @@ void pn_connection_destroy(pn_connection
free(connection->sessions);
free(connection->container);
free(connection->hostname);
+ free(connection->remote_container);
+ free(connection->remote_hostname);
free(connection);
}
@@ -407,6 +409,8 @@ pn_connection_t *pn_connection()
conn->tpwork_tail = NULL;
conn->container = NULL;
conn->hostname = NULL;
+ conn->remote_container = NULL;
+ conn->remote_hostname = NULL;
return conn;
}
@@ -421,7 +425,7 @@ pn_error_t *pn_connection_error(pn_conne
return connection ? &connection->endpoint.error : NULL;
}
-char *pn_connection_container(pn_connection_t *connection)
+const char *pn_connection_container(pn_connection_t *connection)
{
return connection ? connection->container : NULL;
}
@@ -433,7 +437,7 @@ void pn_connection_set_container(pn_conn
connection->container = pn_strdup(container);
}
-char *pn_connection_hostname(pn_connection_t *connection)
+const char *pn_connection_hostname(pn_connection_t *connection)
{
return connection ? connection->hostname : NULL;
}
@@ -445,6 +449,16 @@ void pn_connection_set_hostname(pn_conne
connection->hostname = pn_strdup(hostname);
}
+const char *pn_connection_remote_container(pn_connection_t *connection)
+{
+ return connection ? connection->remote_container : NULL;
+}
+
+const char *pn_connection_remote_hostname(pn_connection_t *connection)
+{
+ return connection ? connection->remote_hostname : NULL;
+}
+
pn_delivery_t *pn_work_head(pn_connection_t *connection)
{
if (!connection) return NULL;
@@ -556,6 +570,8 @@ bool pn_matches(pn_endpoint_t *endpoint,
{
if (endpoint->type != type) return false;
+ if (!state) return true;
+
int st = endpoint->state;
if ((state & PN_REMOTE_MASK) == 0 || (state & PN_LOCAL_MASK) == 0)
return st & state;
@@ -1055,11 +1071,32 @@ int pn_do_error(pn_transport_t *transpor
return PN_ERR;
}
+char *pn_bytes_strdup(pn_bytes_t str)
+{
+ return pn_strndup(str.start, str.size);
+}
+
int pn_do_open(pn_dispatcher_t *disp)
{
pn_transport_t *transport = disp->context;
pn_connection_t *conn = transport->connection;
+ bool container_q, hostname_q;
+ pn_bytes_t remote_container, remote_hostname;
+ int err = pn_scan_args(disp, "D.[?S?S]", &container_q, &remote_container,
+ &hostname_q, &remote_hostname);
+ if (err) return err;
+ if (container_q) {
+ conn->remote_container = pn_bytes_strdup(remote_container);
+ } else {
+ conn->remote_container = NULL;
+ }
+ if (hostname_q) {
+ conn->remote_hostname = pn_bytes_strdup(remote_hostname);
+ } else {
+ conn->remote_hostname = NULL;
+ }
PN_SET_REMOTE(conn->endpoint.state, PN_REMOTE_ACTIVE);
+ transport->open_rcvd = true;
return 0;
}
@@ -1103,11 +1140,6 @@ pn_link_state_t *pn_find_link(pn_session
return NULL;
}
-char *pn_bytes_strdup(pn_bytes_t str)
-{
- return pn_strndup(str.start, str.size);
-}
-
int pn_do_attach(pn_dispatcher_t *disp)
{
pn_transport_t *transport = disp->context;
@@ -1329,7 +1361,7 @@ int pn_do_detach(pn_dispatcher_t *disp)
pn_link_state_t *link_state = pn_handle_state(ssn_state, handle);
pn_link_t *link = link_state->link;
- link_state->remote_handle = -1;
+ link_state->remote_handle = -2;
if (closed)
{
@@ -1347,7 +1379,7 @@ int pn_do_end(pn_dispatcher_t *disp)
pn_session_state_t *ssn_state = pn_channel_state(transport, disp->channel);
pn_session_t *session = ssn_state->session;
- ssn_state->remote_channel = -1;
+ ssn_state->remote_channel = -2;
PN_SET_REMOTE(session->endpoint.state, PN_REMOTE_CLOSED);
return 0;
}
@@ -1362,6 +1394,8 @@ int pn_do_close(pn_dispatcher_t *disp)
ssize_t pn_input(pn_transport_t *transport, char *bytes, size_t available)
{
+ if (!transport) return PN_ARG_ERR;
+
if (!available) {
pn_do_error(transport, "amqp:connection:framing-error", "connection
aborted");
if (transport->disp->trace & (PN_TRACE_RAW | PN_TRACE_FRM))
@@ -1454,13 +1488,15 @@ int pn_process_link_setup(pn_transport_t
{
// XXX
state->local_handle = link->id;
- pn_post_frame(transport->disp, ssn_state->local_channel,
"DL[SIonn?DL[S]?DL[S]nnI]", ATTACH,
- link->name,
- state->local_handle,
- endpoint->type == RECEIVER,
- (bool) link->local_source, SOURCE, link->local_source,
- (bool) link->local_target, TARGET, link->local_target,
- 0);
+ int err = pn_post_frame(transport->disp, ssn_state->local_channel,
+ "DL[SIonn?DL[S]?DL[S]nnI]", ATTACH,
+ link->name,
+ state->local_handle,
+ endpoint->type == RECEIVER,
+ (bool) link->local_source, SOURCE,
link->local_source,
+ (bool) link->local_target, TARGET,
link->local_target,
+ 0);
+ if (err) return err;
}
}
@@ -1581,7 +1617,7 @@ int pn_process_tpwork_receiver(pn_transp
pn_link_t *link = delivery->link;
// XXX: need to prevent duplicate disposition sending
pn_session_state_t *ssn_state = pn_session_get_state(transport,
link->session);
- if ((int16_t) ssn_state->local_channel >= 0 && !delivery->remote_settled) {
+ if ((int16_t) ssn_state->local_channel >= 0 && !delivery->remote_settled &&
delivery->context) {
int err = pn_post_disp(transport, delivery);
if (err) return err;
}
@@ -1659,8 +1695,12 @@ int pn_process_link_teardown(pn_transpor
pn_session_t *session = link->session;
pn_session_state_t *ssn_state = pn_session_get_state(transport, session);
pn_link_state_t *state = pn_link_get_state(ssn_state, link);
- if (endpoint->state & PN_LOCAL_CLOSED && (int32_t) state->local_handle >=
0) {
- if (pn_is_sender(link) && pn_queued(link)) return 0;
+ if (endpoint->state & PN_LOCAL_CLOSED && (int32_t) state->local_handle >=
0 &&
+ (int16_t) ssn_state->local_channel >= 0 && !transport->close_sent) {
+ if (pn_is_sender(link) && pn_queued(link) &&
+ (int32_t) state->remote_handle != -2 &&
+ (int16_t) ssn_state->remote_channel != -2 &&
+ !transport->close_rcvd) return 0;
int err = pn_post_frame(transport->disp, ssn_state->local_channel,
"DL[Io]", DETACH,
state->local_handle, true);
if (err) return err;
@@ -1673,15 +1713,42 @@ int pn_process_link_teardown(pn_transpor
return 0;
}
+bool pn_pointful_buffering(pn_transport_t *transport, pn_session_t *session)
+{
+ if (!transport->open_rcvd) return true;
+ if (transport->close_rcvd) return false;
+
+ pn_connection_t *conn = transport->connection;
+ pn_link_t *link = pn_link_head(conn, 0);
+ while (link) {
+ if (pn_is_sender(link) && pn_queued(link) > 0) {
+ pn_session_t *ssn = link->session;
+ if (session && session == ssn) {
+ pn_session_state_t *ssn_state = pn_session_get_state(transport,
session);
+ pn_link_state_t *state = pn_link_get_state(ssn_state, link);
+
+ if ((int32_t) state->remote_handle != -2 &&
+ (int16_t) ssn_state->remote_channel != -2) {
+ return true;
+ }
+ }
+ }
+ link = pn_link_next(link, 0);
+ }
+
+ return false;
+}
+
int pn_process_ssn_teardown(pn_transport_t *transport, pn_endpoint_t *endpoint)
{
if (endpoint->type == SESSION)
{
pn_session_t *session = (pn_session_t *) endpoint;
pn_session_state_t *state = pn_session_get_state(transport, session);
- if (endpoint->state & PN_LOCAL_CLOSED && (int16_t) state->local_channel >=
0)
+ if (endpoint->state & PN_LOCAL_CLOSED && (int16_t) state->local_channel >= 0
+ && !transport->close_sent)
{
- if (transport->connection->tpwork_head) return 0;
+ if (pn_pointful_buffering(transport, session)) return 0;
int err = pn_post_frame(transport->disp, state->local_channel, "DL[]",
END);
if (err) return err;
state->local_channel = -2;
@@ -1697,7 +1764,7 @@ int pn_process_conn_teardown(pn_transpor
if (endpoint->type == CONNECTION)
{
if (endpoint->state & PN_LOCAL_CLOSED && !transport->close_sent) {
- if (transport->connection->tpwork_head) return 0;
+ if (pn_pointful_buffering(transport, NULL)) return 0;
int err = pn_post_close(transport);
if (err) return err;
transport->close_sent = true;
@@ -1714,9 +1781,10 @@ int pn_phase(pn_transport_t *transport,
pn_endpoint_t *endpoint = conn->transport_head;
while (endpoint)
{
+ pn_endpoint_t *next = endpoint->transport_next;
int err = phase(transport, endpoint);
if (err) return err;
- endpoint = endpoint->transport_next;
+ endpoint = next;
}
return 0;
}
@@ -1749,6 +1817,8 @@ int pn_process(pn_transport_t *transport
ssize_t pn_output(pn_transport_t *transport, char *bytes, size_t size)
{
+ if (!transport) return PN_ARG_ERR;
+
if (!transport->error)
transport->error = pn_process(transport);
Modified: qpid/proton/trunk/tests/proton_tests/engine.py
URL:
http://svn.apache.org/viewvc/qpid/proton/trunk/tests/proton_tests/engine.py?rev=1349489&r1=1349488&r2=1349489&view=diff
==============================================================================
--- qpid/proton/trunk/tests/proton_tests/engine.py (original)
+++ qpid/proton/trunk/tests/proton_tests/engine.py Tue Jun 12 19:29:58 2012
@@ -17,13 +17,15 @@
# under the License.
#
-import os, common
+import os, common, xproton
from xproton import *
# future test areas
# + different permutations of setup
# - creating deliveries and calling input/output before opening the session/link
# + shrinking output_size down to something small? should the engine buffer?
+# + resuming
+# - locally and remotely created deliveries with the same tag
OUTPUT_SIZE = 10*1024
@@ -222,6 +224,15 @@ class SessionTest(Test):
assert pn_session_state(self.ssn) == PN_LOCAL_CLOSED | PN_REMOTE_CLOSED
assert pn_session_state(ssn) == PN_LOCAL_CLOSED | PN_REMOTE_CLOSED
+ def test_closing_connection(self):
+ pn_session_open(self.ssn)
+ self.pump()
+ pn_connection_close(self.c1)
+ self.pump()
+ pn_session_close(self.ssn)
+ self.pump()
+
+
class LinkTest(Test):
def setup(self):
@@ -300,6 +311,49 @@ class LinkTest(Test):
assert pn_link_state(self.snd) == PN_LOCAL_CLOSED | PN_REMOTE_CLOSED
assert pn_link_state(self.rcv) == PN_LOCAL_CLOSED | PN_REMOTE_CLOSED
+ def test_multiple(self):
+ rcv = pn_receiver(pn_get_session(self.snd), "second-rcv")
+ pn_link_open(self.snd)
+ pn_link_open(rcv)
+ self.pump()
+ c2 = pn_get_connection(pn_get_session(self.rcv))
+ l = pn_link_head(c2, PN_LOCAL_UNINIT | PN_REMOTE_ACTIVE)
+ while l:
+ pn_link_open(l)
+ l = pn_link_next(l, PN_LOCAL_UNINIT | PN_REMOTE_ACTIVE)
+ self.pump()
+
+ assert self.snd
+ assert rcv
+ pn_link_close(self.snd)
+ pn_link_close(rcv)
+ ssn = pn_get_session(rcv)
+ conn = pn_get_connection(ssn)
+ pn_session_close(ssn)
+ pn_connection_close(conn)
+ self.pump()
+
+ def test_closing_session(self):
+ pn_link_open(self.snd)
+ pn_link_open(self.rcv)
+ ssn1 = pn_get_session(self.snd)
+ self.pump()
+ pn_session_close(ssn1)
+ self.pump()
+ pn_link_close(self.snd)
+ self.pump()
+
+ def test_closing_connection(self):
+ pn_link_open(self.snd)
+ pn_link_open(self.rcv)
+ ssn1 = pn_get_session(self.snd)
+ c1 = pn_get_connection(ssn1)
+ self.pump()
+ pn_connection_close(c1)
+ self.pump()
+ pn_link_close(self.snd)
+ self.pump()
+
class TransferTest(Test):
def setup(self):
@@ -512,6 +566,61 @@ class CreditTest(Test):
assert pn_queued(self.rcv) == 0, pn_queued(self.rcv)
+ def _testBufferingOnClose(self, a, b):
+ for i in range(10):
+ d = pn_delivery(self.snd, "tag-%s" % i)
+ assert d
+ pn_settle(d)
+ self.pump()
+ assert pn_queued(self.snd) == 10
+
+ endpoints = {"connection": (self.c1, self.c2),
+ "session": (pn_get_session(self.snd),
pn_get_session(self.rcv)),
+ "link": (self.snd, self.rcv)}
+
+ local_a, remote_a = endpoints[a]
+ local_b, remote_b = endpoints[b]
+
+ a_close = getattr(xproton, "pn_%s_close" % a)
+ a_state = getattr(xproton, "pn_%s_state" % a)
+ b_close = getattr(xproton, "pn_%s_close" % b)
+ b_state = getattr(xproton, "pn_%s_state" % b)
+
+ b_close(remote_b)
+ self.pump()
+ assert b_state(local_b) == PN_LOCAL_ACTIVE | PN_REMOTE_CLOSED
+ a_close(local_a)
+ self.pump()
+ assert a_state(remote_a) & PN_REMOTE_CLOSED
+ assert pn_queued(self.snd) == 10
+
+ def testBufferingOnCloseLinkLink(self):
+ self._testBufferingOnClose("link", "link")
+
+ def testBufferingOnCloseLinkSession(self):
+ self._testBufferingOnClose("link", "session")
+
+ def testBufferingOnCloseLinkConnection(self):
+ self._testBufferingOnClose("link", "connection")
+
+ def testBufferingOnCloseSessionLink(self):
+ self._testBufferingOnClose("session", "link")
+
+ def testBufferingOnCloseSessionSession(self):
+ self._testBufferingOnClose("session", "session")
+
+ def testBufferingOnCloseSessionConnection(self):
+ self._testBufferingOnClose("session", "connection")
+
+ def testBufferingOnCloseConnectionLink(self):
+ self._testBufferingOnClose("connection", "link")
+
+ def testBufferingOnCloseConnectionSession(self):
+ self._testBufferingOnClose("connection", "session")
+
+ def testBufferingOnCloseConnectionConnection(self):
+ self._testBufferingOnClose("connection", "connection")
+
def testCreditWithBuffering(self):
pn_flow(self.rcv, PN_SESSION_WINDOW + 10)
self.pump()
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]