Author: rhs
Date: Wed Apr 11 22:39:33 2012
New Revision: 1325050
URL: http://svn.apache.org/viewvc?rev=1325050&view=rev
Log:
added tests and fixed various bugs found along the way
Added:
qpid/proton/trunk/proton-c/tests/
qpid/proton/trunk/proton-c/tests/__init__.py
qpid/proton/trunk/proton-c/tests/engine.py
Modified:
qpid/proton/trunk/proton-c/cproton.i
qpid/proton/trunk/proton-c/src/engine/engine-internal.h
qpid/proton/trunk/proton-c/src/engine/engine.c
Modified: qpid/proton/trunk/proton-c/cproton.i
URL:
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/cproton.i?rev=1325050&r1=1325049&r2=1325050&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/cproton.i (original)
+++ qpid/proton/trunk/proton-c/cproton.i Wed Apr 11 22:39:33 2012
@@ -25,11 +25,10 @@ ssize_t pn_send(pn_link_t *transport, ch
ssize_t sz = pn_recv(link, OUTPUT, *OUTPUT_SIZE);
if (sz >= 0) {
*OUTPUT_SIZE = sz;
- return 0;
} else {
*OUTPUT_SIZE = 0;
- return sz;
}
+ return sz;
}
%}
%ignore pn_recv;
@@ -40,11 +39,10 @@ ssize_t pn_send(pn_link_t *transport, ch
ssize_t sz = pn_output(transport, OUTPUT, *OUTPUT_SIZE);
if (sz >= 0) {
*OUTPUT_SIZE = sz;
- return 0;
} else {
*OUTPUT_SIZE = 0;
- return sz;
}
+ return sz;
}
%}
%ignore pn_output;
@@ -74,11 +72,10 @@ ssize_t pn_send(pn_link_t *transport, ch
ssize_t sz = pn_message_data(OUTPUT, *OUTPUT_SIZE, STRING, LENGTH);
if (sz >= 0) {
*OUTPUT_SIZE = sz;
- return 0;
} else {
*OUTPUT_SIZE = 0;
- return sz;
}
+ return sz;
}
%}
%ignore pn_message_data;
Modified: qpid/proton/trunk/proton-c/src/engine/engine-internal.h
URL:
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/engine/engine-internal.h?rev=1325050&r1=1325049&r2=1325050&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/engine/engine-internal.h (original)
+++ qpid/proton/trunk/proton-c/src/engine/engine-internal.h Wed Apr 11 22:39:33 2012
@@ -169,6 +169,7 @@ struct pn_delivery_t {
char *bytes;
size_t size;
size_t capacity;
+ bool done;
void *context;
};
Modified: qpid/proton/trunk/proton-c/src/engine/engine.c
URL:
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/engine/engine.c?rev=1325050&r1=1325049&r2=1325050&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/engine/engine.c (original)
+++ qpid/proton/trunk/proton-c/src/engine/engine.c Wed Apr 11 22:39:33 2012
@@ -848,6 +848,7 @@ pn_delivery_t *pn_delivery(pn_link_t *li
delivery->bytes = NULL;
delivery->size = 0;
delivery->capacity = 0;
+ delivery->done = false;
delivery->context = NULL;
if (!link->current)
@@ -888,7 +889,11 @@ void pn_delivery_dump(pn_delivery_t *d)
pn_delivery_tag_t pn_delivery_tag(pn_delivery_t *delivery)
{
- return pn_dtag(pn_binary_bytes(delivery->tag), pn_binary_size(delivery->tag));
+ if (delivery) {
+ return pn_dtag(pn_binary_bytes(delivery->tag), pn_binary_size(delivery->tag));
+ } else {
+ return (pn_delivery_tag_t) {0};
+ }
}
pn_delivery_t *pn_current(pn_link_t *link)
@@ -915,6 +920,7 @@ bool pn_advance(pn_link_t *link)
{
if (link && link->current) {
pn_delivery_t *prev = link->current;
+ prev->done = true;
if (link->endpoint.type == SENDER) {
pn_advance_sender(link);
} else {
@@ -992,12 +998,15 @@ void pn_do_begin(pn_dispatcher_t *disp)
PN_SET_REMOTE(state->session->endpoint.state, PN_REMOTE_ACTIVE);
}
-pn_link_state_t *pn_find_link(pn_session_state_t *ssn_state, pn_string_t *name)
+pn_link_state_t *pn_find_link(pn_session_state_t *ssn_state, pn_string_t *name, bool is_sender)
{
+ pn_endpoint_type_t type = is_sender ? SENDER : RECEIVER;
+
for (int i = 0; i < ssn_state->session->link_count; i++)
{
pn_link_t *link = ssn_state->session->links[i];
- if (!wcsncmp(pn_string_wcs(name), link->name, pn_string_size(name)))
+ if (link->endpoint.type == type &&
+ !wcsncmp(pn_string_wcs(name), link->name, pn_string_size(name)))
{
return pn_link_get_state(ssn_state, link);
}
@@ -1013,7 +1022,7 @@ void pn_do_attach(pn_dispatcher_t *disp)
bool is_sender = pn_to_bool(pn_list_get(args, ATTACH_ROLE));
pn_string_t *name = pn_to_string(pn_list_get(args, ATTACH_NAME));
pn_session_state_t *ssn_state = pn_channel_state(transport, disp->channel);
- pn_link_state_t *link_state = pn_find_link(ssn_state, name);
+ pn_link_state_t *link_state = pn_find_link(ssn_state, name, is_sender);
if (!link_state) {
pn_link_t *link;
if (is_sender) {
@@ -1053,19 +1062,25 @@ void pn_do_transfer(pn_dispatcher_t *dis
uint32_t handle = pn_to_uint32(pn_list_get(args, TRANSFER_HANDLE));
pn_link_state_t *link_state = pn_handle_state(ssn_state, handle);
pn_link_t *link = link_state->link;
- pn_binary_t *tag = pn_to_binary(pn_list_get(args, TRANSFER_DELIVERY_TAG));
- pn_delivery_t *delivery = pn_delivery(link, pn_dtag(pn_binary_bytes(tag), pn_binary_size(tag)));
- pn_delivery_state_t *state = pn_delivery_buffer_push(&ssn_state->incoming, delivery);
- delivery->context = state;
- // XXX: need to check that state is not null (i.e. we haven't hit the limit)
- pn_sequence_t id = pn_to_int32(pn_list_get(args, TRANSFER_DELIVERY_ID));
- if (id != state->id) {
- // XXX: signal error somehow
+ pn_delivery_t *delivery;
+ if (link->tail && !link->tail->done) {
+ delivery = link->tail;
+ } else {
+ pn_binary_t *tag = pn_to_binary(pn_list_get(args, TRANSFER_DELIVERY_TAG));
+ delivery = pn_delivery(link, pn_dtag(pn_binary_bytes(tag), pn_binary_size(tag)));
+ pn_delivery_state_t *state = pn_delivery_buffer_push(&ssn_state->incoming, delivery);
+ delivery->context = state;
+ // XXX: need to check that state is not null (i.e. we haven't hit the limit)
+ pn_sequence_t id = pn_to_int32(pn_list_get(args, TRANSFER_DELIVERY_ID));
+ if (id != state->id) {
+ // XXX: signal error somehow
+ }
}
- PN_ENSURE(delivery->bytes, delivery->capacity, disp->size);
- memmove(delivery->bytes, disp->payload, disp->size);
- delivery->size = disp->size;
+ PN_ENSURE(delivery->bytes, delivery->capacity, delivery->size + disp->size);
+ memmove(delivery->bytes + delivery->size, disp->payload, disp->size);
+ delivery->size += disp->size;
+ delivery->done = !pn_to_bool(pn_list_get(args, TRANSFER_MORE));
}
void pn_do_flow(pn_dispatcher_t *disp)
@@ -1223,7 +1238,7 @@ void pn_process_conn_setup(pn_transport_
void pn_process_ssn_setup(pn_transport_t *transport, pn_endpoint_t *endpoint)
{
- if (endpoint->type == SESSION)
+ if (endpoint->type == SESSION && transport->open_sent)
{
pn_session_t *ssn = (pn_session_t *) endpoint;
pn_session_state_t *state = pn_session_get_state(transport, ssn);
@@ -1246,12 +1261,14 @@ void pn_process_ssn_setup(pn_transport_t
void pn_process_link_setup(pn_transport_t *transport, pn_endpoint_t *endpoint)
{
- if (endpoint->type == SENDER || endpoint->type == RECEIVER)
+ if (transport->open_sent && (endpoint->type == SENDER ||
+ endpoint->type == RECEIVER))
{
pn_link_t *link = (pn_link_t *) endpoint;
pn_session_state_t *ssn_state = pn_session_get_state(transport, link->session);
pn_link_state_t *state = pn_link_get_state(ssn_state, link);
- if (!(endpoint->state & PN_LOCAL_UNINIT) && state->local_handle == (uint32_t) -1)
+ if (((int16_t) ssn_state->local_channel >= 0) &&
+ !(endpoint->state & PN_LOCAL_UNINIT) && state->local_handle == (uint32_t) -1)
{
pn_init_frame(transport->disp);
pn_field(transport->disp, ATTACH_ROLE, pn_boolean(endpoint->type == RECEIVER));
@@ -1308,6 +1325,7 @@ void pn_post_disp(pn_transport_t *transp
pn_field(transport->disp, DISPOSITION_LAST, pn_uint(state->id));
// XXX
pn_field(transport->disp, DISPOSITION_SETTLED, pn_boolean(delivery->local_settled));
+ //pn_field(transport->disp, DISPOSITION_BATCHABLE, pn_boolean(batchable));
uint64_t code;
switch(delivery->local_state) {
case PN_ACCEPTED:
@@ -1320,10 +1338,10 @@ void pn_post_disp(pn_transport_t *transp
default:
code = 0;
}
- if (code)
+ if (code) {
pn_field(transport->disp, DISPOSITION_STATE, pn_value("L([])", code));
- //pn_field(transport->disp, DISPOSITION_BATCHABLE, pn_boolean(batchable));
- pn_post_frame(transport->disp, ssn_state->local_channel, DISPOSITION);
+ pn_post_frame(transport->disp, ssn_state->local_channel, DISPOSITION);
+ }
}
void pn_process_disp_receiver(pn_transport_t *transport, pn_endpoint_t *endpoint)
@@ -1368,18 +1386,22 @@ void pn_process_msg_data(pn_transport_t
state = pn_delivery_buffer_push(&ssn_state->outgoing, delivery);
delivery->context = state;
}
- if (!state->sent && (int16_t) ssn_state->local_channel >= 0 && (int32_t) link_state->local_handle >= 0) {
+ if (!state->sent &&
+ (delivery->done || delivery->size > 0) &&
+ (int16_t) ssn_state->local_channel >= 0 && (int32_t) link_state->local_handle >= 0) {
pn_init_frame(transport->disp);
pn_field(transport->disp, TRANSFER_HANDLE, pn_value("I", link_state->local_handle));
pn_field(transport->disp, TRANSFER_DELIVERY_ID, pn_value("I", state->id));
pn_field(transport->disp, TRANSFER_DELIVERY_TAG, pn_from_binary(pn_binary_dup(delivery->tag)));
pn_field(transport->disp, TRANSFER_MESSAGE_FORMAT, pn_value("I", 0));
+ pn_field(transport->disp, TRANSFER_MORE, pn_boolean(!delivery->done));
if (delivery->bytes) {
pn_append_payload(transport->disp, delivery->bytes, delivery->size);
delivery->size = 0;
}
pn_post_frame(transport->disp, ssn_state->local_channel, TRANSFER);
- state->sent = true;
+ if (delivery->done)
+ state->sent = true;
}
}
delivery = delivery->tpwork_next;
@@ -1399,9 +1421,9 @@ void pn_process_disp_sender(pn_transport
if (link->endpoint.type == SENDER) {
// XXX: need to prevent duplicate disposition sending
pn_session_state_t *ssn_state = pn_session_get_state(transport, link->session);
- /*if ((int16_t) ssn_state->local_channel >= 0) {
+ if ((int16_t) ssn_state->local_channel >= 0) {
pn_post_disp(transport, delivery);
- }*/
+ }
if (delivery->local_settled) {
pn_full_settle(&ssn_state->outgoing, delivery);
@@ -1533,17 +1555,18 @@ void pn_trace(pn_transport_t *transport,
ssize_t pn_send(pn_link_t *sender, const char *bytes, size_t n)
{
pn_delivery_t *current = pn_current(sender);
- if (!current) return -1;
- if (current->bytes) return 0;
+ if (!current) return PN_EOS;
PN_ENSURE(current->bytes, current->capacity, current->size + n);
memmove(current->bytes + current->size, bytes, n);
- current->size = +n;
+ current->size += n;
pn_add_tpwork(current);
return n;
}
ssize_t pn_recv(pn_link_t *receiver, char *bytes, size_t n)
{
+ if (!receiver) return PN_ARG_ERR;
+
pn_delivery_t *delivery = receiver->current;
if (delivery) {
if (delivery->size) {
@@ -1553,11 +1576,10 @@ ssize_t pn_recv(pn_link_t *receiver, cha
delivery->size -= size;
return size;
} else {
- return PN_EOS;
+ return delivery->done ? PN_EOS : 0;
}
} else {
- // XXX: ?
- return PN_EOS;
+ return PN_STATE_ERR;
}
}
@@ -1615,8 +1637,12 @@ bool pn_writable(pn_delivery_t *delivery
bool pn_readable(pn_delivery_t *delivery)
{
- pn_link_t *link = delivery->link;
- return pn_is_receiver(link) && pn_is_current(delivery);
+ if (delivery) {
+ pn_link_t *link = delivery->link;
+ return pn_is_receiver(link) && pn_is_current(delivery);
+ } else {
+ return false;
+ }
}
size_t pn_pending(pn_delivery_t *delivery)
Added: qpid/proton/trunk/proton-c/tests/__init__.py
URL:
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/tests/__init__.py?rev=1325050&view=auto
==============================================================================
--- qpid/proton/trunk/proton-c/tests/__init__.py (added)
+++ qpid/proton/trunk/proton-c/tests/__init__.py Wed Apr 11 22:39:33 2012
@@ -0,0 +1,20 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import tests.engine
Added: qpid/proton/trunk/proton-c/tests/engine.py
URL:
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/tests/engine.py?rev=1325050&view=auto
==============================================================================
--- qpid/proton/trunk/proton-c/tests/engine.py (added)
+++ qpid/proton/trunk/proton-c/tests/engine.py Wed Apr 11 22:39:33 2012
@@ -0,0 +1,384 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import os
+from cproton import *
+
+def pump(t1, t2):
+ while True:
+ cd, out1 = pn_output(t1, 1024)
+ assert cd >= 0
+ cd, out2 = pn_output(t2, 1024)
+ assert cd >= 0
+
+ if out1 or out2:
+ if out1:
+ cd = pn_input(t2, out1, len(out1))
+ assert cd == len(out1)
+ if out2:
+ cd = pn_input(t1, out2, len(out2))
+ assert cd == len(out2)
+ else:
+ return
+
+class Test:
+
+ def __init__(self, name):
+ self.name = name
+
+ def setup(self):
+ self.c1 = pn_connection()
+ self.c2 = pn_connection()
+ self.t1 = pn_transport(self.c1)
+ self.t2 = pn_transport(self.c2)
+ pn_transport_open(self.t1)
+ pn_transport_open(self.t2)
+ trc = os.environ["PN_TRACE_FRM"]
+ if trc and trc.lower() in ("1", "2", "yes", "true"):
+ pn_trace(self.t1, PN_TRACE_FRM)
+ if trc == "2":
+ pn_trace(self.t2, PN_TRACE_FRM)
+
+ def teardown(self):
+ pn_connection_destroy(self.c1)
+ pn_connection_destroy(self.c2)
+
+ def pump(self):
+ pump(self.t1, self.t2)
+
+class ConnectionTest(Test):
+
+ def test_open_close(self):
+ assert pn_connection_state(self.c1) == PN_LOCAL_UNINIT | PN_REMOTE_UNINIT
+ assert pn_connection_state(self.c2) == PN_LOCAL_UNINIT | PN_REMOTE_UNINIT
+
+ pn_connection_open(self.c1)
+ self.pump()
+
+ assert pn_connection_state(self.c1) == PN_LOCAL_ACTIVE | PN_REMOTE_UNINIT
+ assert pn_connection_state(self.c2) == PN_LOCAL_UNINIT | PN_REMOTE_ACTIVE
+
+ pn_connection_open(self.c2)
+ self.pump()
+
+ assert pn_connection_state(self.c1) == PN_LOCAL_ACTIVE | PN_REMOTE_ACTIVE
+ assert pn_connection_state(self.c2) == PN_LOCAL_ACTIVE | PN_REMOTE_ACTIVE
+
+ pn_connection_close(self.c1)
+ self.pump()
+
+ assert pn_connection_state(self.c1) == PN_LOCAL_CLOSED | PN_REMOTE_ACTIVE
+ assert pn_connection_state(self.c2) == PN_LOCAL_ACTIVE | PN_REMOTE_CLOSED
+
+ pn_connection_close(self.c2)
+ self.pump()
+
+ assert pn_connection_state(self.c1) == PN_LOCAL_CLOSED | PN_REMOTE_CLOSED
+ assert pn_connection_state(self.c2) == PN_LOCAL_CLOSED | PN_REMOTE_CLOSED
+
+ def test_simultaneous_open_close(self):
+ assert pn_connection_state(self.c1) == PN_LOCAL_UNINIT | PN_REMOTE_UNINIT
+ assert pn_connection_state(self.c2) == PN_LOCAL_UNINIT | PN_REMOTE_UNINIT
+
+ pn_connection_open(self.c1)
+ pn_connection_open(self.c2)
+
+ self.pump()
+
+ assert pn_connection_state(self.c1) == PN_LOCAL_ACTIVE | PN_REMOTE_ACTIVE
+ assert pn_connection_state(self.c2) == PN_LOCAL_ACTIVE | PN_REMOTE_ACTIVE
+
+ pn_connection_close(self.c1)
+ pn_connection_close(self.c2)
+
+ self.pump()
+
+ assert pn_connection_state(self.c1) == PN_LOCAL_CLOSED | PN_REMOTE_CLOSED
+ assert pn_connection_state(self.c2) == PN_LOCAL_CLOSED | PN_REMOTE_CLOSED
+
+class SessionTest(Test):
+
+ def setup(self):
+ Test.setup(self)
+ self.ssn = pn_session(self.c1)
+ pn_connection_open(self.c1)
+ pn_connection_open(self.c2)
+
+ def test_open_close(self):
+ assert pn_session_state(self.ssn) == PN_LOCAL_UNINIT | PN_REMOTE_UNINIT
+
+ pn_session_open(self.ssn)
+
+ assert pn_session_state(self.ssn) == PN_LOCAL_ACTIVE | PN_REMOTE_UNINIT
+
+ self.pump()
+
+ assert pn_session_state(self.ssn) == PN_LOCAL_ACTIVE | PN_REMOTE_UNINIT
+
+ ssn = pn_session_head(self.c2, PN_REMOTE_ACTIVE | PN_LOCAL_UNINIT)
+
+ assert ssn != None
+ assert pn_session_state(ssn) == PN_LOCAL_UNINIT | PN_REMOTE_ACTIVE
+ assert pn_session_state(self.ssn) == PN_LOCAL_ACTIVE | PN_REMOTE_UNINIT
+
+ pn_session_open(ssn)
+
+ assert pn_session_state(ssn) == PN_LOCAL_ACTIVE | PN_REMOTE_ACTIVE
+ assert pn_session_state(self.ssn) == PN_LOCAL_ACTIVE | PN_REMOTE_UNINIT
+
+ self.pump()
+
+ assert pn_session_state(ssn) == PN_LOCAL_ACTIVE | PN_REMOTE_ACTIVE
+ assert pn_session_state(self.ssn) == PN_LOCAL_ACTIVE | PN_REMOTE_ACTIVE
+
+ pn_session_close(ssn)
+
+ assert pn_session_state(ssn) == PN_LOCAL_CLOSED | PN_REMOTE_ACTIVE
+ assert pn_session_state(self.ssn) == PN_LOCAL_ACTIVE | PN_REMOTE_ACTIVE
+
+ self.pump()
+
+ assert pn_session_state(ssn) == PN_LOCAL_CLOSED | PN_REMOTE_ACTIVE
+ assert pn_session_state(self.ssn) == PN_LOCAL_ACTIVE | PN_REMOTE_CLOSED
+
+ pn_session_close(self.ssn)
+
+ assert pn_session_state(ssn) == PN_LOCAL_CLOSED | PN_REMOTE_ACTIVE
+ assert pn_session_state(self.ssn) == PN_LOCAL_CLOSED | PN_REMOTE_CLOSED
+
+ self.pump()
+
+ assert pn_session_state(ssn) == PN_LOCAL_CLOSED | PN_REMOTE_CLOSED
+ assert pn_session_state(self.ssn) == PN_LOCAL_CLOSED | PN_REMOTE_CLOSED
+
+ def test_simultaneous_close(self):
+ pn_session_open(self.ssn)
+ self.pump()
+ ssn = pn_session_head(self.c2, PN_REMOTE_ACTIVE | PN_LOCAL_UNINIT)
+ assert ssn != None
+ pn_session_open(ssn)
+ self.pump()
+
+ assert pn_session_state(self.ssn) == PN_LOCAL_ACTIVE | PN_REMOTE_ACTIVE
+ assert pn_session_state(ssn) == PN_LOCAL_ACTIVE | PN_REMOTE_ACTIVE
+
+ pn_session_close(self.ssn)
+ pn_session_close(ssn)
+
+ assert pn_session_state(self.ssn) == PN_LOCAL_CLOSED | PN_REMOTE_ACTIVE
+ assert pn_session_state(ssn) == PN_LOCAL_CLOSED | PN_REMOTE_ACTIVE
+
+ self.pump()
+
+ assert pn_session_state(self.ssn) == PN_LOCAL_CLOSED | PN_REMOTE_CLOSED
+ assert pn_session_state(ssn) == PN_LOCAL_CLOSED | PN_REMOTE_CLOSED
+
+class LinkTest(Test):
+
+ def setup(self):
+ Test.setup(self)
+ pn_connection_open(self.c1)
+ pn_connection_open(self.c2)
+ self.ssn1 = pn_session(self.c1)
+ pn_session_open(self.ssn1)
+ self.pump()
+ self.ssn2 = pn_session_head(self.c2, PN_LOCAL_UNINIT | PN_REMOTE_ACTIVE)
+ pn_session_open(self.ssn2)
+ self.pump()
+ self.snd = pn_sender(self.ssn1, "test-link")
+ self.rcv = pn_receiver(self.ssn2, "test-link")
+
+ def test_open_close(self):
+ assert pn_link_state(self.snd) == PN_LOCAL_UNINIT | PN_REMOTE_UNINIT
+ assert pn_link_state(self.rcv) == PN_LOCAL_UNINIT | PN_REMOTE_UNINIT
+
+ pn_link_open(self.snd)
+
+ assert pn_link_state(self.snd) == PN_LOCAL_ACTIVE | PN_REMOTE_UNINIT
+ assert pn_link_state(self.rcv) == PN_LOCAL_UNINIT | PN_REMOTE_UNINIT
+
+ self.pump()
+
+ assert pn_link_state(self.snd) == PN_LOCAL_ACTIVE | PN_REMOTE_UNINIT
+ assert pn_link_state(self.rcv) == PN_LOCAL_UNINIT | PN_REMOTE_ACTIVE
+
+ pn_link_open(self.rcv)
+
+ assert pn_link_state(self.snd) == PN_LOCAL_ACTIVE | PN_REMOTE_UNINIT
+ assert pn_link_state(self.rcv) == PN_LOCAL_ACTIVE | PN_REMOTE_ACTIVE
+
+ self.pump()
+
+ assert pn_link_state(self.snd) == PN_LOCAL_ACTIVE | PN_REMOTE_ACTIVE
+ assert pn_link_state(self.rcv) == PN_LOCAL_ACTIVE | PN_REMOTE_ACTIVE
+
+ pn_link_close(self.snd)
+
+ assert pn_link_state(self.snd) == PN_LOCAL_CLOSED | PN_REMOTE_ACTIVE
+ assert pn_link_state(self.rcv) == PN_LOCAL_ACTIVE | PN_REMOTE_ACTIVE
+
+ self.pump()
+
+ assert pn_link_state(self.snd) == PN_LOCAL_CLOSED | PN_REMOTE_ACTIVE
+ assert pn_link_state(self.rcv) == PN_LOCAL_ACTIVE | PN_REMOTE_CLOSED
+
+ pn_link_close(self.rcv)
+
+ assert pn_link_state(self.snd) == PN_LOCAL_CLOSED | PN_REMOTE_ACTIVE
+ assert pn_link_state(self.rcv) == PN_LOCAL_CLOSED | PN_REMOTE_CLOSED
+
+ self.pump()
+
+ assert pn_link_state(self.snd) == PN_LOCAL_CLOSED | PN_REMOTE_CLOSED
+ assert pn_link_state(self.rcv) == PN_LOCAL_CLOSED | PN_REMOTE_CLOSED
+
+ def test_simultaneous_open_close(self):
+ assert pn_link_state(self.snd) == PN_LOCAL_UNINIT | PN_REMOTE_UNINIT
+ assert pn_link_state(self.rcv) == PN_LOCAL_UNINIT | PN_REMOTE_UNINIT
+
+ pn_link_open(self.snd)
+ pn_link_open(self.rcv)
+
+ assert pn_link_state(self.snd) == PN_LOCAL_ACTIVE | PN_REMOTE_UNINIT
+ assert pn_link_state(self.rcv) == PN_LOCAL_ACTIVE | PN_REMOTE_UNINIT
+
+ self.pump()
+
+ assert pn_link_state(self.snd) == PN_LOCAL_ACTIVE | PN_REMOTE_ACTIVE
+ assert pn_link_state(self.rcv) == PN_LOCAL_ACTIVE | PN_REMOTE_ACTIVE
+
+ pn_link_close(self.snd)
+ pn_link_close(self.rcv)
+
+ assert pn_link_state(self.snd) == PN_LOCAL_CLOSED | PN_REMOTE_ACTIVE
+ assert pn_link_state(self.rcv) == PN_LOCAL_CLOSED | PN_REMOTE_ACTIVE
+
+ self.pump()
+
+ assert pn_link_state(self.snd) == PN_LOCAL_CLOSED | PN_REMOTE_CLOSED
+ assert pn_link_state(self.rcv) == PN_LOCAL_CLOSED | PN_REMOTE_CLOSED
+
+class TransferTest(Test):
+
+ def setup(self):
+ Test.setup(self)
+ self.ssn1 = pn_session(self.c1)
+ pn_connection_open(self.c1)
+ pn_connection_open(self.c2)
+ pn_session_open(self.ssn1)
+ self.pump()
+ self.ssn2 = pn_session_head(self.c2, PN_LOCAL_UNINIT | PN_REMOTE_ACTIVE)
+ assert self.ssn2 != None
+ pn_session_open(self.ssn2)
+
+ self.snd = pn_sender(self.ssn1, "test-link")
+ self.rcv = pn_receiver(self.ssn2, "test-link")
+ pn_link_open(self.snd)
+ pn_link_open(self.rcv)
+ self.pump()
+
+ def test_work_queue(self):
+ assert pn_work_head(self.c1) is None
+ pn_delivery(self.snd, "tag")
+ assert pn_work_head(self.c1) is None
+ pn_flow(self.rcv, 1)
+ self.pump()
+ d = pn_work_head(self.c1)
+ assert d is not None
+ assert pn_delivery_tag(d) == "tag"
+ assert pn_writable(d)
+
+ n = pn_send(self.snd, "this is a test")
+ assert pn_advance(self.snd)
+ assert pn_work_head(self.c1) is None
+
+ self.pump()
+
+ d = pn_work_head(self.c2)
+ assert pn_delivery_tag(d) == "tag"
+ assert pn_readable(d)
+
+ def test_multiframe(self):
+ pn_flow(self.rcv, 1)
+ pn_delivery(self.snd, "tag")
+ msg = "this is a test"
+ n = pn_send(self.snd, msg)
+ assert n == len(msg)
+
+ self.pump()
+
+ d = pn_current(self.rcv)
+ assert pn_delivery_tag(d) == "tag"
+ assert pn_readable(d)
+
+ cd, bytes = pn_recv(self.rcv, 1024)
+ assert bytes == msg
+ assert cd == len(bytes)
+
+ cd, bytes = pn_recv(self.rcv, 1024)
+ assert cd == 0
+ assert bytes == ""
+
+ msg = "this is more"
+ n = pn_send(self.snd, msg)
+ assert n == len(msg)
+ assert pn_advance(self.snd)
+
+ self.pump()
+
+ cd, bytes = pn_recv(self.rcv, 1024)
+ assert cd == len(bytes)
+ assert bytes == msg
+
+ cd, bytes = pn_recv(self.rcv, 1024)
+ assert cd == PN_EOS
+ assert bytes == ""
+
+ def test_disposition(self):
+ pn_flow(self.rcv, 1)
+
+ self.pump()
+
+ sd = pn_delivery(self.snd, "tag")
+ msg = "this is a test"
+ n = pn_send(self.snd, msg)
+ assert n == len(msg)
+ assert pn_advance(self.snd)
+
+ self.pump()
+
+ rd = pn_current(self.rcv)
+ assert rd is not None
+ assert pn_delivery_tag(rd) == pn_delivery_tag(sd)
+ cd, rmsg = pn_recv(self.rcv, 1024)
+ assert cd == len(rmsg)
+ assert rmsg == msg
+ pn_disposition(rd, PN_ACCEPTED)
+
+ self.pump()
+
+ assert pn_remote_disp(sd) == pn_local_disp(rd) == PN_ACCEPTED
+ assert pn_dirty(sd)
+
+ pn_disposition(sd, PN_ACCEPTED)
+ pn_settle(sd)
+
+ self.pump()
+
+ assert pn_local_disp(sd) == pn_remote_disp(rd) == PN_ACCEPTED
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]