This is an automated email from the ASF dual-hosted git repository.

huijun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-heron.git


The following commit(s) were added to refs/heads/master by this push:
     new fa4ae37  add max thread limit in integration test (#3025)
fa4ae37 is described below

commit fa4ae377a5c71d798ae81258d2e6b16de5bbb721
Author: Yao Li <clshl...@gmail.com>
AuthorDate: Fri Sep 14 01:46:04 2018 -0700

    add max thread limit in integration test (#3025)
    
    merge this pr for integration threads fix. the 3 integration failure fix will be in the next pr
---
 integration_test/src/python/test_runner/main.py     | 21 +++++++++++----------
 .../src/python/topology_test_runner/main.py         | 21 +++++++++++----------
 2 files changed, 22 insertions(+), 20 deletions(-)

diff --git a/integration_test/src/python/test_runner/main.py b/integration_test/src/python/test_runner/main.py
index ae921b3..76cb281 100644
--- a/integration_test/src/python/test_runner/main.py
+++ b/integration_test/src/python/test_runner/main.py
@@ -356,7 +356,6 @@ def run_tests(conf, test_args):
       failures += [test_tuple]
       lock.release()
 
-  current = 1
   test_threads = []
   for topology_conf in test_topologies:
     topology_name = ("%s_%s_%s") % (timestamp, topology_conf["topologyName"], 
str(uuid.uuid4()))
@@ -377,18 +376,19 @@ def run_tests(conf, test_args):
                          + topology_name)
       topology_args = "%s %s" % (topology_args, topology_conf["topologyArgs"])
 
-    logging.info("==== Starting test %s of %s: %s ====",
-      current, len(test_topologies), topology_name)
-
     test_threads.append(Thread(target=_run_single_test, args=(topology_name, 
topology_conf,
       test_args, http_server_host_port, classpath, update_args, 
topology_args)))
 
-    current += 1
-
-  for thread in test_threads:
-    thread.start()
-  for thread in test_threads:
-    thread.join()
+  # Run test in batches
+  start = 0
+  while start < len(test_threads):
+    end = min(start + int(test_args.max_thread_number), len(test_threads))
+    for i in range(start, end):
+      logging.info("==== Starting test %s of %s ====", i + 1, 
len(test_threads))
+      test_threads[i].start()
+    for i in range(start, end):
+      test_threads[i].join()
+    start = end
 
   return
 
@@ -443,6 +443,7 @@ def main():
   parser.add_argument('-pi', '--release-package-uri', 
dest='release_package_uri', default=None)
   parser.add_argument('-cd', '--cli-config-path', dest='cli_config_path',
                       default=conf['cliConfigPath'])
+  parser.add_argument('-ms', '--max-thread-number', dest='max_thread_number', 
default=1)
 
   #parser.add_argument('-dt', '--disable-topologies', 
dest='disabledTopologies', default='',
   #                    help='comma separated test case(classpath) name that 
will not be run')
diff --git a/integration_test/src/python/topology_test_runner/main.py b/integration_test/src/python/topology_test_runner/main.py
index f0c2f9d..59499bf 100644
--- a/integration_test/src/python/topology_test_runner/main.py
+++ b/integration_test/src/python/topology_test_runner/main.py
@@ -561,7 +561,6 @@ def run_topology_tests(conf, test_args):
       failures += [test_tuple]
       lock.release()
 
-  current = 1
   test_threads = []
   for topology_conf in test_topologies:
     topology_name = ("%s_%s_%s") % (timestamp, topology_conf["topologyName"], 
str(uuid.uuid4()))
@@ -588,19 +587,20 @@ def run_topology_tests(conf, test_args):
       expected_state_result_file_path = \
         test_args.topologies_path + "/" + 
topology_conf["expectedStateResultRelativePath"]
 
-    logging.info("==== Starting test %s of %s: %s ====",
-      current, len(test_topologies), topology_name)
-
     test_threads.append(Thread(target=_run_single_test, args=(topology_name, 
topology_conf, test_args,
       http_server_host_port, classpath, update_args, deactivate_args, 
restart_args,
       topology_args, expected_topo_result_file_path, 
expected_state_result_file_path)))
 
-    current += 1
-
-  for thread in test_threads:
-    thread.start()
-  for thread in test_threads:
-    thread.join()
+  # Run test in batches
+  start = 0
+  while start < len(test_threads):
+    end = min(start + int(test_args.max_thread_number), len(test_threads))
+    for i in range(start, end):
+      logging.info("==== Starting test %s of %s ====", i + 1, 
len(test_threads))
+      test_threads[i].start()
+    for i in range(start, end):
+      test_threads[i].join()
+    start = end
 
   return
 
@@ -654,6 +654,7 @@ def main():
   parser.add_argument('-pi', '--release-package-uri', 
dest='release_package_uri', default=None)
   parser.add_argument('-cd', '--cli-config-path', dest='cli_config_path',
     default=conf['cliConfigPath'])
+  parser.add_argument('-ms', '--max-thread-number', dest='max_thread_number', 
default=1)
 
   args, unknown_args = parser.parse_known_args()
   if unknown_args:

Reply via email to