This is an automated email from the ASF dual-hosted git repository. kgiusti pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git
The following commit(s) were added to refs/heads/main by this push: new e7fe160 NO-JIRA: fix parallel priority test timeouts e7fe160 is described below commit e7fe1603e1667c7b05a76dd8ba86e912608522fd Author: Kenneth Giusti <kgiu...@apache.org> AuthorDate: Thu Jun 17 12:37:50 2021 -0400 NO-JIRA: fix parallel priority test timeouts Update test clients to output debug logging. This closes #1265 --- tests/system_tests_edge_router.py | 6 +++-- tests/system_tests_link_routes.py | 8 ++++--- tests/system_tests_router_mesh.py | 16 +++++++------ tests/system_tests_two_routers.py | 6 +++-- tests/test-receiver.c | 25 +++++++++++++++++++- tests/test-sender.c | 49 ++++++++++++++++++++++----------------- 6 files changed, 74 insertions(+), 36 deletions(-) diff --git a/tests/system_tests_edge_router.py b/tests/system_tests_edge_router.py index fcb41f5..99b358f 100644 --- a/tests/system_tests_edge_router.py +++ b/tests/system_tests_edge_router.py @@ -3110,7 +3110,8 @@ class StreamingMessageTest(TestCase): "-i", "TestReceiver-%d" % self._container_index, "-a", router.listener, "-c", str(count), - "-s", address] + "-s", address, + "-d"] self._container_index += 1 env = dict(os.environ, PN_TRACE_FRM="1") return self.popen(cmd, expect=expect, env=env) @@ -3125,7 +3126,8 @@ class StreamingMessageTest(TestCase): "-a", router.listener, "-c", str(count), "-t", address, - size] + size, + "-d"] self._container_index += 1 env = dict(os.environ, PN_TRACE_FRM="1") return self.popen(cmd, expect=expect, env=env) diff --git a/tests/system_tests_link_routes.py b/tests/system_tests_link_routes.py index 88c607e..81246e8 100644 --- a/tests/system_tests_link_routes.py +++ b/tests/system_tests_link_routes.py @@ -2653,7 +2653,7 @@ class LinkRoute3Hop(TestCase): same session. """ send_clients = 10 - send_batch = 10 + send_batch = 5 total = send_clients * send_batch fake_service = FakeService(self.QDR_A.addresses[1], @@ -2665,7 +2665,8 @@ class LinkRoute3Hop(TestCase): rx = self.popen(["test-receiver", "-a", self.QDR_C.addresses[0], "-c", str(total), - "-s", "closest/test-client"], + "-s", "closest/test-client", + "-d"], env=env, expect=Process.EXIT_OK) @@ -2675,7 +2676,8 @@ class LinkRoute3Hop(TestCase): "-c", str(send_batch), "-i", "TestSender-%s" % x, "-sx", # huge message size to trigger Q2/Q3 - "-t", "closest/test-client"], + "-t", "closest/test-client", + "-d"], env=env, expect=Process.EXIT_OK) diff --git a/tests/system_tests_router_mesh.py b/tests/system_tests_router_mesh.py index 383b384..5b42b6a 100644 --- a/tests/system_tests_router_mesh.py +++ b/tests/system_tests_router_mesh.py @@ -205,8 +205,7 @@ class ThreeRouterTest(TestCase): def test_06_parallel_priority(self): """ - Create 10 senders each with a different priority. Send large messages - - large enough to trigger Qx flow control (sender argument "-sx"). + Create 10 senders each with a different priority. Ensure all messages arrive as expected. """ priorities = 10 @@ -214,23 +213,26 @@ class ThreeRouterTest(TestCase): total = priorities * send_batch rx = self.spawn_receiver(self.RouterC, - count=total, - address="closest/test_06_address") + total, + "closest/test_06_address", + "-d") self.RouterA.wait_address("closest/test_06_address") senders = [self.spawn_sender(self.RouterA, send_batch, "closest/test_06_address", - "-sx", "-p%s" % p) + "-sm", "-p%s" % p, "-d") for p in range(priorities)] - if rx.wait(timeout=TIMEOUT): - raise Exception("Receiver failed to consume all messages") + # wait for all senders to finish first, then check the receiver for tx in senders: out_text, out_err = tx.communicate(timeout=TIMEOUT) if tx.returncode: raise Exception("Sender failed: %s %s" % (out_text, out_err)) + if rx.wait(timeout=TIMEOUT): + raise Exception("Receiver failed to consume all messages") + if __name__ == '__main__': unittest.main(main_module()) diff --git a/tests/system_tests_two_routers.py b/tests/system_tests_two_routers.py index 6e481a6..a287411 100644 --- a/tests/system_tests_two_routers.py +++ b/tests/system_tests_two_routers.py @@ -1998,7 +1998,8 @@ class StreamingLinkScrubberTest(TestCase): cmd = ["test-receiver", "-a", self.RouterB.listener, "-s", address, - "-c", str(sender_count)] + "-c", str(sender_count), + "-d"] rx = self.popen(cmd, env=env) self.RouterA.wait_address(address) @@ -2011,7 +2012,8 @@ class StreamingLinkScrubberTest(TestCase): "-a", self.RouterA.listener, "-t", address, "-c", "1", - "-sx" + "-sx", + "-d" ] senders = [self.popen(cmd, env=env) for x in range(sender_count)] diff --git a/tests/test-receiver.c b/tests/test-receiver.c index 38820db..14f8c9b 100644 --- a/tests/test-receiver.c +++ b/tests/test-receiver.c @@ -32,9 +32,13 @@ #include <stdio.h> #include <stdlib.h> #include <string.h> +#include <time.h> #include <unistd.h> +#define BOOL2STR(b) ((b)?"true":"false") + bool stop = false; +bool verbose = false; int credit_window = 1000; char *source_address = "test-address"; // name of the source node to receive from @@ -155,6 +159,7 @@ static void usage(void) printf("-s \tSource address [%s]\n", source_address); printf("-w \tCredit window [%d]\n", credit_window); printf("-E \tExit without cleanly closing the connection [off]\n"); + printf("-d \tPrint periodic status updates [%s]\n", BOOL2STR(verbose)); exit(1); } @@ -169,7 +174,7 @@ int main(int argc, char** argv) /* command line options */ opterr = 0; int c; - while((c = getopt(argc, argv, "i:a:s:hw:c:E")) != -1) { + while((c = getopt(argc, argv, "i:a:s:hdw:c:E")) != -1) { switch(c) { case 'h': usage(); break; case 'a': host_address = optarg; break; @@ -184,6 +189,7 @@ int main(int argc, char** argv) usage(); break; case 'E': drop_connection = true; break; + case 'd': verbose = true; break; default: usage(); @@ -219,6 +225,7 @@ int main(int argc, char** argv) pn_reactor_start(reactor); + time_t last_log = time(NULL); while (pn_reactor_process(reactor)) { if (stop) { if (drop_connection) // hard exit @@ -228,6 +235,17 @@ int main(int argc, char** argv) if (pn_link) pn_link_close(pn_link); if (pn_ssn) pn_session_close(pn_ssn); if (pn_conn) pn_connection_close(pn_conn); + + } else if (verbose) { + + // periodically give status for test output logs + + time_t now = time(NULL); + if ((now - last_log) >= 10) { + fprintf(stdout, "Received:%"PRIu64" of %"PRIu64"\n", count, limit); + fflush(stdout); + last_log = now; + } } } @@ -237,5 +255,10 @@ int main(int argc, char** argv) pn_reactor_free(reactor); + if (verbose) { + fprintf(stdout, "Received:%"PRIu64" of %"PRIu64"\n", count, limit); + fflush(stdout); + } + return 0; } diff --git a/tests/test-sender.c b/tests/test-sender.c index 322de00..57867b4 100644 --- a/tests/test-sender.c +++ b/tests/test-sender.c @@ -55,6 +55,7 @@ // bool stop = false; +bool verbose = false; uint64_t limit = 1; // # messages to send uint64_t count = 0; // # sent @@ -357,6 +358,7 @@ static void usage(void) printf("-E \tExit without cleanly closing the connection [off]\n"); printf("-p \tMessage priority [%d]\n", priority); printf("-X \tMessage body data pattern [%c]\n", (char)body_data_pattern); + printf("-d \tPrint periodic status updates [%s]\n", BOOL2STR(verbose)); exit(1); } @@ -365,7 +367,7 @@ int main(int argc, char** argv) /* command line options */ opterr = 0; int c; - while ((c = getopt(argc, argv, "ha:c:i:ns:t:uMEp:X:")) != -1) { + while ((c = getopt(argc, argv, "ha:c:i:ns:t:udMEp:X:")) != -1) { switch(c) { case 'h': usage(); break; case 'a': host_address = optarg; break; @@ -386,6 +388,7 @@ int main(int argc, char** argv) } break; case 't': target_address = optarg; break; + case 'd': verbose = true; break; case 'u': presettle = true; break; case 'M': add_annotations = true; break; case 'E': drop_connection = true; break; @@ -432,7 +435,7 @@ int main(int argc, char** argv) pn_reactor_start(reactor); - time_t last_log = 0; + time_t last_log = time(NULL); while (pn_reactor_process(reactor)) { if (stop) { if (drop_connection) { // hard stop @@ -442,26 +445,22 @@ int main(int argc, char** argv) count, accepted, rejected, released, modified); exit(0); } + if (pn_link) pn_link_close(pn_link); + if (pn_ssn) pn_session_close(pn_ssn); + if (pn_conn) pn_connection_close(pn_conn); - // wait (forever) until all sent messages are confirmed by the - // receiver - - if (count == acked) { - // close the endpoints this will cause pn_reactor_process() to - // eventually break the loop - if (pn_link) pn_link_close(pn_link); - if (pn_ssn) pn_session_close(pn_ssn); - if (pn_conn) pn_connection_close(pn_conn); - } else { - // periodically give status for test output logs - time_t now = time(NULL); - if ((now - last_log) >= 1) { - fprintf(stdout, - "Sent:%"PRIu64" Accepted:%"PRIu64" Rejected:%"PRIu64 - " Released:%"PRIu64" Modified:%"PRIu64"\n", - count, accepted, rejected, released, modified); - last_log = now; - } + } else if (verbose) { + + // periodically give status for test output logs + + time_t now = time(NULL); + if ((now - last_log) >= 10) { + fprintf(stdout, + "Sent:%"PRIu64" Accepted:%"PRIu64" Rejected:%"PRIu64 + " Released:%"PRIu64" Modified:%"PRIu64" Limit:%"PRIu64"\n", + count, accepted, rejected, released, modified, limit); + fflush(stdout); + last_log = now; } } } @@ -472,5 +471,13 @@ int main(int argc, char** argv) pn_reactor_free(reactor); + if (verbose) { + fprintf(stdout, + "Sent:%"PRIu64" Accepted:%"PRIu64" Rejected:%"PRIu64 + " Released:%"PRIu64" Modified:%"PRIu64" Limit:%"PRIu64"\n", + count, accepted, rejected, released, modified, limit); + fflush(stdout); + } + return 0; } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org