This is an automated email from the ASF dual-hosted git repository.

huijun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-heron.git
The following commit(s) were added to refs/heads/master by this push:
     new fa4ae37  add max thread limit in integration test (#3025)
fa4ae37 is described below

commit fa4ae377a5c71d798ae81258d2e6b16de5bbb721
Author: Yao Li <clshl...@gmail.com>
AuthorDate: Fri Sep 14 01:46:04 2018 -0700

    add max thread limit in integration test (#3025)

    merge this pr for integration threads fix. the 3 integration failure fix will be in the next pr
---
 integration_test/src/python/test_runner/main.py     | 21 +++++++++++----------
 .../src/python/topology_test_runner/main.py         | 21 +++++++++++----------
 2 files changed, 22 insertions(+), 20 deletions(-)

diff --git a/integration_test/src/python/test_runner/main.py b/integration_test/src/python/test_runner/main.py
index ae921b3..76cb281 100644
--- a/integration_test/src/python/test_runner/main.py
+++ b/integration_test/src/python/test_runner/main.py
@@ -356,7 +356,6 @@ def run_tests(conf, test_args):
       failures += [test_tuple]
     lock.release()
 
-  current = 1
   test_threads = []
   for topology_conf in test_topologies:
     topology_name = ("%s_%s_%s") % (timestamp, topology_conf["topologyName"], str(uuid.uuid4()))
@@ -377,18 +376,19 @@ def run_tests(conf, test_args):
                         + topology_name)
       topology_args = "%s %s" % (topology_args, topology_conf["topologyArgs"])
 
-    logging.info("==== Starting test %s of %s: %s ====",
-                 current, len(test_topologies), topology_name)
-
     test_threads.append(Thread(target=_run_single_test, args=(topology_name, topology_conf,
       test_args, http_server_host_port, classpath, update_args, topology_args)))
-    current += 1
-
-  for thread in test_threads:
-    thread.start()
 
-  for thread in test_threads:
-    thread.join()
+  # Run test in batches
+  start = 0
+  while start < len(test_threads):
+    end = min(start + int(test_args.max_thread_number), len(test_threads))
+    for i in range(start, end):
+      logging.info("==== Starting test %s of %s ====", i + 1, len(test_threads))
+      test_threads[i].start()
+    for i in range(start, end):
+      test_threads[i].join()
+    start = end
 
   return
 
@@ -443,6 +443,7 @@ def main():
   parser.add_argument('-pi', '--release-package-uri', dest='release_package_uri', default=None)
   parser.add_argument('-cd', '--cli-config-path', dest='cli_config_path',
                       default=conf['cliConfigPath'])
+  parser.add_argument('-ms', '--max-thread-number', dest='max_thread_number', default=1)
 
   #parser.add_argument('-dt', '--disable-topologies', dest='disabledTopologies', default='',
   #                    help='comma separated test case(classpath) name that will not be run')
diff --git a/integration_test/src/python/topology_test_runner/main.py b/integration_test/src/python/topology_test_runner/main.py
index f0c2f9d..59499bf 100644
--- a/integration_test/src/python/topology_test_runner/main.py
+++ b/integration_test/src/python/topology_test_runner/main.py
@@ -561,7 +561,6 @@ def run_topology_tests(conf, test_args):
       failures += [test_tuple]
     lock.release()
 
-  current = 1
   test_threads = []
   for topology_conf in test_topologies:
     topology_name = ("%s_%s_%s") % (timestamp, topology_conf["topologyName"], str(uuid.uuid4()))
@@ -588,19 +587,20 @@ def run_topology_tests(conf, test_args):
       expected_state_result_file_path = \
         test_args.topologies_path + "/" + topology_conf["expectedStateResultRelativePath"]
 
-    logging.info("==== Starting test %s of %s: %s ====",
-                 current, len(test_topologies), topology_name)
-
     test_threads.append(Thread(target=_run_single_test, args=(topology_name, topology_conf,
       test_args, http_server_host_port, classpath, update_args, deactivate_args, restart_args,
       topology_args, expected_topo_result_file_path, expected_state_result_file_path)))
-    current += 1
-
-  for thread in test_threads:
-    thread.start()
 
-  for thread in test_threads:
-    thread.join()
+  # Run test in batches
+  start = 0
+  while start < len(test_threads):
+    end = min(start + int(test_args.max_thread_number), len(test_threads))
+    for i in range(start, end):
+      logging.info("==== Starting test %s of %s ====", i + 1, len(test_threads))
+      test_threads[i].start()
+    for i in range(start, end):
+      test_threads[i].join()
+    start = end
 
   return
 
@@ -654,6 +654,7 @@ def main():
   parser.add_argument('-pi', '--release-package-uri', dest='release_package_uri', default=None)
   parser.add_argument('-cd', '--cli-config-path', dest='cli_config_path',
                       default=conf['cliConfigPath'])
+  parser.add_argument('-ms', '--max-thread-number', dest='max_thread_number', default=1)
 
   args, unknown_args = parser.parse_known_args()
   if unknown_args: