Repository: incubator-impala
Updated Branches:
  refs/heads/master 2bd010781 -> ca55b5926
IMPALA-6070: Parallel compute_table_stats.py

Uses a thread pool to issue many compute stats commands in parallel to
Impala, rather than doing it serially. Where it was obvious, I combined
multiple stats commands into fewer, to reduce the number of
"show databases" and serialized "show tables" commands.

This speeds up the compute stats step in data loading significantly. My
measurements for testdata/bin/compute-table-stats.sh running before and
after this change, with the Impala daemons restarted (cold) or not
restarted (warm) on an 8-core, 32GB RAM machine were:

  old, cold: 7m44s
  new, cold: 1m42s
  old, warm: 1m23s
  new, warm: 48s

The data load in the full test build behaves in a cold fashion. It's
typical for https://jenkins.impala.io/job/ubuntu-16.04-from-scratch/ to
run this compute stats step for 9 or 10 minutes. With this change, this
will come down to about 2 minutes.

Change-Id: Ifb080f2552b9dbe304ecadd6e52429214094237d
Reviewed-on: http://gerrit.cloudera.org:8080/8354
Reviewed-by: David Knupp <[email protected]>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/77e010ae
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/77e010ae
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/77e010ae

Branch: refs/heads/master
Commit: 77e010ae4ee07c02a7a945fcb5898db50166c767
Parents: 2bd0107
Author: Philip Zeyliger <[email protected]>
Authored: Sat Oct 21 21:27:00 2017 -0700
Committer: Impala Public Jenkins <[email protected]>
Committed: Tue Oct 24 23:54:15 2017 +0000

----------------------------------------------------------------------
 testdata/bin/compute-table-stats.sh |  6 +--
 tests/util/compute_table_stats.py   | 81 ++++++++++++++++++++++----------
 2 files changed, 58 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/77e010ae/testdata/bin/compute-table-stats.sh
----------------------------------------------------------------------
diff --git a/testdata/bin/compute-table-stats.sh b/testdata/bin/compute-table-stats.sh
index f27972c..98434ee 100755
--- a/testdata/bin/compute-table-stats.sh
+++ b/testdata/bin/compute-table-stats.sh
@@ -41,10 +41,8 @@ if [ "${TARGET_FILESYSTEM}" = "hdfs" ]; then
 fi
 ${COMPUTE_STATS_SCRIPT} --db_names=tpch,tpch_parquet \
   --table_names=customer,lineitem,nation,orders,part,partsupp,region,supplier
-${COMPUTE_STATS_SCRIPT} --db_names=tpch_nested_parquet
-${COMPUTE_STATS_SCRIPT} --db_names=tpcds,tpcds_parquet
+${COMPUTE_STATS_SCRIPT} --db_names=tpch_nested_parquet,tpcds,tpcds_parquet

 if "$KUDU_IS_SUPPORTED"; then
-  ${COMPUTE_STATS_SCRIPT} --db_names=functional_kudu
-  ${COMPUTE_STATS_SCRIPT} --db_names=tpch_kudu
+  ${COMPUTE_STATS_SCRIPT} --db_names=functional_kudu,tpch_kudu
 fi

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/77e010ae/tests/util/compute_table_stats.py
----------------------------------------------------------------------
diff --git a/tests/util/compute_table_stats.py b/tests/util/compute_table_stats.py
index 924d33d..f982802 100755
--- a/tests/util/compute_table_stats.py
+++ b/tests/util/compute_table_stats.py
@@ -19,36 +19,63 @@
 #
 # Utility for computing table statistics of tables in the Hive Metastore
+from contextlib import contextmanager
 from optparse import OptionParser
+import logging
+import multiprocessing
+import multiprocessing.pool
 from tests.beeswax.impala_beeswax import ImpalaBeeswaxClient

-def compute_stats(impala_client, db_names=None, table_names=None,
-    continue_on_error=False):
+def compute_stats_table(client_factory, db, table, continue_on_error):
+  """
+  Runs 'compute stats' on a given table. If continue_on_error is
+  True, exceptions computing statistics are swallowed.
+  """
+  with client_factory() as impala_client:
+    db_table = "%s.%s" % (db, table)
+    statement = "compute stats %s" % (db_table,)
+    logging.info('Executing: %s', statement)
+    try:
+      result = impala_client.execute(statement)
+      logging.info("  %s -> %s", db_table, ' '.join(result.data).strip())
+    except:
+      logging.exception(' Failed on table %s', db_table)
+      if not continue_on_error:
+        raise
+
+def compute_stats(client_factory, db_names=None, table_names=None,
+    continue_on_error=False, parallelism=multiprocessing.cpu_count()):
   """
   Runs COMPUTE STATS over the selected tables. The target tables can be filtered by
   specifying a list of databases and/or table names. If no filters are specified this will
   run COMPUTE STATS on all tables in all databases.
+
+  parallelism controls the size of the thread pool to which compute_stats
+  is sent.
   """
-  print "Enumerating databases and tables for compute stats."
+  logging.info("Enumerating databases and tables for compute stats.")

-  all_dbs = set(name.split('\t')[0].lower() for name in impala_client.execute("show databases").data)
-  selected_dbs = all_dbs if db_names is None else set(db_names)
-  for db in all_dbs.intersection(selected_dbs):
-    all_tables =\
-        set([t.lower() for t in impala_client.execute("show tables in %s" % db).data])
-    selected_tables = all_tables if table_names is None else set(table_names)
-    for table in all_tables.intersection(selected_tables):
-      statement = "compute stats %s.%s" % (db, table)
-      print 'Executing: %s' % statement
-      try:
-        result = impala_client.execute(statement)
-        print "  -> %s\n" % '\n'.join(result.data)
-      except Exception, e:
-        print "  -> Error: %s\n" % str(e)
-        if not continue_on_error: raise e
+  pool = multiprocessing.pool.ThreadPool(processes=parallelism)
+  futures = []
+  with client_factory() as impala_client:
+    all_dbs = set(name.split('\t')[0].lower() for name
+        in impala_client.execute("show databases").data)
+    selected_dbs = all_dbs if db_names is None else set(db_names)
+    for db in all_dbs.intersection(selected_dbs):
+      all_tables =\
+          set([t.lower() for t in impala_client.execute("show tables in %s" % db).data])
+      selected_tables = all_tables if table_names is None else set(table_names)
+      for table in all_tables.intersection(selected_tables):
+        # Submit command to threadpool
+        futures.append(pool.apply_async(compute_stats_table,
+            (client_factory, db, table, continue_on_error,)))
+  # Wait for all stats commands to finish
+  for f in futures:
+    f.get()

 if __name__ == "__main__":
+  logging.basicConfig(level=logging.INFO, format='%(asctime)s %(threadName)s: %(message)s')
   parser = OptionParser()
   parser.add_option("--continue_on_error", dest="continue_on_error",
       action="store_true", default=True, help="If True, continue "\
@@ -62,6 +89,8 @@ if __name__ == "__main__":
       help="Compute stats on a kerberized cluster.")
   parser.add_option("--use_ssl", action="store_true", default=False,
       help="Compute stats on a cluster with SSL enabled.")
+  parser.add_option("--parallelism", type=int, default=multiprocessing.cpu_count(),
+      help="Number of parallel compute stats commands.")
   parser.add_option("--db_names", dest="db_names", default=None,
       help="Comma-separated list of database names for which to compute "\
       "stats. Can be used in conjunction with the --table_names flag. "\
@@ -80,11 +109,13 @@ if __name__ == "__main__":
   if options.db_names is not None:
     db_names = [name.lower().strip() for name in options.db_names.split(',')]

-  impala_client = ImpalaBeeswaxClient(options.impalad, use_kerberos=options.use_kerberos,
-      use_ssl=options.use_ssl)
-  impala_client.connect()
-  try:
-    compute_stats(impala_client, db_names=db_names,
-        table_names=table_names, continue_on_error=options.continue_on_error)
-  finally:
+  @contextmanager
+  def client_factory():
+    impala_client = ImpalaBeeswaxClient(options.impalad,
+        use_kerberos=options.use_kerberos, use_ssl=options.use_ssl)
+    impala_client.connect()
+    yield impala_client
     impala_client.close_connection()
+
+  compute_stats(client_factory, db_names=db_names, table_names=table_names,
+      continue_on_error=options.continue_on_error, parallelism=options.parallelism)
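
----------------------------------------------------------------------
For readers skimming the diff, the parallelization boils down to a
multiprocessing.pool.ThreadPool plus a per-task connection factory, so that
each worker thread runs its "compute stats" statement on its own connection.
The sketch below is illustration only and is not part of the commit:
FakeClient, run_one, and the hard-coded table list are stand-ins for
ImpalaBeeswaxClient and the real schema.

import logging
import multiprocessing
import multiprocessing.pool
from contextlib import contextmanager

@contextmanager
def client_factory():
  # Stand-in for opening and closing an ImpalaBeeswaxClient connection.
  class FakeClient(object):
    def execute(self, statement):
      return "OK: %s" % statement
    def close(self):
      pass
  client = FakeClient()
  try:
    yield client
  finally:
    client.close()

def run_one(factory, db, table):
  # Each worker opens its own client so statements can run concurrently.
  with factory() as client:
    statement = "compute stats %s.%s" % (db, table)
    logging.info("Executing: %s", statement)
    return client.execute(statement)

if __name__ == "__main__":
  logging.basicConfig(level=logging.INFO, format="%(asctime)s %(threadName)s: %(message)s")
  tables = [("tpch", t) for t in ("customer", "lineitem", "orders")]
  pool = multiprocessing.pool.ThreadPool(processes=multiprocessing.cpu_count())
  # apply_async returns AsyncResult objects; get() re-raises worker exceptions.
  futures = [pool.apply_async(run_one, (client_factory, db, tbl)) for db, tbl in tables]
  for f in futures:
    print(f.get())

The same waiting-on-futures step is what lets the real script fail the run when
continue_on_error is off: AsyncResult.get() re-raises any exception the worker hit.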
