Author: gsim Date: Fri Oct 10 12:42:08 2014 New Revision: 1630795 URL: http://svn.apache.org/r1630795 Log: PROTON-610: Messenger code doesn't send heartbeat frames
Modified: qpid/proton/branches/examples/proton-c/src/dispatcher/dispatcher.c qpid/proton/branches/examples/proton-c/src/messenger/messenger.c qpid/proton/branches/examples/tests/python/proton_tests/messenger.py Modified: qpid/proton/branches/examples/proton-c/src/dispatcher/dispatcher.c URL: http://svn.apache.org/viewvc/qpid/proton/branches/examples/proton-c/src/dispatcher/dispatcher.c?rev=1630795&r1=1630794&r2=1630795&view=diff ============================================================================== --- qpid/proton/branches/examples/proton-c/src/dispatcher/dispatcher.c (original) +++ qpid/proton/branches/examples/proton-c/src/dispatcher/dispatcher.c Fri Oct 10 12:42:08 2014 @@ -116,6 +116,10 @@ static void pn_do_trace(pn_dispatcher_t pn_string_format(disp->scratch, "%u %s ", ch, dir == OUT ? "->" : "<-"); pn_inspect(args, disp->scratch); + if (pn_data_size(args)==0) { + pn_string_addf(disp->scratch, "(EMPTY FRAME)"); + } + if (size) { char buf[1024]; int e = pn_quote_data(buf, 1024, payload, size); Modified: qpid/proton/branches/examples/proton-c/src/messenger/messenger.c URL: http://svn.apache.org/viewvc/qpid/proton/branches/examples/proton-c/src/messenger/messenger.c?rev=1630795&r1=1630794&r2=1630795&view=diff ============================================================================== --- qpid/proton/branches/examples/proton-c/src/messenger/messenger.c (original) +++ qpid/proton/branches/examples/proton-c/src/messenger/messenger.c Fri Oct 10 12:42:08 2014 @@ -31,6 +31,7 @@ #include <stdlib.h> #include <string.h> #include <stdio.h> + #include "../util.h" #include "../platform.h" #include "../platform_fmt.h" @@ -1275,8 +1276,37 @@ int pn_messenger_process_events(pn_messe return processed; } +/** + * Function to invoke AMQP related timer events, such as a heartbeat to prevent + * remote_idle timeout events + */ +static void pni_messenger_tick(pn_messenger_t *messenger) +{ + for (size_t i = 0; i < pn_list_size(messenger->connections); i++) { + pn_connection_t *connection = + (pn_connection_t *)pn_list_get(messenger->connections, i); + pn_transport_t *transport = pn_connection_transport(connection); + if (transport) { + pn_transport_tick(transport, pn_i_now()); + + // if there is pending data, such as an empty heartbeat frame, call + // process events. This should kick off the chain of selectables for + // reading/writing. + ssize_t pending = pn_transport_pending(transport); + if (pending > 0) { + pn_connection_ctx_t *cctx = + (pn_connection_ctx_t *)pn_connection_get_context(connection); + pn_messenger_process_events(messenger); + pn_messenger_flow(messenger); + pni_conn_modified(pni_context(cctx->selectable)); + } + } + } +} + int pn_messenger_process(pn_messenger_t *messenger) { + bool doMessengerTick = true; pn_selectable_t *sel; int events; while ((sel = pn_selector_next(messenger->selector, &events))) { @@ -1285,12 +1315,17 @@ int pn_messenger_process(pn_messenger_t } if (events & PN_WRITABLE) { pn_selectable_writable(sel); + doMessengerTick = false; } if (events & PN_EXPIRED) { pn_selectable_expired(sel); } } - + // ensure timer events are processed. Cannot call this inside the while loop + // as the timer events are not seen by the selector + if (doMessengerTick) { + pni_messenger_tick(messenger); + } if (messenger->interrupted) { messenger->interrupted = false; return PN_INTR; Modified: qpid/proton/branches/examples/tests/python/proton_tests/messenger.py URL: http://svn.apache.org/viewvc/qpid/proton/branches/examples/tests/python/proton_tests/messenger.py?rev=1630795&r1=1630794&r2=1630795&view=diff ============================================================================== --- qpid/proton/branches/examples/tests/python/proton_tests/messenger.py (original) +++ qpid/proton/branches/examples/tests/python/proton_tests/messenger.py Fri Oct 10 12:42:08 2014 @@ -1027,3 +1027,59 @@ class SelectableMessengerTest(common.Tes def testSelectable4096(self): self.testSelectable(count=4096) + + +class IdleTimeoutTest(common.Test): + + def testIdleTimeout(self): + """ + Verify that a Messenger connection is kept alive using empty idle frames + when a idle_timeout is advertised by the remote peer. + """ + if "java" in sys.platform: + raise Skipped() + idle_timeout_secs = self.delay + + try: + idle_server = common.TestServerDrain(idle_timeout=idle_timeout_secs) + idle_server.timeout = self.timeout + idle_server.start() + + idle_client = Messenger("idle_client") + idle_client.timeout = self.timeout + idle_client.start() + + idle_client.subscribe("amqp://%s:%s/foo" % + (idle_server.host, idle_server.port)) + idle_client.work(idle_timeout_secs/10) + + # wait up to 3x the idle timeout and hence verify that everything stays + # connected during that time by virtue of no Exception being raised + duration = 3 * idle_timeout_secs + deadline = time() + duration + while time() <= deadline: + idle_client.work(idle_timeout_secs/10) + continue + + # confirm link is still active + cxtr = idle_server.driver.head_connector() + assert not cxtr.closed, "Connector has unexpectedly been closed" + conn = cxtr.connection + assert conn.state == (Endpoint.LOCAL_ACTIVE + | Endpoint.REMOTE_ACTIVE + ), "Connection has unexpectedly terminated" + link = conn.link_head(0) + while link: + assert link.state != (Endpoint.REMOTE_CLOSED + ), "Link unexpectedly closed" + link = link.next(0) + + finally: + try: + idle_client.stop() + except: + pass + try: + idle_server.stop() + except: + pass --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org